incubator-s4-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mmo...@apache.org
Subject [32/50] [abbrv] Rename packages in preparation for move to Apache
Date Tue, 03 Jan 2012 11:19:16 GMT
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/message/Response.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/message/Response.java b/s4-core/src/main/java/io/s4/message/Response.java
deleted file mode 100644
index 045a0fd..0000000
--- a/s4-core/src/main/java/io/s4/message/Response.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.message;
-
-import io.s4.dispatcher.partitioner.CompoundKeyInfo;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-public class Response {
-
-    private Map<String, Object> result;
-
-    private Map<String, String> exception;
-
-    private Request request;
-
-    public Response(Map<String, Object> result, Request request) {
-        this.result = result;
-        this.request = request;
-    }
-
-    public Response(Map<String, Object> result, Map<String, String> exception,
-            Request request) {
-        this.result = result;
-        this.exception = exception;
-        this.request = request;
-    }
-
-    public Response() {
-        result = null;
-        exception = null;
-        request = null;
-    }
-
-    /**
-     * Result of a request.
-     * 
-     * @return map from query strings to corresponding values.
-     */
-    public Map<String, Object> getResult() {
-        return result;
-    }
-
-    public Map<String, String> getException() {
-        return exception;
-    }
-
-    public Request getRequest() {
-        return request;
-    }
-
-    public Request.RInfo getRInfo() {
-        return (request != null ? request.getRInfo() : null);
-    }
-
-    public String toString() {
-        return "[" + result + "] (" + request + ")";
-    }
-
-    public List<CompoundKeyInfo> partition(int partCount) {
-        // partition id is available from the request info object
-
-        int p = this.getRInfo().getPartition();
-        List<CompoundKeyInfo> partitionInfoList = null;
-
-        if (p >= 0 && p < partCount) {
-            CompoundKeyInfo partitionInfo = new CompoundKeyInfo();
-            partitionInfo.setPartitionId(p);
-
-            partitionInfoList = new ArrayList<CompoundKeyInfo>();
-            partitionInfoList.add(partitionInfo);
-        }
-
-        return partitionInfoList;
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/message/SinglePERequest.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/message/SinglePERequest.java b/s4-core/src/main/java/io/s4/message/SinglePERequest.java
deleted file mode 100644
index d3f7fed..0000000
--- a/s4-core/src/main/java/io/s4/message/SinglePERequest.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.message;
-
-import io.s4.dispatcher.partitioner.CompoundKeyInfo;
-import io.s4.dispatcher.partitioner.Hasher;
-import io.s4.dispatcher.partitioner.KeyInfo;
-import io.s4.processor.AbstractPE;
-import io.s4.util.MethodInvoker;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-
-import org.springframework.util.StringUtils;
-
-/**
- * A request for a value from a particular PE.
- */
-public class SinglePERequest extends Request {
-
-    private final List<String> target;
-
-    private final List<String> query;
-
-    public SinglePERequest(List<String> target, List<String> query, RInfo info) {
-        this.target = target;
-        this.query = query;
-        this.rinfo = info;
-    }
-
-    public SinglePERequest(List<String> target, List<String> query) {
-        this.target = target;
-        this.query = query;
-        this.rinfo = null;
-    }
-
-    public SinglePERequest() {
-        this.target = Collections.<String> emptyList();
-        this.query = Collections.<String> emptyList();
-        this.rinfo = null;
-    }
-
-    public String toString() {
-        return "target:" + target + " query:" + query + " info:" + rinfo;
-    }
-
-    /**
-     * Fields used to target a particular PE.
-     * 
-     * @return list of targeting values. Order matters.
-     */
-    public List<String> getTarget() {
-        return target;
-    }
-
-    /**
-     * List of field names that have to be read from target PE.
-     * 
-     * @return list of field name strings.
-     */
-    public List<String> getQuery() {
-        return query;
-    }
-
-    /**
-     * Evaluate Request on a particular PE.
-     * 
-     * @param pe
-     * @return Response object.
-     */
-    public Response evaluate(AbstractPE pe) {
-
-        HashMap<String, Object> results = new HashMap<String, Object>();
-        HashMap<String, String> exceptions = new HashMap<String, String>();
-
-        for (String q : query) {
-            // requests for getters should be of the form $field. Responds with
-            // pe.getField()
-            if (q.startsWith("$")) {
-                try {
-                    Object res = MethodInvoker.invokeGetter(pe, q.substring(1));
-                    results.put(q, res);
-
-                } catch (Exception e) {
-                    exceptions.put(q, e.toString());
-                }
-            }
-        }
-
-        return new Response(results, exceptions, this);
-    }
-
-    public List<CompoundKeyInfo> partition(Hasher h, String delim, int partCount) {
-        List<String> valueList = this.getTarget();
-        if (valueList == null)
-            return null;
-
-        // First, build the key
-        KeyInfo keyInfo = new KeyInfo();
-        // special key name to denote request
-        keyInfo.addElementToPath("#req");
-
-        // for value, concatenate list of values from Request's target field.
-        String stringValue = StringUtils.collectionToDelimitedString(valueList,
-                                                                     delim);
-        keyInfo.setValue(stringValue);
-
-        // partition id is derived from string value, as usual
-        int partitionId = (int) (h.hash(stringValue) % partCount);
-
-        CompoundKeyInfo partitionInfo = new CompoundKeyInfo();
-        partitionInfo.addKeyInfo(keyInfo);
-        partitionInfo.setCompoundValue(stringValue);
-        partitionInfo.setPartitionId(partitionId);
-
-        List<CompoundKeyInfo> partitionInfoList = new ArrayList<CompoundKeyInfo>();
-        partitionInfoList.add(partitionInfo);
-
-        return partitionInfoList;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/persist/ConMapPersister.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/persist/ConMapPersister.java b/s4-core/src/main/java/io/s4/persist/ConMapPersister.java
deleted file mode 100644
index 4aa85ed..0000000
--- a/s4-core/src/main/java/io/s4/persist/ConMapPersister.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.persist;
-
-import io.s4.util.clock.Clock;
-
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.log4j.Logger;
-
-public class ConMapPersister implements Persister {
-    private AtomicInteger persistCount = new AtomicInteger(0);
-    private boolean selfClean = false;
-    private int cleanWaitTime = 40; // 40 seconds by default
-    private String loggerName = "s4";
-    ConcurrentHashMap<String, CacheEntry> cache;
-    Clock s4Clock;
-
-    private int startCapacity = 5000;
-
-    public void setStartCapacity(int startCapacity) {
-        this.startCapacity = startCapacity;
-    }
-
-    public int getStartCapacity() {
-        return startCapacity;
-    }
-
-    public void setSelfClean(boolean selfClean) {
-        this.selfClean = selfClean;
-    }
-
-    public void setCleanWaitTime(int cleanWaitTime) {
-        this.cleanWaitTime = cleanWaitTime;
-    }
-
-    public void setLoggerName(String loggerName) {
-        this.loggerName = loggerName;
-    }
-
-    public ConMapPersister(Clock s4Clock) {
-        this.s4Clock = s4Clock;
-    }
-    
-    public void setS4Clock(Clock s4Clock) {
-        this.s4Clock = s4Clock;
-    }
-    
-    public ConMapPersister() {
-    }
-
-    public void init() {
-        cache = new ConcurrentHashMap<String, CacheEntry>(this.getStartCapacity());
-
-        if (selfClean) {
-            Runnable r = new Runnable() {
-                public void run() {
-                    while (!Thread.interrupted()) {
-                        int cleanCount = ConMapPersister.this.cleanOutGarbage();
-                        Logger.getLogger(loggerName).info("Cleaned out "
-                                + cleanCount + " entries; Persister has "
-                                + cache.size() + " entries");
-                        try {
-                            Thread.sleep(cleanWaitTime * 1000);
-                        } catch (InterruptedException ie) {
-                            Thread.currentThread().interrupt();
-                        }
-                    }
-                }
-            };
-            Thread t = new Thread(r);
-            t.start();
-            t.setPriority(Thread.MIN_PRIORITY);
-        }
-    }
-
-    public int getQueueSize() {
-        return 0;
-    }
-
-    public int getPersistCount() {
-        return persistCount.get();
-    }
-
-    public int getCacheEntryCount() {
-        return cache.size();
-    }
-
-    public void setAsynch(String key, Object value, int period) {
-        // there really is no asynch for the local cache
-        set(key, value, period);
-    }
-
-    public void set(String key, Object value, int period) {
-        if (value == null) {
-            cache.remove(key);
-            return;
-        }
-        persistCount.getAndIncrement();
-        CacheEntry ce = new CacheEntry();
-        ce.value = value;
-        ce.period = period;
-        ce.addTime = s4Clock.getCurrentTime();
-        cache.put(key, ce);
-    }
-
-    public Object get(String key) {
-        CacheEntry ce = cache.get(key);
-        if (ce == null) {
-            return null;
-        }
-
-        if (ce.isExpired()) {
-            return null;
-        }
-
-        return ce.value;
-    }
-
-    public Map<String, Object> getBulk(String[] keys) {
-        HashMap map = new HashMap<String, Object>();
-        for (String key : keys) {
-            Object value = get(key);
-            if (value != null) {
-                map.put(key, value);
-            }
-        }
-        return map;
-    }
-
-    public Object getObject(String key) {
-        return get(key);
-    }
-
-    public Map<String, Object> getBulkObjects(String[] keys) {
-        return getBulk(keys);
-    }
-
-    public void remove(String key) {
-        cache.remove(key);
-    }
-
-    public int cleanOutGarbage() {
-        int count = 0;
-        for (Enumeration en = cache.keys(); en.hasMoreElements();) {
-            String key = (String) en.nextElement();
-            CacheEntry ce = cache.get(key);
-            if (ce != null && ce.isExpired()) {
-                count++;
-                cache.remove(key);
-            }
-        }
-        return count;
-    }
-
-    public Set<String> keySet() {
-        return cache.keySet();
-    }
-
-    public class CacheEntry {
-        Object value;
-        long addTime;
-        int period;
-
-        public boolean isExpired() {
-            if (period > 0) {
-                if ((addTime + (1000 * (long) period)) <= s4Clock.getCurrentTime()) {
-                    return true;
-                }
-            }
-            return false;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/persist/DumpingPersister.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/persist/DumpingPersister.java b/s4-core/src/main/java/io/s4/persist/DumpingPersister.java
deleted file mode 100644
index 8737f08..0000000
--- a/s4-core/src/main/java/io/s4/persist/DumpingPersister.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.persist;
-
-import io.s4.processor.OutputFormatter;
-import io.s4.util.clock.Clock;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.log4j.Logger;
-
-public class DumpingPersister extends ConMapPersister implements Runnable {
-    
-    public DumpingPersister() {       
-    }
-    
-    public DumpingPersister(Clock s4Clock) {
-        super(s4Clock);
-        // TODO Auto-generated constructor stub
-    }
-
-    private String dumpFilePrefix;
-    private Map<String, OutputFormatter> regexFormatter;
-    private Pattern[] patterns;
-    private OutputFormatter[] formatters;
-    private long outputTimeBoundary;
-
-    public void setDumpFilePrefix(String dumpFilePrefix) {
-        this.dumpFilePrefix = dumpFilePrefix;
-    }
-
-    public void setRegexFormatter(Map<String, OutputFormatter> regexFormatter) {
-        this.regexFormatter = regexFormatter;
-    }
-
-    public void setOutputTimeBoundary(long outputTimeBoundary) {
-        this.outputTimeBoundary = outputTimeBoundary;
-    }
-
-    public void init() {
-        super.init();
-
-        Set<String> regexes = regexFormatter.keySet();
-        patterns = new Pattern[regexes.size()];
-        formatters = new OutputFormatter[regexes.size()];
-
-        int i = 0;
-        for (String regex : regexes) {
-            patterns[i] = Pattern.compile(regex);
-            formatters[i] = regexFormatter.get(regex);
-            i++;
-        }
-
-        Thread t = new Thread(this);
-        t.start();
-    }
-
-    public void run() {
-        long boundaryInMillis = outputTimeBoundary * 1000;
-        long currentTime = System.currentTimeMillis();
-        while (!Thread.interrupted()) {
-            long currentBoundary = (currentTime / boundaryInMillis)
-                    * boundaryInMillis;
-            long interval = ((currentBoundary + boundaryInMillis) - System.currentTimeMillis());
-            if (interval > 0) {
-                try {
-                    Thread.sleep(interval);
-                } catch (InterruptedException ie) {
-                    Thread.currentThread().interrupt();
-                }
-            }
-
-            currentTime = System.currentTimeMillis();
-
-            try {
-                output();
-            } catch (Exception e) {
-                Logger.getLogger("s4").error("Exception dumping persister", e);
-            }
-        }
-    }
-
-    public void output() {
-        File file = new File(dumpFilePrefix + UUID.randomUUID().toString());
-        Logger.getLogger("s4").info("Dumping to " + file);
-        FileOutputStream fos = null;
-        OutputStreamWriter osw = null;
-        BufferedWriter dumpWriter = null;
-
-        try {
-            fos = new FileOutputStream(file);
-            osw = new OutputStreamWriter(fos);
-            dumpWriter = new BufferedWriter(osw);
-        } catch (IOException ioe) {
-            throw new RuntimeException(ioe);
-        }
-
-        int savedPriority = Thread.currentThread().getPriority();
-        try {
-
-            Thread.currentThread().setPriority(Thread.MIN_PRIORITY);
-            Set<String> keys = new HashSet<String>();
-            for (String key : this.keySet()) {
-                keys.add(key);
-            }
-
-            for (String key : keys) {
-                Object value = this.get(key);
-                if (value == null) {
-                    continue;
-                }
-                for (int patternId = 0; patternId < patterns.length; patternId++) {
-                    Matcher m = patterns[patternId].matcher(key);
-                    if (m.matches()) {
-                        Object formattedValue = formatters[patternId].format(value);
-                        dumpWriter.write(key + " = " + formattedValue + "\n");
-                    }
-                }
-
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        } finally {
-            Thread.currentThread().setPriority(savedPriority);
-            try {
-                dumpWriter.close();
-            } catch (Exception e) {
-            }
-            try {
-                osw.close();
-            } catch (Exception e) {
-            }
-            try {
-                fos.close();
-            } catch (Exception e) {
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/persist/HashMapPersister.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/persist/HashMapPersister.java b/s4-core/src/main/java/io/s4/persist/HashMapPersister.java
deleted file mode 100644
index 483feb1..0000000
--- a/s4-core/src/main/java/io/s4/persist/HashMapPersister.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.persist;
-
-import io.s4.util.clock.Clock;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.log4j.Logger;
-
-public class HashMapPersister implements Persister {
-    private volatile int persistCount = 0;
-    private boolean selfClean = false;
-    private int cleanWaitTime = 40; // 40 seconds by default
-    private String loggerName = "s4";
-    Map<String, CacheEntry> cache;
-    Clock s4Clock;
-
-    private int startCapacity = 5000;
-
-    public void setStartCapacity(int startCapacity) {
-        this.startCapacity = startCapacity;
-    }
-
-    public int getStartCapacity() {
-        return startCapacity;
-    }
-
-    public void setSelfClean(boolean selfClean) {
-        this.selfClean = selfClean;
-    }
-
-    public void setCleanWaitTime(int cleanWaitTime) {
-        this.cleanWaitTime = cleanWaitTime;
-    }
-
-    public void setLoggerName(String loggerName) {
-        this.loggerName = loggerName;
-    }
-
-    public HashMapPersister(Clock s4Clock) {
-        this.s4Clock = s4Clock;
-    }
-    
-    public void setS4Clock(Clock s4Clock) {
-        this.s4Clock = s4Clock;
-    }
-
-    public void init() {
-        cache = Collections.synchronizedMap(new HashMap<String, CacheEntry>(this.getStartCapacity()));
-
-        if (selfClean) {
-            Runnable r = new Runnable() {
-                public void run() {
-                    while (!Thread.interrupted()) {
-                        int cleanCount = HashMapPersister.this.cleanOutGarbage();
-                        Logger.getLogger(loggerName).info("Cleaned out "
-                                + cleanCount + " entries; Persister has "
-                                + cache.size() + " entries");
-                        try {
-                            Thread.sleep(cleanWaitTime * 1000);
-                        } catch (InterruptedException ie) {
-                            Thread.currentThread().interrupt();
-                        }
-                    }
-                }
-            };
-            Thread t = new Thread(r);
-            t.start();
-            t.setPriority(Thread.MIN_PRIORITY);
-        }
-    }
-
-    public int getQueueSize() {
-        return 0;
-    }
-
-    public int getPersistCount() {
-        return persistCount;
-    }
-
-    public int getCacheEntryCount() {
-        return cache.size();
-    }
-
-    public void setAsynch(String key, Object value, int period) {
-        // there really is no asynch for the local cache
-        set(key, value, period);
-    }
-
-    public void set(String key, Object value, int period) {
-        if (value == null) {
-            cache.remove(key);
-            return;
-        }
-        
-        synchronized (this) {
-            persistCount++;
-        }
-
-        CacheEntry ce = new CacheEntry();
-        ce.value = value;
-        ce.period = period;
-        ce.addTime = s4Clock.getCurrentTime();
-        cache.put(key, ce);
-    }
-
-    public Object get(String key) {
-        CacheEntry ce = cache.get(key);
-        if (ce == null) {
-            return null;
-        }
-
-        if (ce.isExpired()) {
-            return null;
-        }
-
-        return ce.value;
-    }
-
-    public Map<String, Object> getBulk(String[] keys) {
-        HashMap map = new HashMap<String, Object>();
-        for (String key : keys) {
-            Object value = get(key);
-            if (value != null) {
-                map.put(key, value);
-            }
-        }
-        return map;
-    }
-
-    public Object getObject(String key) {
-        return get(key);
-    }
-
-    public Map<String, Object> getBulkObjects(String[] keys) {
-        return getBulk(keys);
-    }
-
-    public void remove(String key) {
-        cache.remove(key);
-    }
-
-    public int cleanOutGarbage() {
-        int count = 0;
-        List<String> keyList;
-        synchronized (cache) {
-            keyList = new ArrayList<String>(cache.size());
-            for (String key : cache.keySet()) {
-                keyList.add(key);
-            }
-        }
-
-        for (String key : keyList) {
-            CacheEntry ce = cache.get(key);
-            if (ce != null && ce.isExpired()) {
-                count++;
-                cache.remove(key);
-            }
-        }
-        return count;
-    }
-
-    public Set<String> keySet() {
-        Set<String> keys = new HashSet<String>();
-        synchronized (cache) {
-            for (String key : cache.keySet()) {
-                keys.add(key);
-            }
-        }
-        return keys;
-    }
-
-    public class CacheEntry {
-        Object value;
-        long addTime;
-        int period;
-
-        public boolean isExpired() {
-            if (period > 0) {
-                if ((addTime + (1000 * (long) period)) <= s4Clock.getCurrentTime()) {
-                    return true;
-                }
-            }
-            return false;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/persist/Persister.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/persist/Persister.java b/s4-core/src/main/java/io/s4/persist/Persister.java
deleted file mode 100644
index 1fe534c..0000000
--- a/s4-core/src/main/java/io/s4/persist/Persister.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.persist;
-
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Defines an interface to a collection of key/value pairs, each of which has a
- * time-to-live property.
- **/
-public interface Persister {
-    /**
-     * Returns the number of <code>setAsynch</code>-initiated operations that
-     * are pending or in-progress.
-     * 
-     * @return the number of pending set operations
-     **/
-    int getQueueSize();
-
-    /**
-     * Returns the number of <code>set</code> or <code>setAsynch</code> calls
-     * made to this object.
-     * 
-     * @return the number of <code>set</code> or <code>setAsynch</code> calls
-     *         made to this object.
-     **/
-    int getPersistCount();
-
-    /**
-     * Returns the number of key/value pairs in this Persister object.
-     * <p>
-     * If the underlying technology (e.g., Memcached) does not support this
-     * operation, then the implementation may return 0 or throw a
-     * <code>java.lang.UnsupportedOperationException</code>.
-     * <p>
-     * This number may include expired entries, depending on the implementation.
-     * Some implementations clean up expired entries only when the value is
-     * requested or at certain time intervals (or not at all).
-     * 
-     * @return the number of entries in this Persister
-     **/
-    int getCacheEntryCount();
-
-    /**
-     * Set a value for a key without waiting for the operation to complete.
-     * <p>
-     * This method is useful in the case where the underlying implementation
-     * uses a network but the caller should not be delayed by the network
-     * operation.
-     * <p>
-     * Because the set operation is not necessarily complete when the method
-     * returns, a subsequent <code>get</code> call may return stale data.
-     * <p>
-     * <code>period</code> specifies the time-to-live for this key/value pair.
-     * For calls to <code>get</code> or <code>getObject</code> after the
-     * specified period, the Persister object will return null.
-     * 
-     * @param key
-     *            the key
-     * @param value
-     *            the value
-     * @param period
-     *            the maximum interval of time during which this key/value pair
-     *            is valid. Also known as time-to-live. -1 indicates an infinite
-     *            time-to-live.
-     **/
-    void setAsynch(String key, Object value, int period);
-
-    /**
-     * Set a value for a key.
-     * <p>
-     * <code>period</code> specifies the time-to-live for this key/value pair.
-     * For calls to <code>get</code> or <code>getObject</code> after the
-     * specified period, the Persister object will return null.
-     * 
-     * @param key
-     *            the key
-     * @param value
-     *            the value
-     * @param period
-     *            the maximum interval of time during which this key/value pair
-     *            is valid. Also known as time-to-live. -1 indicates an infinite
-     *            time-to-live.
-     **/
-    void set(String key, Object value, int period) throws InterruptedException;
-
-    /**
-     * Get the value associated with a specified key.
-     * 
-     * If the period (aka time-to-live) for this key/value pair is expired, the
-     * <code>get</code> will return null.
-     * 
-     * @param key
-     *            the key
-     **/
-    Object get(String key) throws InterruptedException;
-
-    /**
-     * Get the values associated with a list of keys.
-     * <p>
-     * <code>getBulk</code> returns a <code>Map</code> containing an entry for
-     * each key/value pair. The map contains an entry for a specified key only
-     * if that key exists in the Persister and the associated key/value pair is
-     * not yet expired.
-     * 
-     * @param keys
-     *            a list of keys
-     **/
-    Map<String, Object> getBulk(String[] keys) throws InterruptedException;
-
-    /**
-     * This is a method to help support some implementations whose underlying
-     * technology encodes the value associated with a key. In some cases, the
-     * value may have been stored by a client that uses a form of encoding not
-     * supported by the Persister's implementation. In that case, one might want
-     * the raw, yet-to-be-decoded value associated with the key.
-     * <p>
-     * <code>getObject</code> retrieves the raw, unencoded value associated with
-     * a key. In all other respects, it's the same as {@link Persister#get}.
-     * <p>
-     * It's likely you will never need to call this method.
-     * 
-     * If the period (aka time-to-live) for this key/value pair is expired, the
-     * <code>get</code> will return null.
-     **/
-    Object getObject(String key) throws InterruptedException;
-
-    /**
-     * As with {@link Persister#getObject}, this is a method to help support
-     * some implementations whose underlying technology encodes result.
-     * <p>
-     * It's likely you will never need to call this method.
-     * <p>
-     * In all other respects, it is the same as {@link Persister#getBulk}.
-     **/
-    Map<String, Object> getBulkObjects(String[] keys)
-            throws InterruptedException;
-
-    /**
-     * Removes the entry for the specified key from the Persister object.
-     * 
-     * @param key
-     *            the key of the entry to be removed
-     **/
-    void remove(String key) throws InterruptedException;
-
-    /**
-     * Forces the initiation of the process that cleans up expired entries.
-     * <p>
-     * Normally, you would not call this method. The implementation takes care
-     * of the timing of the cleanup process.
-     * <p>
-     * In some implementations, this method may be a no-op.
-     * 
-     **/
-    int cleanOutGarbage() throws InterruptedException;
-
-    /**
-     * Returns a <code>Set</code> of the keys contained in this Persister
-     * object.
-     * <p>
-     * The <code>Set</code> may contain keys for expired entries, depending on
-     * how often the underlying implementation cleans up expired entries.
-     * <p>
-     * The underlying technology may not support this operation. As a result,
-     * some implementations return an empty set.
-     * 
-     * @return a <code>Set</code> of the keys contained in this Persister
-     *         object.
-     **/
-    Set<String> keySet();
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/processor/AbstractPE.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/processor/AbstractPE.java b/s4-core/src/main/java/io/s4/processor/AbstractPE.java
deleted file mode 100644
index ff6f17c..0000000
--- a/s4-core/src/main/java/io/s4/processor/AbstractPE.java
+++ /dev/null
@@ -1,778 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *            http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.processor;
-
-import io.s4.dispatcher.partitioner.CompoundKeyInfo;
-import io.s4.dispatcher.partitioner.KeyInfo;
-import io.s4.dispatcher.partitioner.KeyInfo.KeyPathElement;
-import io.s4.dispatcher.partitioner.KeyInfo.KeyPathElementIndex;
-import io.s4.dispatcher.partitioner.KeyInfo.KeyPathElementName;
-import io.s4.ft.InitiateCheckpointingEvent;
-import io.s4.ft.RecoveryEvent;
-import io.s4.ft.SafeKeeper;
-import io.s4.ft.SafeKeeperId;
-import io.s4.persist.Persister;
-import io.s4.schema.Schema;
-import io.s4.schema.Schema.Property;
-import io.s4.schema.SchemaContainer;
-import io.s4.util.clock.Clock;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.StringTokenizer;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.log4j.Logger;
-
-/**
- * This is the base class for processor classes.
- * <p>
- * <code>AbstractProcessor</code> provides output frequency strategies that
- * allow you to configure the rate at which your processor produces output (see
- * {@link AbstractPE#setOutputFrequencyByEventCount} and
- * {@link AbstractPE#setOutputFrequencyByTimeBoundary}.
- */
-public abstract class AbstractPE implements Cloneable {
-    public static enum FrequencyType {
-        TIMEBOUNDARY("timeboundary"), EVENTCOUNT("eventcount");
-
-        private String name;
-
-        FrequencyType(String name) {
-            this.name = name;
-        }
-
-        public String getName() {
-            return this.name;
-        }
-    }
-
-    public static enum PeriodicInvokerType {
-        OUTPUT, CHECKPOINTING;
-
-        public String getName() {
-            if (OUTPUT == this) {
-                return "PeriodicOutputInvoker";
-            } else {
-                return "PeriodicCheckpointingInvoker";
-            }
-        }
-    }
-
-    transient private Clock clock;
-    // FIXME replaces monitor wait on AbstractPE, for triggering possible extra
-    // thread when checkpointing activated
-    transient private CountDownLatch s4ClockSetSignal = new CountDownLatch(1);
-    transient private int outputFrequency = 1;
-    transient private FrequencyType outputFrequencyType = FrequencyType.EVENTCOUNT;
-    transient private int outputFrequencyOffset = 0;
-    transient private int eventCount = 0;
-    transient private int ttl = -1;
-    transient private Persister lookupTable;
-    transient private List<EventAdvice> eventAdviceList = new ArrayList<EventAdvice>();
-    transient private List<Object> keyValue;
-    transient private List<Object> keyRecord;
-    private String keyValueString;
-    transient private String streamName;
-    transient private boolean saveKeyRecord = false;
-    transient private int outputsBeforePause = -1;
-    transient private long pauseTimeInMillis;
-    transient private boolean logPauses = false;
-    private String id;
-    transient protected SchemaContainer schemaContainer = new SchemaContainer();
-    transient private PrototypeWrapper prototypeWrapper;
-
-    transient private boolean recoveryAttempted = false;
-    // true if state may have changed
-    transient private boolean checkpointable = false;
-    // use a flag for identifying checkpointing events
-    transient private boolean isCheckpointingEvent = false;
-
-    transient private SafeKeeper safeKeeper; // handles fault tolerance
-    transient private CountDownLatch safeKeeperSetSignal = new CountDownLatch(1);
-    transient private int checkpointingFrequency = 0;
-    transient private FrequencyType checkpointingFrequencyType = FrequencyType.EVENTCOUNT;
-    transient private int checkpointingFrequencyOffset = 0;
-    transient private int checkpointableEventCount = 0;
-    transient private int checkpointsBeforePause = -1;
-    transient private long checkpointingPauseTimeInMillis;
-
-    transient private OverloadDispatcher overloadDispatcher;
-
-    public void setSaveKeyRecord(boolean saveKeyRecord) {
-        this.saveKeyRecord = saveKeyRecord;
-    }
-
-    public void setOutputsBeforePause(int outputsBeforePause) {
-        this.outputsBeforePause = outputsBeforePause;
-    }
-
-    public void setCheckpointsBeforePause(int checkpointsBeforePause) {
-        this.checkpointsBeforePause = checkpointsBeforePause;
-    }
-
-    public void setPauseTimeInMillis(long pauseTimeInMillis) {
-        this.pauseTimeInMillis = pauseTimeInMillis;
-    }
-
-    public void setCheckpointingPauseTimeInMillis(long checkpointingPauseTimeInMillis) {
-        this.checkpointingPauseTimeInMillis = checkpointingPauseTimeInMillis;
-    }
-
-    public void setLogPauses(boolean logPauses) {
-        this.logPauses = logPauses;
-    }
-
-    public String getId() {
-        return id;
-    }
-
-    public void setId(String id) {
-        this.id = id;
-    }
-
-    public void setClock(Clock clock) {
-        this.clock = clock;
-        if (this.clock != null) {
-            this.s4ClockSetSignal.countDown();
-        }
-    }
-
-    /**
-     * This method will be called after the object is cloned from the prototype
-     * PE. The concrete PE class should override this if it has any special
-     * set-up requirements.
-     */
-    public void initInstance() {
-        // default implementation does nothing.
-    }
-
-    public Clock getClock() {
-        return clock;
-    }
-
-    public void setPrototypeWrapper(PrototypeWrapper prototypeWrapper) {
-        this.prototypeWrapper = prototypeWrapper;
-    }
-
-    public AbstractPE() {
-        OverloadDispatcherGenerator oldg = new OverloadDispatcherGenerator(this.getClass());
-        Class<?> overloadDispatcherClass = oldg.generate();
-
-        try {
-            overloadDispatcher = (OverloadDispatcher) overloadDispatcherClass.newInstance();
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    /**
-     * You should not override this method. Instead, you need to implement the
-     * <code>processEvent</code> method.
-     **/
-    public void execute(String streamName, CompoundKeyInfo compoundKeyInfo, Object event) {
-        // if this is the first time through, get the key for this PE
-        if (keyValue == null || saveKeyRecord) {
-            setKeyValue(event, compoundKeyInfo);
-
-            if (compoundKeyInfo != null)
-                keyValueString = compoundKeyInfo.getCompoundValue();
-        }
-
-        this.streamName = streamName;
-
-        if (safeKeeper != null) {
-            // initialize checkpointing event flag
-            this.isCheckpointingEvent = false;
-            if (!recoveryAttempted) {
-                recover();
-                recoveryAttempted = true;
-            }
-        }
-
-        overloadDispatcher.dispatch(this, event);
-
-        if (saveKeyRecord) {
-            keyRecord.clear(); // the PE doesn't need it anymore
-        }
-
-        if (outputFrequencyType == FrequencyType.EVENTCOUNT && outputFrequency > 0 && !isCheckpointingEvent) {
-            eventCount++;
-            if (eventCount % outputFrequency == 0) {
-                try {
-                    output();
-                } catch (Exception e) {
-                    Logger.getLogger("s4").error("Exception calling output() method in execute()", e);
-                }
-            }
-        }
-
-        // do not take into account checkpointing/recovery trigger messages
-        if (!isCheckpointingEvent) {
-            checkpointable = true; // dirty flag
-            if (checkpointingFrequencyType == FrequencyType.EVENTCOUNT && checkpointingFrequency > 0) {
-                checkpointableEventCount++;
-                if (checkpointableEventCount % checkpointingFrequency == 0) {
-                    // for count-based frequency, we directly checkpoint here
-                    checkpoint();
-                }
-            }
-
-        }
-    }
-
-    public long getCurrentTime() {
-        return clock.getCurrentTime();
-    }
-
-    /**
-     * This method returns the key value associated with this PE.
-     * <p>
-     * The key value is a list because the key may be a compound (composite)
-     * key, in which case the key will have one value for each simple key.
-     * 
-     * @return the key value as a List of Objects (each element contains the
-     *         value of a simple key).
-     **/
-    public List<Object> getKeyValue() {
-        return keyValue;
-    }
-
-    public List<Object> getKeyRecord() {
-        return keyRecord;
-    }
-
-    public String getKeyValueString() {
-        return keyValueString;
-    }
-
-    public String getStreamName() {
-        return streamName;
-    }
-
-    private void setKeyValue(Object event, CompoundKeyInfo compoundKeyInfo) {
-        if (compoundKeyInfo == null) {
-            return;
-        }
-
-        keyValue = new ArrayList<Object>();
-
-        Schema schema = schemaContainer.getSchema(event.getClass());
-
-        // get the value for each keyInfo
-        for (KeyInfo keyInfo : compoundKeyInfo.getKeyInfoList()) {
-            Object value = null;
-            Object record = event;
-            List<?> list = null;
-            Property property = null;
-            for (KeyPathElement keyPathElement : keyInfo.getKeyPath()) {
-                if (keyPathElement instanceof KeyPathElementIndex) {
-                    record = list.get(((KeyPathElementIndex) keyPathElement).getIndex());
-                    schema = property.getComponentProperty().getSchema();
-                } else {
-                    String keyPathElementName = ((KeyPathElementName) keyPathElement).getKeyName();
-                    property = schema.getProperties().get(keyPathElementName);
-                    value = null;
-                    try {
-                        value = property.getGetterMethod().invoke(record);
-                    } catch (Exception e) {
-                        Logger.getLogger("s4").error(e);
-                        return;
-                    }
-
-                    if (value == null) {
-                        Logger.getLogger("s4").error("Value for " + keyPathElementName + " is null!");
-                        return;
-                    }
-
-                    if (property.getType().isPrimitive() || property.isNumber()
-                            || property.getType().equals(String.class)) {
-                        keyValue.add(value);
-                        if (saveKeyRecord) {
-                            if (keyRecord == null) {
-                                keyRecord = new ArrayList<Object>();
-                            }
-                            keyRecord.add(record);
-                        }
-                        continue;
-                    } else if (property.isList()) {
-                        try {
-                            list = (List) property.getGetterMethod().invoke(record);
-                        } catch (Exception e) {
-                            Logger.getLogger("s4").error(e);
-                            return;
-                        }
-                    } else {
-                        try {
-                            record = property.getGetterMethod().invoke(record);
-                        } catch (Exception e) {
-                            Logger.getLogger("s4").error(e);
-                            return;
-                        }
-                        schema = property.getSchema();
-                    }
-                }
-            }
-        }
-    }
-
-    /**
-     * This method sets the output strategy to "by event count" and specifies
-     * how many events trigger a call to the <code>output</code> method.
-     * <p>
-     * You would not normally call this method directly, but instead via the S4
-     * configuration file.
-     * <p>
-     * After this method is called, AbstractProcessor will call your
-     * <code>output</code> method (implemented in your subclass) every
-     * <emp>outputFrequency</emph> events.
-     * <p>
-     * If you call neither <code>setOutputFrequencyByEventCount</code> nor
-     * <code>setOutputFrequencyByTimeBoundary</code>, the default strategy is
-     * "by event count" with an output frequency of 1. (That is,
-     * <code>output</code> is called after after each return from
-     * <code>processEvent</code>).
-     * 
-     * @param outputFrequency
-     *            the number of application events passed to
-     *            <code>processEvent</code> before output is called.
-     **/
-    public void setOutputFrequencyByEventCount(int outputFrequency) {
-        this.outputFrequency = outputFrequency;
-        this.outputFrequencyType = FrequencyType.EVENTCOUNT;
-        initFrequency(PeriodicInvokerType.OUTPUT);
-    }
-
-    /**
-     * Sets the frequency strategy to "by event count". Uses the same mechanism
-     * than {@link #setOutputFrequencyByEventCount(int)}
-     * 
-     * @param checkpointingFrequency
-     *            the number of application events passed to
-     *            <code>processEvent</code> before output is called (ignoring
-     *            checkpointing events).
-     */
-    public void setCheckpointingFrequencyByEventCount(int checkpointingFrequency) {
-        this.checkpointingFrequency = checkpointingFrequency;
-        this.checkpointingFrequencyType = FrequencyType.EVENTCOUNT;
-        supplementAdviceForCheckpointingAndRecovery();
-    }
-
-    /**
-     * This method sets the output strategy to "output on time boundary" and
-     * specifies the time boundary on which the <code>output</code> should be
-     * called.
-     * <p>
-     * You would not normally call this method directly, but instead via the S4
-     * configuration file.
-     * <p>
-     * <code>outputFrequency</code> specifies the time boundary in seconds.
-     * Whenever the current time is a multiple of <code>outputFrequency</code>,
-     * <code>AbstractProcessor</code> will call your <code>output</code> method.
-     * For example, if you specify an <code>outputFrequency</code> of 3600,
-     * <code>AbstractProcessor</code> will call <code>output</code> on every
-     * hour boundary (e.g., 11:00:00, 12:00:00, 13:00:00, etc.).
-     * <p>
-     * When this output strategy is used, your <code>output</code> method may
-     * occasionally (or frequently) run concurrently with your
-     * <code>processEvent</code> method. Therefore, you should take steps to
-     * protect any data structures that both methods use.
-     * <p>
-     * If you call neither <code>setOutputFrequencyByEventCount</code> nor
-     * <code>setOutputFrequencyByTimeBoundary</code>, the default strategy is
-     * "by event count" with an output frequency of 1. (That is,
-     * <code>output</code> is called after after each return from
-     * <code>processEvent</code>).
-     * 
-     * @param outputFrequency
-     *            the time boundary in seconds
-     **/
-    public void setOutputFrequencyByTimeBoundary(int outputFrequency) {
-        this.outputFrequency = outputFrequency;
-        this.outputFrequencyType = FrequencyType.TIMEBOUNDARY;
-        initFrequency(PeriodicInvokerType.OUTPUT);
-    }
-
-    /**
-     * Sets the frequency of checkpointing. It uses the same mechanism than
-     * {@link #setOutputFrequencyByTimeBoundary(int)}
-     * 
-     * @param checkpointingFrequency
-     *            the time boundary in seconds
-     */
-    public void setCheckpointingFrequencyByTimeBoundary(int checkpointingFrequency) {
-        this.checkpointingFrequency = checkpointingFrequency;
-        this.checkpointingFrequencyType = FrequencyType.TIMEBOUNDARY;
-        supplementAdviceForCheckpointingAndRecovery();
-        initFrequency(PeriodicInvokerType.CHECKPOINTING);
-    }
-
-    /**
-     * Set the offset from the time boundary at which
-     * <code>AbstractProcessor</code> should call <code>output</code>.
-     * <p>
-     * This value is honored only if the "output on time boundary" output
-     * strategy is used.
-     * <p>
-     * As an example, if you specify an <code>outputFrequency</code> of 3600 and
-     * an <code>outputFrequencyOffset</code> of 7,
-     * <code>AbstractProcessor</code> will call <code>output</code> on every
-     * hour boundary plus 7 seconds (e.g., 11:00:07, 12:00:07, 13:00:07, etc.).
-     **/
-    public void setOutputFrequencyOffset(int outputFrequencyOffset) {
-        this.outputFrequencyOffset = outputFrequencyOffset;
-    }
-
-    /**
-     * Set the offset from the time boundary at which calls to checkpoint should
-     * be performed. It uses the same mechanism than
-     * {@link AbstractPE#setOutputFrequencyOffset(int)}
-     * 
-     * @param checkpointingFrequencyOffset
-     *            checkpointing frequency offset in seconds
-     */
-    public void setCheckpointingFrequencyOffset(int checkpointingFrequencyOffset) {
-        this.checkpointingFrequencyOffset = checkpointingFrequencyOffset;
-        supplementAdviceForCheckpointingAndRecovery();
-    }
-
-    public void setKeys(String[] keys) {
-        for (String key : keys) {
-            StringTokenizer st = new StringTokenizer(key);
-            eventAdviceList.add(new EventAdvice(st.nextToken(), st.nextToken()));
-        }
-        supplementAdviceForCheckpointingAndRecovery();
-    }
-
-    private void initFrequency(PeriodicInvokerType type) {
-        Runnable r = null;
-        if (PeriodicInvokerType.OUTPUT.equals(type)) {
-            if (outputFrequency < 0) {
-                return;
-            }
-
-            if (outputFrequencyType == FrequencyType.TIMEBOUNDARY) {
-                // create a thread that calls output on time boundaries
-                // that are multiples of frequency
-                r = new PeriodicInvoker(type);
-
-            }
-        } else {
-            if (checkpointingFrequency < 0) {
-                return;
-            }
-            if (checkpointingFrequencyType == FrequencyType.TIMEBOUNDARY) {
-                r = new PeriodicInvoker(type);
-            }
-        }
-        if (r != null) {
-            Thread t = new Thread(r, type.getName());
-            t.start();
-        }
-    }
-
-    /**
-     * This method exists simply to make <code>clone()</code> public.
-     */
-    public Object clone() {
-        try {
-            Object clone = super.clone();
-            return clone;
-        } catch (CloneNotSupportedException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public void setTtl(int ttl) {
-        this.ttl = ttl;
-    }
-
-    /**
-     * 
-     */
-    public int getTtl() {
-        return ttl;
-    }
-
-    public List<EventAdvice> advise() {
-        return eventAdviceList;
-    }
-
-    /**
-     * 
-     */
-    public void setLookupTable(Persister lookupTable) {
-        this.lookupTable = lookupTable;
-    }
-
-    /**
-     * You implement this abstract method in your subclass. This is the part of
-     * your processor that outputs data (e.g., by writing the data to the
-     * cache). The <code>output</code> method may further process the data
-     * (e.g., aggregate it) before outputting it.
-     **/
-    abstract public void output();
-
-    protected void checkpoint() {
-
-        byte[] serializedState = serializeState();
-        // NOTE: assumes pe id is keyvalue from the PE...
-        saveState(getSafeKeeperId(), serializedState);
-        // remove dirty flag
-        checkpointable = false;
-    }
-
-    private void saveState(SafeKeeperId key, byte[] serializedState) {
-        safeKeeper.saveState(key, serializedState);
-    }
-
-    protected void recover() {
-        byte[] serializedState = null;
-        try {
-            serializedState = safeKeeper.fetchSerializedState(getSafeKeeperId());
-        } catch (RuntimeException e) {
-            Logger.getLogger("s4-ft").error("Cannot fetch serialized stated for key [" + getSafeKeeperId().toString()+"]: "+e.getMessage(), e);
-        }
-        if (serializedState == null) {
-            return;
-        }
-        try {
-            AbstractPE peInOldState = deserializeState(serializedState);
-            restoreState(peInOldState);
-        } catch (RuntimeException e) {
-            Logger.getLogger("s4-ft").error("Cannot restore state for key [" + getSafeKeeperId().toString()+"]: "+e.getMessage(), e);
-        }
-    }
-
-    public SafeKeeperId getSafeKeeperId() {
-        return new SafeKeeperId(getId(), getKeyValueString());
-    }
-
-    public void setSafeKeeper(SafeKeeper safeKeeper) {
-        this.safeKeeper = safeKeeper;
-        if (safeKeeper != null) {
-            this.safeKeeperSetSignal.countDown();
-        }
-    }
-
-    public final void processEvent(InitiateCheckpointingEvent checkpointingEvent) {
-        isCheckpointingEvent = true;
-        if (isCheckpointable()) {
-            checkpoint();
-        }
-    }
-
-    protected boolean isCheckpointable() {
-        return checkpointable;
-    }
-
-    protected void setCheckpointable(boolean checkpointable) {
-        this.checkpointable = checkpointable;
-    }
-
-    public final void initiateCheckpoint() {
-        // enqueue checkpointing event
-        if (safeKeeper != null) {
-            safeKeeper.generateCheckpoint(this);
-        }
-    }
-
-    public byte[] serializeState() {
-        return safeKeeper.getSerializer().serialize(this);
-    }
-
-    public AbstractPE deserializeState(byte[] loadedState) {
-        return (AbstractPE) safeKeeper.getSerializer().deserialize(loadedState);
-    }
-
-    public void restoreState(AbstractPE oldState) {
-        restoreFieldsForClass(oldState.getClass(), oldState);
-    }
-
-    private void restoreFieldsForClass(Class currentInOldStateClassHierarchy, AbstractPE oldState) {
-        if (!AbstractPE.class.isAssignableFrom(currentInOldStateClassHierarchy)) {
-            return;
-        } else {
-            Field[] fields = oldState.getClass().getDeclaredFields();
-            for (Field field : fields) {
-                if (!Modifier.isTransient(field.getModifiers()) && !Modifier.isStatic(field.getModifiers())) {
-                    if (!Modifier.isPublic(field.getModifiers())) {
-                        field.setAccessible(true);
-                    }
-                    try {
-                        // TODO use reflectasm
-                        field.set(this, field.get(oldState));
-                    } catch (IllegalArgumentException e) {
-                        Logger.getLogger("s4-ft").error("Cannot recover old state for this PE [" + this + "]", e);
-                        return;
-                    } catch (IllegalAccessException e) {
-                        Logger.getLogger("s4-ft").error("Cannot recover old state for this PE [" + this + "]", e);
-                        return;
-                    }
-
-                }
-            }
-            restoreFieldsForClass(currentInOldStateClassHierarchy.getSuperclass(), oldState);
-        }
-    }
-
-    /**
-     * Subscribes this PE to the checkpointing stream
-     */
-    private void supplementAdviceForCheckpointingAndRecovery() {
-        // don't do anything until both conditions are true
-        Logger.getLogger("s4").info(
-                "Maybe adding for " + this.getId() + ": " + checkpointingFrequency + " and " + eventAdviceList.size());
-        if (checkpointingFrequency > 0 && eventAdviceList.size() > 0) {
-            eventAdviceList.add(new EventAdvice(this.getId() + "_checkpointing", "key"));
-        }
-    }
-
-    public void processEvent(RecoveryEvent recoveryEvent) {
-        isCheckpointingEvent = true;
-        recover();
-    }
-    
-    /**
-     * This method expires the current PE.
-     **/
-    protected void expire() {
-        this.prototypeWrapper.expire(this.keyValueString);
-    }
-
-    class PeriodicInvoker implements Runnable {
-
-        PeriodicInvokerType type;
-
-        public PeriodicInvoker(PeriodicInvokerType type) {
-            this.type = type;
-        }
-
-        public long getFrequencyInMillis() {
-            if (type.equals(PeriodicInvokerType.OUTPUT)) {
-                return outputFrequency * 1000;
-            } else {
-                return checkpointingFrequency * 1000;
-            }
-        }
-
-        public long getFrequencyOffset() {
-            if (type.equals(PeriodicInvokerType.OUTPUT)) {
-                return outputFrequencyOffset;
-            } else {
-                return checkpointingFrequencyOffset;
-            }
-        }
-
-        public void run() {
-            if (clock == null) {
-                try {
-                    s4ClockSetSignal.await();
-                } catch (InterruptedException e) {
-                }
-            }
-            if (PeriodicInvokerType.CHECKPOINTING.equals(type) && safeKeeper == null) {
-                try {
-                    safeKeeperSetSignal.await();
-                } catch (InterruptedException e) {
-                }
-            }
-
-            int outputCount = 0;
-            int checkpointCount = 0;
-            long frequencyInMillis = getFrequencyInMillis();
-
-            long currentTime = getCurrentTime();
-            while (!Thread.interrupted()) {
-                long currentBoundary = (currentTime / frequencyInMillis) * frequencyInMillis;
-                long nextBoundary = currentBoundary + frequencyInMillis;
-                currentTime = clock.waitForTime(nextBoundary + (outputFrequencyOffset * 1000));
-                if (lookupTable != null) {
-                    Set peKeys = lookupTable.keySet();
-                    for (Iterator it = peKeys.iterator(); it.hasNext();) {
-                        String peKey = (String) it.next();
-                        AbstractPE pe = null;
-                        try {
-                            pe = (AbstractPE) lookupTable.get(peKey);
-                        } catch (InterruptedException ie) {
-                        }
-
-                        if (pe == null) {
-                            continue;
-                        }
-
-                        if (PeriodicInvokerType.OUTPUT.equals(type)) {
-                            try {
-                                pe.output();
-                                outputCount++;
-                            } catch (Exception e) {
-                                Logger.getLogger("s4").error("Exception calling output() method", e);
-                            }
-
-                            if (outputCount == outputsBeforePause) {
-                                if (logPauses) {
-                                    Logger.getLogger("s4").info(
-                                            "Pausing " + getId() + " at count " + outputCount + " for "
-                                                    + pauseTimeInMillis + " milliseconds");
-                                }
-                                outputCount = 0;
-                                try {
-                                    Thread.sleep(pauseTimeInMillis);
-                                } catch (InterruptedException ie) {
-                                    Thread.currentThread().interrupt();
-                                }
-                            }
-                        } else if (PeriodicInvokerType.CHECKPOINTING.equals(type)) {
-                            try {
-                                if (pe.isCheckpointable()) {
-                                    pe.initiateCheckpoint();
-                                    checkpointCount++;
-                                }
-                            } catch (Exception e) {
-                                e.printStackTrace();
-                                Logger.getLogger("s4").error("Exception calling checkpoint() method", e);
-                            }
-
-                            if (checkpointCount == checkpointsBeforePause) {
-                                if (logPauses) {
-                                    Logger.getLogger("s4").info(
-                                            "Pausing " + getId() + " at checkpoint count " + checkpointCount + " for "
-                                                    + checkpointingPauseTimeInMillis + " milliseconds");
-                                }
-                                checkpointCount = 0;
-                                try {
-                                    Thread.sleep(checkpointingPauseTimeInMillis);
-                                } catch (InterruptedException ie) {
-                                    Thread.currentThread().interrupt();
-                                }
-                            }
-                        } // end for each pe in lookup table
-                    } // end if lookup table is not null
-                }
-            }
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/processor/AbstractWindowingPE.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/processor/AbstractWindowingPE.java b/s4-core/src/main/java/io/s4/processor/AbstractWindowingPE.java
deleted file mode 100644
index 3687ec8..0000000
--- a/s4-core/src/main/java/io/s4/processor/AbstractWindowingPE.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.processor;
-
-import io.s4.schema.Schema;
-import io.s4.schema.Schema.Property;
-import io.s4.util.SlotUtils;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.StringTokenizer;
-
-public abstract class AbstractWindowingPE extends AbstractPE {
-    private String slotClassName;
-    private int slotSize = 3600; // default one hour
-    private int windowSize = slotSize * 24; // default, 24 hours
-    private Map<String, String> timestampFields;
-
-    private long lastTimestamp = -1;
-    private Map<Long, Slot> slots;
-    private SlotUtils slotUtils;
-    private Class slotClass;
-
-    public void setSlotClassName(String slotClassName) {
-        this.slotClassName = slotClassName;
-    }
-
-    public void setSlotSize(int slotSize) {
-        this.slotSize = slotSize;
-    }
-
-    public int getSlotSize() {
-        return slotSize;
-    }
-
-    public void setWindowSize(int windowSize) {
-        this.windowSize = windowSize;
-    }
-    
-    public int getWindowSize() {
-        return windowSize;
-    }
-
-    public void setTimestampFields(String[] timestampFieldsArray) {
-        timestampFields = new HashMap<String, String>();
-        for (String timeStampFieldInfo : timestampFieldsArray) {
-            StringTokenizer st = new StringTokenizer(timeStampFieldInfo);
-            timestampFields.put(st.nextToken(), st.nextToken());
-        }
-    }
-
-    private OverloadDispatcherSlot overloadDispatcher;
-
-    public AbstractWindowingPE() {
-    }
-
-    public void init() {
-        // this reference will be shared amongst all instances of the pe
-        slotUtils = new SlotUtils(slotSize);
-
-        try {
-            slotClass = Class.forName(slotClassName);
-        } catch (ClassNotFoundException cnfe) {
-            throw new RuntimeException(cnfe);
-        }
-
-        OverloadDispatcherGenerator oldg = new OverloadDispatcherGenerator(slotClass,
-                                                                           true);
-        Class<?> overloadDispatcherClass = oldg.generate();
-
-        try {
-            overloadDispatcher = (OverloadDispatcherSlot) overloadDispatcherClass.newInstance();
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public void processEvent(Object event) {
-        long currentTime = getCurrentTime();
-        long maybeCurrentTime = -1;
-        if (timestampFields != null) {
-            Schema schema = schemaContainer.getSchema(event.getClass());
-            String fieldName = timestampFields.get(getStreamName());
-            if (fieldName != null) {
-                Property property = schema.getProperties().get(fieldName);
-                if (property != null
-                        && (property.getType().equals(Long.TYPE) || property.getType()
-                                                                            .equals(Long.class))) {
-                    try {
-                        maybeCurrentTime = (Long) property.getGetterMethod()
-                                                          .invoke(event);
-                    } catch (Exception e) {
-                        throw new RuntimeException(e);
-                    }
-                }
-            }
-        }
-
-        if (maybeCurrentTime > -1) {
-            currentTime = maybeCurrentTime;
-            lastTimestamp = currentTime;
-        }
-
-        long slotTime = slotUtils.getSlotAtTime(currentTime / 1000); // convert
-                                                                     // to
-                                                                     // seconds
-
-        if (slots == null) {
-            slots = Collections.synchronizedMap(new HashMap<Long, Slot>());
-        }
-
-        Slot slot = slots.get(slotTime);
-        if (slot == null) {
-            try {
-                slot = (Slot) slotClass.newInstance();
-            } catch (IllegalAccessException iae) {
-                throw new RuntimeException(iae);
-            } catch (InstantiationException ie) {
-                throw new RuntimeException(ie);
-            }
-            slots.put(slotTime, slot);
-        }
-
-        overloadDispatcher.dispatch(slot, event, slotTime, this);
-    }
-
-    public Map<Long, Slot> getSlots() {
-        pruneSlots(getCurrentTime() / 1000);
-
-        return Collections.unmodifiableMap(slots);
-    }
-
-    private void pruneSlots(long time) {
-        HashSet<Long> keys = new HashSet<Long>();
-
-        synchronized (slots) {
-            for (Long key : slots.keySet()) {
-                keys.add(key);
-            }
-        }
-
-        for (Long key : keys) {
-            if (slotUtils.isOutsideWindow(key, windowSize, time)) {
-                slots.remove(key);
-            }
-        }
-    }
-
-    public boolean isCurrentSlot(long slotTime) {
-        long currentSlot = slotUtils.getSlotAtTime(getCurrentTime() / 1000);
-        if (currentSlot == slotTime) {
-            return true;
-        }
-        return false;
-    }
-    
-    public boolean isOutsideWindow(long time) {
-        Long slotIndexAtTime = slotUtils.getSlotAtTime(time/1000);
-        return slotUtils.isOutsideWindow(slotIndexAtTime, windowSize, getCurrentTime()/1000);
-    }
-
-    public Long getSlotAtOffset(int offset) {
-        return slotUtils.getSlot(offset, getCurrentTime() / 1000);
-    }
-    
-    public Slot getSlotAtTime(long time) {
-        pruneSlots(getCurrentTime()/1000);
-        Long slotIndex = slotUtils.getSlotAtTime(time/1000);
-        return slots.get(slotIndex);
-    }
-    
-    public Long getSlotTimeForTime(long time) {
-        return slotUtils.getSlotAtTime(time/1000);
-    }
-
-    public static interface Slot {
-        // public void processEvent(Object event, long slotTime,
-        // AbstractWindowingPE pe);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/processor/AsynchronousEventProcessor.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/processor/AsynchronousEventProcessor.java b/s4-core/src/main/java/io/s4/processor/AsynchronousEventProcessor.java
deleted file mode 100644
index e032e15..0000000
--- a/s4-core/src/main/java/io/s4/processor/AsynchronousEventProcessor.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.processor;
-
-import io.s4.collector.EventWrapper;
-
-public interface AsynchronousEventProcessor {
-
-    void queueWork(EventWrapper eventWrapper);
-
-    // This will always be called by a different thread than the one executing
-    // run()
-    int getQueueSize();
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/processor/ControlEventProcessor.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/processor/ControlEventProcessor.java b/s4-core/src/main/java/io/s4/processor/ControlEventProcessor.java
deleted file mode 100644
index 0e2774c..0000000
--- a/s4-core/src/main/java/io/s4/processor/ControlEventProcessor.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.processor;
-
-import io.s4.collector.EventWrapper;
-import io.s4.dispatcher.EventDispatcher;
-import io.s4.dispatcher.partitioner.CompoundKeyInfo;
-import io.s4.message.PrototypeRequest;
-import io.s4.message.Response;
-import io.s4.message.SinglePERequest;
-
-import java.util.List;
-
-/**
- * Processes control events.
- */
-public class ControlEventProcessor {
-
-    private EventDispatcher dispatcher;
-
-    public void setDispatcher(EventDispatcher dispatcher) {
-        this.dispatcher = dispatcher;
-    }
-
-    public void process(EventWrapper e, PrototypeWrapper p) {
-        String en = e.getStreamName(); // e.g. "#joinPe01"
-        String pn = p.getId(); // e.g. "JoinPE01"
-
-        // stream name has to match PE's ID (modulo case).
-        // e.g. "#joinPe01" will match "JoinPE01"
-        if (!en.regionMatches(true, 1, pn, 0, pn.length()))
-            return;
-
-        execute(e, p);
-    }
-
-    protected void execute(EventWrapper e, PrototypeWrapper p) {
-        List<CompoundKeyInfo> keyInfoList = e.getCompoundKeys();
-        Object event = e.getEvent();
-        
-        if (event instanceof SinglePERequest) {
-            // Handle Requests to individual PEs
-            if (keyInfoList.isEmpty())
-                return;
-
-            CompoundKeyInfo keyInfo = keyInfoList.get(0);
-
-            String keyVal = keyInfo.getCompoundValue();
-
-            AbstractPE pe = p.lookupPE(keyVal);
-
-            Response response = ((SinglePERequest) event).evaluate(pe);
-            String stream = response.getRInfo().getStream();
-
-            dispatcher.dispatchEvent(stream, response);
-
-        } else if (event instanceof PrototypeRequest) {
-            // Or handle aggregate requests to Prototypes.
-            Response response = ((PrototypeRequest) event).evaluate(p);
-            String stream = response.getRInfo().getStream();
-
-            dispatcher.dispatchEvent(stream, response);
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/processor/EventAdvice.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/processor/EventAdvice.java b/s4-core/src/main/java/io/s4/processor/EventAdvice.java
deleted file mode 100644
index 18f6110..0000000
--- a/s4-core/src/main/java/io/s4/processor/EventAdvice.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.processor;
-
-public class EventAdvice {
-    String eventName;
-    String key;
-
-    public EventAdvice(String eventName, String key) {
-        this.eventName = eventName;
-        this.key = key;
-    }
-
-    public String toString() {
-        return eventName + ":{" + key + "}";
-    }
-
-    public String getKey() {
-        return key;
-    }
-
-    public String getEventName() {
-        return eventName;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/processor/JoinPE.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/processor/JoinPE.java b/s4-core/src/main/java/io/s4/processor/JoinPE.java
deleted file mode 100644
index e0e18b7..0000000
--- a/s4-core/src/main/java/io/s4/processor/JoinPE.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.processor;
-
-import io.s4.dispatcher.EventDispatcher;
-import io.s4.logger.Monitor;
-import io.s4.schema.Schema;
-import io.s4.schema.Schema.Property;
-import io.s4.schema.SchemaContainer;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.StringTokenizer;
-
-import org.apache.log4j.Logger;
-
-public class JoinPE extends AbstractPE {
-    private static Logger logger = Logger.getLogger(JoinPE.class);
-    private Map<String, List<String>> eventFields = new HashMap<String, List<String>>();
-    private Map<String, Object> eventsToJoin;
-    private EventDispatcher dispatcher;
-    private Monitor monitor;
-    private String outputStreamName;
-    private String outputClassName;
-    private Class<?> outputClass;
-
-    public void setDispatcher(EventDispatcher dispatcher) {
-        this.dispatcher = dispatcher;
-    }
-
-    public void setMonitor(Monitor monitor) {
-        this.monitor = monitor;
-    }
-
-    public void setOutputStreamName(String outputStreamName) {
-        this.outputStreamName = outputStreamName;
-    }
-
-    public String getOutputClassName() {
-        return outputClassName;
-    }
-
-    public void setOutputClassName(String outputClassName) {
-        this.outputClassName = outputClassName;
-        try {
-            this.outputClass = Class.forName(this.outputClassName);
-        } catch (ClassNotFoundException cnfe) {
-            throw new RuntimeException(cnfe);
-        }
-    }
-
-    public void setIncludeFields(String[] includeFields) {
-        for (String includeField : includeFields) {
-            StringTokenizer st = new StringTokenizer(includeField);
-            if (st.countTokens() != 2) {
-                Logger.getLogger("s4").error("Bad include field specified: "
-                        + includeField);
-                continue;
-            }
-
-            String eventName = st.nextToken();
-            String fieldName = st.nextToken();
-
-            List<String> fieldNames = eventFields.get(eventName);
-            if (fieldNames == null) {
-                fieldNames = new ArrayList<String>();
-                eventFields.put(eventName, fieldNames);
-            }
-
-            if (fieldName.equals("*")) {
-                fieldNames.clear();
-                fieldNames.add("*");
-            } else {
-                fieldNames.add(fieldName);
-            }
-        }
-    }
-
-    @Override
-    public void output() {
-        // TODO Auto-generated method stub
-
-    }
-
-    private SchemaContainer schemaContainer = new SchemaContainer();
-
-    public void processEvent(Object event) {
-        if (eventsToJoin == null) {
-            eventsToJoin = new HashMap<String, Object>();
-        }
-        List<String> fieldNames = eventFields.get(getStreamName());
-        if (fieldNames == null) {
-            return;
-        }
-
-        // we only use the last event that comes through on the given stream
-        eventsToJoin.put(getStreamName(), event);
-
-        if (eventsToJoin.keySet().size() == eventFields.keySet().size()) {
-            Object newEvent = null;
-            try {
-                newEvent = outputClass.newInstance();
-            } catch (Exception e) {
-                e.printStackTrace();
-                throw new RuntimeException(e);
-            }
-
-            Schema newEventSchema = schemaContainer.getSchema(newEvent.getClass());
-
-            for (String streamName : eventsToJoin.keySet()) {
-                Object partialEvent = eventsToJoin.get(streamName);
-                Schema partialEventSchema = schemaContainer.getSchema(partialEvent.getClass());
-
-                List<String> includeFields = eventFields.get(streamName);
-                if (includeFields.size() == 1
-                        && includeFields.get(0).equals("*")) {
-                    for (Property partialEventProperty : partialEventSchema.getProperties()
-                                                                           .values()) {
-                        copyField(partialEventProperty.getName(),
-                                  partialEventSchema,
-                                  newEventSchema,
-                                  partialEvent,
-                                  newEvent);
-                    }
-                } else {
-                    for (String includeField : includeFields) {
-                        copyField(includeField,
-                                  partialEventSchema,
-                                  newEventSchema,
-                                  partialEvent,
-                                  newEvent);
-                    }
-                }
-            }
-
-            dispatcher.dispatchEvent(outputStreamName, newEvent);
-            if (logger.isDebugEnabled()) {
-                logger.debug("STEP 7 (JoinPE): " + newEvent.toString());
-            }
-        }
-    }
-
-    private void copyField(String propertyName, Schema sourceSchema,
-                           Schema targetSchema, Object source, Object target) {
-        Property sourceProperty = sourceSchema.getProperties()
-                                              .get(propertyName);
-        Property targetProperty = targetSchema.getProperties()
-                                              .get(propertyName);
-
-        if (sourceProperty == null || targetProperty == null
-                || !sourceProperty.getType().equals(targetProperty.getType())) {
-            throw new RuntimeException("Specified property " + propertyName
-                    + " doesn't exist or is not consistent");
-        }
-
-        try {
-            Object sourceValue = sourceProperty.getGetterMethod()
-                                               .invoke(source);
-            if (sourceValue == null) {
-                return;
-            }
-            if (sourceProperty.getType().isPrimitive()) {
-                if (sourceValue instanceof Number) {
-                    if (((Number) sourceValue).doubleValue() == 0.0) {
-                        return;
-                    }
-                }
-                if (sourceValue instanceof Boolean) {
-                    if (((Boolean) sourceValue).equals(Boolean.FALSE)) {
-                        return;
-                    }
-                }
-            }
-            targetProperty.getSetterMethod().invoke(target, sourceValue);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/processor/OutputFormatter.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/processor/OutputFormatter.java b/s4-core/src/main/java/io/s4/processor/OutputFormatter.java
deleted file mode 100644
index e00d9bf..0000000
--- a/s4-core/src/main/java/io/s4/processor/OutputFormatter.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.processor;
-
-public interface OutputFormatter {
-    public Object format(Object outputValue);
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/processor/OverloadDispatcher.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/processor/OverloadDispatcher.java b/s4-core/src/main/java/io/s4/processor/OverloadDispatcher.java
deleted file mode 100644
index b102b34..0000000
--- a/s4-core/src/main/java/io/s4/processor/OverloadDispatcher.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.processor;
-
-public interface OverloadDispatcher {
-    public void dispatch(Object pe, Object event);
-}


Mime
View raw message