incubator-s4-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mmo...@apache.org
Subject [30/50] [abbrv] Rename packages in preparation for move to Apache
Date Tue, 03 Jan 2012 11:19:16 GMT
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/client/util/ObjectBuilder.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/client/util/ObjectBuilder.java b/s4-core/src/main/java/io/s4/client/util/ObjectBuilder.java
deleted file mode 100644
index 96c90d1..0000000
--- a/s4-core/src/main/java/io/s4/client/util/ObjectBuilder.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.client.util;
-
-import io.s4.message.Request;
-import io.s4.message.SinglePERequest;
-
-import java.lang.reflect.Type;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.InstanceCreator;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonSerializationContext;
-import com.google.gson.JsonSerializer;
-
-import flexjson.JSONDeserializer;
-import flexjson.JSONSerializer;
-
-public class ObjectBuilder {
-
-    private ConcurrentHashMap<Class<?>, JSONDeserializer<Object>> deserializers = new ConcurrentHashMap<Class<?>, JSONDeserializer<Object>>();
-
-    private JSONSerializer serializer = (new JSONSerializer()).exclude("class");
-
-    // private JSONSerializer serializer = (new JSONSerializer());
-
-    public Object fromJson(String jevent, Class<?> clazz)
-            throws ObjectBuilder.Exception {
-
-        JSONDeserializer<Object> deser = deserializers.get(clazz);
-
-        if (deser == null) {
-            JSONDeserializer<Object> newDeser = new JSONDeserializer<Object>();
-            newDeser.use(null, clazz);
-
-            deser = deserializers.putIfAbsent(clazz, newDeser);
-
-            if (deser == null)
-                deser = newDeser;
-        }
-
-        return deser.deserialize(jevent);
-
-    }
-
-    public String toJson(Object e) {
-        return serializer.serialize(e);
-    }
-
-    public static class Exception extends java.lang.Exception {
-        public Exception(String message) {
-            super(message);
-        }
-
-        public Exception(String message, Throwable cause) {
-            super(message, cause);
-        }
-    }
-
-    private static class TEST {
-        private int a;
-        private int b;
-
-        public void setA(int a) {
-            this.a = a * 10;
-        }
-
-        public String toString() {
-            return "" + a + " " + b;
-        }
-
-        public int getA() {
-            return a;
-        }
-
-        public int getB() {
-            return b;
-        }
-
-        public TEST(int a, int b) {
-            this.a = a * 10;
-            this.b = b * 10;
-        }
-
-        public TEST() {
-        }
-    }
-
-    public static void main(String[] argv) throws Exception {
-
-        ObjectBuilder b = new ObjectBuilder();
-
-        String s = "{a:5, b:100}";
-        Object out = b.fromJson(s, TEST.class);
-
-        System.out.println(out.toString());
-
-        TEST t = new TEST(1, 2);
-
-        System.out.println(b.toJson(t));
-
-        String[] query = { "name", "count", "freq" };
-        String target[] = { "ACDW", "11" };
-
-        io.s4.message.Request.ClientRInfo rinfo = new io.s4.message.Request.ClientRInfo();
-        rinfo.setRequesterUUID(UUID.randomUUID());
-        Request req = new io.s4.message.SinglePERequest(Arrays.asList(target),
-                                                        Arrays.asList(query),
-                                                        rinfo);
-
-        System.out.println(req.toString());
-
-        InstanceCreator<io.s4.message.Request.RInfo> infoCreator = new InstanceCreator<io.s4.message.Request.RInfo>() {
-            public io.s4.message.Request.RInfo createInstance(Type type) {
-                return new io.s4.message.Request.ClientRInfo();
-            }
-        };
-
-        Gson gson = (new GsonBuilder()).registerTypeAdapter(io.s4.message.Request.RInfo.class,
-                                                            infoCreator)
-                                       .registerTypeAdapter(Object.class,
-                                                            new ObjectTypeAdapter())
-                                       .create();
-
-        System.out.println("gson: " + gson.toJson(req));
-        System.out.println("gson reversed: "
-                + gson.fromJson(gson.toJson(req), SinglePERequest.class));
-
-        System.out.println(b.toJson(req));
-        System.out.println(b.toJson(Arrays.asList(query)));
-
-        System.out.println("----------------------------------------------");
-
-        ArrayList<SSTest> list = new ArrayList<SSTest>();
-
-        SSTest ss1 = new SSTest();
-        ss1.str = "list-element-1";
-        SSTest ss2 = new SSTest();
-        ss2.str = "list-element-2";
-
-        list.add(ss1);
-        list.add(ss2);
-
-        Map<String, Object> listmap = new HashMap<String, Object>();
-        listmap.put("ll", list);
-
-        MapTest mt = new MapTest();
-        mt.map = listmap;
-
-        Object listmapobj = listmap;
-
-        System.out.println("list: " + gson.toJson(list));
-        System.out.println("listmap: " + gson.toJson(listmap));
-        System.out.println("listmapobj: " + gson.toJson(listmapobj));
-        System.out.println("mapobject: " + gson.toJson(mt));
-    }
-
-    private static class SSTest {
-        public String str;
-    }
-
-    private static class MapTest {
-        Map<String, Object> map;
-        Map gmap;
-    }
-
-    private static class ObjectTypeAdapter implements JsonSerializer<Object> {
-        public JsonElement serialize(Object src, Type typeOfSrc,
-                                     JsonSerializationContext context) {
-
-            if (src.getClass() != Object.class) {
-                return context.serialize(src, src.getClass());
-            }
-
-            return new JsonObject();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/collector/Event.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/collector/Event.java b/s4-core/src/main/java/io/s4/collector/Event.java
deleted file mode 100644
index b59fc6d..0000000
--- a/s4-core/src/main/java/io/s4/collector/Event.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.collector;
-
-import io.s4.dispatcher.Dispatcher;
-import io.s4.dispatcher.partitioner.CompoundKeyInfo;
-import io.s4.dispatcher.partitioner.KeyInfo;
-import io.s4.dispatcher.partitioner.KeyInfo.KeyPathElement;
-import io.s4.dispatcher.partitioner.KeyInfo.KeyPathElementIndex;
-import io.s4.dispatcher.partitioner.KeyInfo.KeyPathElementName;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-public class Event extends EventRecord {
-    private String eventName;
-    private long timestamp;
-    private List<CompoundKeyInfo> compoundKeys = new ArrayList<CompoundKeyInfo>();
-    private boolean debug = false;
-    public static final String EVENT_NAME_KEY = "S4__eventName";
-    public static final String TIMESTAMP_KEY = "S4__timestamp";
-
-    public void setDebug(boolean debug) {
-        this.debug = debug;
-    }
-
-    public Event(Map<String, Object> passedEventData) {
-        super(passedEventData);
-
-        eventName = this.get(EVENT_NAME_KEY, "unknown");
-        timestamp = this.get(TIMESTAMP_KEY, -1L);
-
-        List<EventRecord> plainCompoundKeyList = null;
-        if ((plainCompoundKeyList = get(Dispatcher.PARTITION_INFO_KEY,
-                                        EMPTY_LIST)) != EMPTY_LIST) {
-            for (EventRecord plainCompoundKeyInfo : plainCompoundKeyList) {
-                CompoundKeyInfo compoundKeyInfo = new CompoundKeyInfo();
-                compoundKeyInfo.setCompoundValue(plainCompoundKeyInfo.get("compoundValue",
-                                                                          (String) null));
-                compoundKeyInfo.setCompoundKey(plainCompoundKeyInfo.get("compoundKey",
-                                                                        (String) null));
-                compoundKeys.add(compoundKeyInfo);
-                for (EventRecord plainKeyInfo : plainCompoundKeyInfo.get("keyInfoList",
-                                                                         EMPTY_LIST)) {
-                    KeyInfo keyInfo = new KeyInfo();
-                    for (EventRecord plainKeyPathElement : plainKeyInfo.get("keyPathElementList",
-                                                                            EMPTY_LIST)) {
-                        String keyName = plainKeyPathElement.get("keyName",
-                                                                 (String) null);
-                        Integer index = plainKeyPathElement.get("index",
-                                                                (Integer) null);
-
-                        if (keyName != null) {
-                            keyInfo.addElementToPath(keyName);
-                        } else if (index != null) {
-                            keyInfo.addElementToPath(index);
-                        }
-                    }
-                    compoundKeyInfo.addKeyInfo(keyInfo);
-                }
-            }
-        }
-        if (debug) {
-            for (CompoundKeyInfo compoundKeyInfo : compoundKeys) {
-                System.out.println("CompoundKey: "
-                        + compoundKeyInfo.getCompoundValue());
-                for (KeyInfo keyInfo : compoundKeyInfo.getKeyInfoList()) {
-                    String keyPath = "";
-                    for (KeyPathElement keyPathElement : keyInfo.getKeyPath()) {
-                        if (keyPathElement instanceof KeyPathElementIndex) {
-                            keyPath += "["
-                                    + ((KeyPathElementIndex) keyPathElement).getIndex()
-                                    + "]";
-                        } else {
-                            if (keyPath.length() > 0) {
-                                keyPath += "/";
-                            }
-                            keyPath += ((KeyPathElementName) keyPathElement).getKeyName();
-                        }
-                    }
-                    System.out.println("   " + keyPath);
-                }
-            }
-        }
-    }
-
-    public List<CompoundKeyInfo> getCompoundKeys() {
-        return compoundKeys;
-    }
-
-    public String getEventName() {
-        return eventName;
-    }
-
-    public long getTimeStamp() {
-        return timestamp;
-    }
-
-    public List<Map<String, Object>> getCompoundKeyList() {
-        return get(Dispatcher.PARTITION_INFO_KEY,
-                   new ArrayList<Map<String, Object>>());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/collector/EventListener.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/collector/EventListener.java b/s4-core/src/main/java/io/s4/collector/EventListener.java
deleted file mode 100644
index a35c94d..0000000
--- a/s4-core/src/main/java/io/s4/collector/EventListener.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.collector;
-
-import static io.s4.util.MetricsName.S4_CORE_METRICS;
-import static io.s4.util.MetricsName.S4_EVENT_METRICS;
-import static io.s4.util.MetricsName.generic_listener_msg_in_ct;
-import io.s4.listener.EventHandler;
-import io.s4.logger.Monitor;
-import io.s4.processor.AsynchronousEventProcessor;
-import io.s4.processor.PEContainer;
-
-import org.apache.log4j.Logger;
-
-public class EventListener implements EventHandler {
-    private static Logger logger = Logger.getLogger(EventListener.class);
-    private int eventCount = 0;
-    private AsynchronousEventProcessor eventProcessor;
-    private io.s4.listener.EventListener rawListener;
-    private Monitor monitor;
-
-    public void setMonitor(Monitor monitor) {
-        this.monitor = monitor;
-    }
-
-    public void setPeContainer(PEContainer peContainer) {
-        this.eventProcessor = peContainer;
-    }
-
-    public void setEventProcessor(AsynchronousEventProcessor eventProcessor) {
-        this.eventProcessor = eventProcessor;
-    }
-
-    public void setRawListener(io.s4.listener.EventListener rawListener) {
-        this.rawListener = rawListener;
-    }
-
-    public io.s4.listener.EventListener getRawListener() {
-        return this.rawListener;
-    }
-
-    public int getEventCount() {
-        return eventCount;
-    }
-
-    public EventListener() {
-
-    }
-
-    public void init() {
-        rawListener.addHandler(this);
-    }
-
-    public void processEvent(EventWrapper eventWrapper) {
-        try {
-            synchronized (this) {
-                eventCount++;
-            }
-            if (logger.isDebugEnabled()) {
-                logger.debug("STEP 3 (EventListener): peContainer.addEvent - "
-                        + eventWrapper.getEvent().toString());
-            }
-            eventProcessor.queueWork(eventWrapper);
-
-            if (monitor != null) {
-                monitor.increment(generic_listener_msg_in_ct.toString(),
-                                  1,
-                                  S4_EVENT_METRICS.toString(),
-                                  "et",
-                                  eventWrapper.getStreamName());
-                monitor.increment(generic_listener_msg_in_ct.toString(),
-                                  1,
-                                  S4_CORE_METRICS.toString());
-            }
-        } catch (Exception e) {
-            logger.error("Exception in processEvent on thread "
-                    + Thread.currentThread().getId(), e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/collector/EventRecord.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/collector/EventRecord.java b/s4-core/src/main/java/io/s4/collector/EventRecord.java
deleted file mode 100644
index a8b787a..0000000
--- a/s4-core/src/main/java/io/s4/collector/EventRecord.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.collector;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class EventRecord implements Map<String, Object> {
-
-    public static EventRecord EMPTY_RECORD = new EventRecord(new HashMap<String, Object>());
-    public static List<EventRecord> EMPTY_LIST = Collections.unmodifiableList(new ArrayList<EventRecord>());
-
-    private Map<String, Object> eventData = new HashMap<String, Object>();
-    private Map<String, Object> additionalData = new HashMap<String, Object>();
-
-    public EventRecord(Map<String, Object> passedEventData) {
-        this(passedEventData, true);
-    }
-
-    private EventRecord(Map<String, Object> passedEventData,
-            boolean processEventData) {
-        if (processEventData) {
-            eventData = processMap(passedEventData, true);
-        } else {
-            eventData = passedEventData;
-        }
-    }
-
-    private Map<String, Object> processMap(Map<String, Object> inputMap) {
-        return processMap(inputMap, false);
-    }
-
-    private Map<String, Object> processMap(Map<String, Object> inputMap, boolean returnRaw) {
-        Map<String, Object> eventData = new HashMap<String, Object>();
-        for (String key : inputMap.keySet()) {
-            Object value = inputMap.get(key);
-            if (value instanceof Map<?, ?>) {
-                eventData.put(key, processMap((Map<String, Object>) value));
-            } else if (value instanceof List<?>) {
-                eventData.put(key,
-                              processList((List<Map<String, Object>>) value));
-            } else {
-                eventData.put(key, value);
-            }
-        }
-        if (returnRaw)
-            return eventData;
-        return new EventRecord(eventData, false);
-    }
-
-    private List<Map<String, Object>> processList(List<Map<String, Object>> inputList) {
-        List<Map<String, Object>> eventList = new ArrayList<Map<String, Object>>();
-        for (Map<String, Object> inputMap : inputList) {
-            eventList.add(processMap(inputMap));
-        }
-        return Collections.unmodifiableList(eventList);
-    }
-
-    @Override
-    public void clear() {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public boolean containsKey(Object key) {
-        return eventData.containsKey(key);
-    }
-
-    @Override
-    public boolean containsValue(Object value) {
-        return eventData.containsValue(value);
-    }
-
-    @Override
-    public Set<java.util.Map.Entry<String, Object>> entrySet() {
-        return eventData.entrySet();
-    }
-
-    @Override
-    public Object get(Object key) {
-        return eventData.get(key);
-    }
-
-    public <T> T get(String key, T defaultValue) {
-        return get(key, defaultValue, eventData);
-    }
-
-    private <T> T get(String key, T defaultValue, Map<String, Object> map) {
-        Object value = map.get(key);
-        if (value == null) {
-            return defaultValue;
-        }
-        return (T) value;
-    }
-
-    @Override
-    public boolean isEmpty() {
-        return eventData.isEmpty();
-    }
-
-    @Override
-    public Set<String> keySet() {
-        return eventData.keySet();
-    }
-
-    @Override
-    public Object put(String key, Object value) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void putAll(Map<? extends String, ? extends Object> m) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Object remove(Object key) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public int size() {
-        return eventData.size();
-    }
-
-    @Override
-    public Collection<Object> values() {
-        return eventData.values();
-    }
-
-    public void setAdditionalProperty(String key, Object value) {
-        additionalData.put(key, value);
-    }
-
-    public <T> T getAdditionalProperty(String key, T defaultValue) {
-        return get(key, defaultValue, additionalData);
-    }
-
-    public void removeAdditionalProperty(String key) {
-        additionalData.remove(key);
-    }
-
-    public Map<String, Object> getMutableMap() {
-        return getMutableMap(this.eventData);
-    }
-
-    public Map<String, Object> getMutableMap(Map<String, Object> recordData) {
-        Map<String, Object> mutableData = new HashMap<String, Object>();
-        for (String key : recordData.keySet()) {
-            Object value = recordData.get(key);
-            if (value instanceof Map<?, ?>) {
-                mutableData.put(key, getMutableMap((Map<String, Object>) value));
-            } else if (value instanceof List<?>) {
-                mutableData.put(key,
-                                getMutableList((List<Map<String, Object>>) value));
-            } else {
-                mutableData.put(key, value);
-            }
-        }
-        return mutableData;
-    }
-
-    public List<Map<String, Object>> getMutableList(List<Map<String, Object>> recordList) {
-        List<Map<String, Object>> mutableList = new ArrayList<Map<String, Object>>();
-        for (Map<String, Object> recordData : recordList) {
-            mutableList.add(getMutableMap(recordData));
-        }
-        return mutableList;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/collector/EventWrapper.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/collector/EventWrapper.java b/s4-core/src/main/java/io/s4/collector/EventWrapper.java
deleted file mode 100644
index d58f0f7..0000000
--- a/s4-core/src/main/java/io/s4/collector/EventWrapper.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.collector;
-
-import io.s4.dispatcher.partitioner.CompoundKeyInfo;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.StringTokenizer;
-
-public class EventWrapper {
-    private List<CompoundKeyInfo> compoundKeys = null;
-    private List<List<String>> compoundKeyNames = null;
-    private Object event;
-    private String streamName;
-
-    public List<CompoundKeyInfo> getCompoundKeys() {
-        return compoundKeys;
-    }
-
-    public Object getEvent() {
-        return event;
-    }
-
-    public String getStreamName() {
-        return streamName;
-    }
-
-    public List<List<String>> getCompoundKeyNames() {
-        return compoundKeyNames;
-    }
-
-    public EventWrapper() {
-        compoundKeys = new ArrayList<CompoundKeyInfo>();
-    }
-
-    public EventWrapper(String streamName, Object event,
-            List<CompoundKeyInfo> compoundKeys) {
-        this.streamName = streamName;
-        this.event = event;
-        this.compoundKeys = compoundKeys;
-    }
-
-    public EventWrapper(String streamName, String[] compoundKeyStrings,
-            Object event) {
-        this.streamName = streamName;
-        this.event = event;
-
-        if (compoundKeyStrings != null) {
-            this.compoundKeyNames = new ArrayList<List<String>>(compoundKeyStrings.length);
-
-            for (String keyAsString : compoundKeyStrings) {
-                List<String> keyNameElements = new ArrayList<String>();
-                StringTokenizer st = new StringTokenizer(keyAsString, "/");
-                while (st.hasMoreTokens()) {
-                    keyNameElements.add(st.nextToken());
-                }
-                compoundKeyNames.add(keyNameElements);
-            }
-        }
-    }
-
-    public String toString() {
-        return "stream:" + getStreamName() + " keys:" + getCompoundKeys()
-                + " keyNames:" + getCompoundKeyNames() + " event:" + getEvent();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/dispatcher/Dispatcher.java b/s4-core/src/main/java/io/s4/dispatcher/Dispatcher.java
deleted file mode 100644
index d46fc13..0000000
--- a/s4-core/src/main/java/io/s4/dispatcher/Dispatcher.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.dispatcher;
-
-import io.s4.collector.EventWrapper;
-import io.s4.dispatcher.partitioner.CompoundKeyInfo;
-import io.s4.dispatcher.partitioner.Partitioner;
-import io.s4.dispatcher.partitioner.VariableKeyPartitioner;
-import io.s4.dispatcher.transformer.Transformer;
-import io.s4.emitter.EventEmitter;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-
-public class Dispatcher implements EventDispatcher {
-    private EventEmitter eventEmitter;
-    private Transformer[] transformers = new Transformer[0];
-    private Partitioner[] partitioners = new Partitioner[0];
-    private String configFilename;
-    private boolean debug = false;
-    private String loggerName = "s4";
-
-    public final static String PARTITION_INFO_KEY = "S4__PartitionInfo";
-
-    public void setTransformers(Transformer[] transformers) {
-        this.transformers = transformers;
-    }
-
-    public void setPartitioners(Partitioner[] partitioners) {
-        this.partitioners = partitioners;
-    }
-
-    public void setEventEmitter(EventEmitter eventEmitter) {
-        this.eventEmitter = eventEmitter;
-    }
-
-    public EventEmitter getEventEmitter() {
-        return this.eventEmitter;
-    }
-
-    public void setConfigFilename(String configFilename) {
-        this.configFilename = configFilename;
-    }
-
-    public void setDebug(boolean debug) {
-        this.debug = debug;
-    }
-
-    public void setLoggerName(String loggerName) {
-        this.loggerName = loggerName;
-    }
-
-    private volatile int eventCount = 0;
-    private volatile int rawEventCount = 0;
-
-    public Dispatcher() {
-
-    }
-
-    int counts[];
-
-    public void init() {
-
-        Runnable r = new Runnable() {
-            private long configFileTime = -1;
-
-            public void run() {
-                long lastCheckTime = System.currentTimeMillis();
-                int lastEventCount = eventCount;
-                int lastRawEventCount = rawEventCount;
-                while (!Thread.currentThread().isInterrupted()) {
-                    int eventCount = Dispatcher.this.eventCount;
-                    long currentTime = System.currentTimeMillis();
-                    double rate = (eventCount - lastEventCount)
-                            / ((currentTime - lastCheckTime) / 1000.0);
-                    double rawRate = (rawEventCount - lastRawEventCount)
-                            / ((currentTime - lastCheckTime) / 1000.0);
-                    lastCheckTime = currentTime;
-                    lastEventCount = eventCount;
-                    lastRawEventCount = rawEventCount;
-                    Logger.getLogger(loggerName).info("Event count is "
-                            + eventCount + "; rate " + rate);
-                    Logger.getLogger(loggerName).info("Raw event count is "
-                            + rawEventCount + "; rate " + rawRate);
-                    if (counts != null) {
-                        for (int i = 0; i < counts.length; i++) {
-                            Logger.getLogger(loggerName).info(i + ": "
-                                    + counts[i]);
-                        }
-                    }
-
-                    configCheck();
-
-                    try {
-                        Thread.sleep(15000);
-                    } catch (InterruptedException ie) {
-                        Thread.currentThread().interrupt();
-                    }
-                }
-
-            }
-
-            private void configCheck() {
-                if (configFilename == null) {
-                    return;
-                }
-
-                File file = new File(configFilename);
-                if (!file.exists()) {
-                    return;
-                }
-                long lastModified = file.lastModified();
-                if (configFileTime == -1) {
-                    configFileTime = lastModified;
-                    return;
-                }
-
-                if (lastModified > configFileTime) {
-                    Logger.getLogger(loggerName)
-                          .info("Config file has changed. Exiting!!");
-                    System.exit(4);
-                }
-            }
-        };
-        Thread t = new Thread(r);
-        t.start();
-    }
-
-    @Override
-    public void dispatchEvent(String streamName,
-                              List<List<String>> compoundKeyNames, Object event) {
-        dispatchEvent(streamName, event, true, compoundKeyNames);
-    }
-
-    @Override
-    public void dispatchEvent(String streamName, Object event) {
-        dispatchEvent(streamName, event, false, null);
-    }
-
-    private void dispatchEvent(String streamName, Object event,
-                               boolean variableKey,
-                               List<List<String>> compoundKeyNames) {
-        synchronized (this) {
-            rawEventCount++;
-        }
-        if (eventEmitter.getNodeCount() <= 0) {
-            return;
-        } else {
-            if (counts == null) {
-                counts = new int[eventEmitter.getNodeCount()];
-            }
-        }
-
-        try {
-            synchronized (this) {
-                eventCount++;
-            }
-
-            List<CompoundKeyInfo> partionInfoList = new ArrayList<CompoundKeyInfo>();
-            for (Partitioner partitioner : partitioners) {
-                List<CompoundKeyInfo> pInfoList = null;
-
-                if (!variableKey) {
-                    pInfoList = partitioner.partition(streamName,
-                                                      event,
-                                                      eventEmitter.getNodeCount());
-                } else {
-                    if (partitioner instanceof VariableKeyPartitioner) {
-                        VariableKeyPartitioner vp = (VariableKeyPartitioner) partitioner;
-                        pInfoList = vp.partition(streamName,
-                                                 compoundKeyNames,
-                                                 event,
-                                                 eventEmitter.getNodeCount());
-                    }
-                }
-
-                if (pInfoList != null) {
-                    partionInfoList.addAll(pInfoList);
-                }
-            }
-
-            Map<Integer, List<CompoundKeyInfo>> pInfoMap = new HashMap<Integer, List<CompoundKeyInfo>>();
-            for (CompoundKeyInfo partitionInfo : partionInfoList) {
-                int partitionId = partitionInfo.getPartitionId();
-                List<CompoundKeyInfo> listByPartitionNumber = pInfoMap.get(partitionId);
-                if (listByPartitionNumber == null) {
-                    listByPartitionNumber = new ArrayList<CompoundKeyInfo>();
-                    pInfoMap.put(partitionId, listByPartitionNumber);
-                }
-                listByPartitionNumber.add(partitionInfo);
-            }
-
-            for (int partitionId : pInfoMap.keySet()) {
-                EventWrapper eventWrapper = new EventWrapper(streamName,
-                                                             event,
-                                                             pInfoMap.get(partitionId));
-                counts[partitionId]++;
-                eventEmitter.emit(partitionId, eventWrapper);
-            }
-        } catch (Exception e) {
-            Logger.getLogger(loggerName)
-                  .error("Exception in processEvent on thread "
-                                 + Thread.currentThread().getId() + " at time "
-                                 + System.currentTimeMillis(),
-                         e);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/dispatcher/EventDispatcher.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/dispatcher/EventDispatcher.java b/s4-core/src/main/java/io/s4/dispatcher/EventDispatcher.java
deleted file mode 100644
index fe8b7ce..0000000
--- a/s4-core/src/main/java/io/s4/dispatcher/EventDispatcher.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.dispatcher;
-
-import java.util.List;
-
-/**
- * Contract for components that dispatch an event object onto a named stream.
- * One variant dispatches by stream name alone; the other additionally takes
- * the key names to dispatch on for that particular call.
- */
-public interface EventDispatcher {
-
-    /**
-     * Dispatch event using stream name. Partitioners may be used to partition
-     * the event, possibly based on a pre-determined set of fixed named keys.
-     * 
-     * @param streamName
-     *            name of stream to dispatch on
-     * @param event
-     *            object to dispatch
-     */
-    void dispatchEvent(String streamName, Object event);
-
-    /**
-     * Dispatch event using a stream name and using a set of named keys.
-     * VariableKeyPartitioners may be used to partition the event.
-     * 
-     * @param streamName
-     *            name of stream to dispatch on
-     * @param compoundKeyNames
-     *            keys to use for dispatching; one inner list per key.
-     *            NOTE(review): presumably each inner list is the path of
-     *            property names leading to the key value -- confirm against
-     *            the partitioner implementation in use
-     * @param event
-     *            object to dispatch
-     */
-    void dispatchEvent(String streamName, List<List<String>> compoundKeyNames,
-                       Object event);
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/dispatcher/MultiDispatcher.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/dispatcher/MultiDispatcher.java b/s4-core/src/main/java/io/s4/dispatcher/MultiDispatcher.java
deleted file mode 100644
index 660b46b..0000000
--- a/s4-core/src/main/java/io/s4/dispatcher/MultiDispatcher.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.dispatcher;
-
-import java.util.List;
-
-/**
- * Fan-out dispatcher: every event it receives is forwarded, in array order,
- * to each of the configured delegate dispatchers.
- */
-public class MultiDispatcher implements EventDispatcher {
-
-    /** Delegates that receive each event; {@code null} until configured. */
-    private EventDispatcher[] dispatchers = null;
-
-    /**
-     * @param dispatchers
-     *            delegates to forward events to; the array is stored as-is
-     */
-    public void setDispatchers(EventDispatcher[] dispatchers) {
-        this.dispatchers = dispatchers;
-    }
-
-    /**
-     * Forwards the event to every delegate. Does nothing when no delegates
-     * have been configured.
-     */
-    @Override
-    public void dispatchEvent(String streamName, Object event) {
-        final EventDispatcher[] targets = dispatchers;
-        if (targets == null) {
-            return;
-        }
-        for (int i = 0; i < targets.length; i++) {
-            targets[i].dispatchEvent(streamName, event);
-        }
-    }
-
-    /**
-     * Forwards the event, together with the supplied key names, to every
-     * delegate. Does nothing when no delegates have been configured.
-     */
-    @Override
-    public void dispatchEvent(String streamName,
-                              List<List<String>> compoundKeyNames, Object event) {
-        final EventDispatcher[] targets = dispatchers;
-        if (targets == null) {
-            return;
-        }
-        for (int i = 0; i < targets.length; i++) {
-            targets[i].dispatchEvent(streamName, compoundKeyNames, event);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/dispatcher/StreamExcludingDispatcher.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/dispatcher/StreamExcludingDispatcher.java b/s4-core/src/main/java/io/s4/dispatcher/StreamExcludingDispatcher.java
deleted file mode 100644
index 890d25e..0000000
--- a/s4-core/src/main/java/io/s4/dispatcher/StreamExcludingDispatcher.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.dispatcher;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-
-/**
- * Dispatcher decorator that forwards events to a delegate unless the event's
- * stream name is in the configured exclusion set. With no exclusion set
- * configured, every event is forwarded.
- */
-public class StreamExcludingDispatcher implements EventDispatcher {
-
-    /** Delegate that receives the events that pass the filter. */
-    private EventDispatcher dispatcher = null;
-
-    /** Names of the streams to drop; {@code null} means nothing is dropped. */
-    private HashSet<String> streams = null;
-
-    public void setDispatcher(EventDispatcher dispatcher) {
-        this.dispatcher = dispatcher;
-    }
-
-    /**
-     * @param streams
-     *            names of the streams whose events must not be forwarded
-     */
-    public void setStreams(String[] streams) {
-        HashSet<String> excluded = new HashSet<String>();
-        for (String name : streams) {
-            excluded.add(name);
-        }
-        this.streams = excluded;
-    }
-
-    @Override
-    public void dispatchEvent(String streamName, Object event) {
-        if (!shouldForward(streamName)) {
-            return;
-        }
-        dispatcher.dispatchEvent(streamName, event);
-    }
-
-    @Override
-    public void dispatchEvent(String streamName,
-                              List<List<String>> compoundKeyNames, Object event) {
-        if (!shouldForward(streamName)) {
-            return;
-        }
-        dispatcher.dispatchEvent(streamName, compoundKeyNames, event);
-    }
-
-    /**
-     * An event is forwarded only when a delegate is set and the stream is
-     * not listed in the exclusion set.
-     */
-    private boolean shouldForward(String streamName) {
-        if (dispatcher == null) {
-            return false;
-        }
-        return streams == null || !streams.contains(streamName);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/dispatcher/StreamSelectingDispatcher.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/dispatcher/StreamSelectingDispatcher.java b/s4-core/src/main/java/io/s4/dispatcher/StreamSelectingDispatcher.java
deleted file mode 100644
index 36fd0a5..0000000
--- a/s4-core/src/main/java/io/s4/dispatcher/StreamSelectingDispatcher.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.dispatcher;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-
-/**
- * Dispatcher decorator that forwards events to a delegate only when the
- * event's stream name is in the configured selection set. With no selection
- * set configured, nothing is forwarded.
- */
-public class StreamSelectingDispatcher implements EventDispatcher {
-
-    /** Delegate that receives the events that pass the filter. */
-    private EventDispatcher dispatcher = null;
-
-    /** Names of the streams to let through; {@code null} lets nothing through. */
-    private HashSet<String> streams = null;
-
-    public void setDispatcher(EventDispatcher dispatcher) {
-        this.dispatcher = dispatcher;
-    }
-
-    /**
-     * @param streams
-     *            names of the only streams whose events are forwarded
-     */
-    public void setStreams(String[] streams) {
-        HashSet<String> selected = new HashSet<String>();
-        for (String name : streams) {
-            selected.add(name);
-        }
-        this.streams = selected;
-    }
-
-    @Override
-    public void dispatchEvent(String streamName, Object event) {
-        if (!shouldForward(streamName)) {
-            return;
-        }
-        dispatcher.dispatchEvent(streamName, event);
-    }
-
-    @Override
-    public void dispatchEvent(String streamName,
-                              List<List<String>> compoundKeyNames, Object event) {
-        if (!shouldForward(streamName)) {
-            return;
-        }
-        dispatcher.dispatchEvent(streamName, compoundKeyNames, event);
-    }
-
-    /**
-     * An event is forwarded only when a delegate is set, a selection set is
-     * configured, and the stream is listed in it.
-     */
-    private boolean shouldForward(String streamName) {
-        if (dispatcher == null || streams == null) {
-            return false;
-        }
-        return streams.contains(streamName);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/dispatcher/partitioner/BroadcastPartitioner.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/dispatcher/partitioner/BroadcastPartitioner.java b/s4-core/src/main/java/io/s4/dispatcher/partitioner/BroadcastPartitioner.java
deleted file mode 100644
index 2732965..0000000
--- a/s4-core/src/main/java/io/s4/dispatcher/partitioner/BroadcastPartitioner.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.dispatcher.partitioner;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Partitioner that targets every partition: the returned list holds one entry
- * per partition id in {@code [0, partitionCount)}. Combined with a dispatcher
- * this broadcasts an event to all partitions.
- */
-public class BroadcastPartitioner implements Partitioner {
-    /**
-     * @param streamName
-     *            ignored
-     * @param event
-     *            ignored
-     * @param partitionCount
-     *            number of partitions to address
-     * @return one {@code CompoundKeyInfo} per partition id, in ascending
-     *         order; empty when {@code partitionCount} is not positive
-     */
-    @Override
-    public List<CompoundKeyInfo> partition(String streamName, Object event,
-                                           int partitionCount) {
-        List<CompoundKeyInfo> targets = new ArrayList<CompoundKeyInfo>();
-        int id = 0;
-        while (id < partitionCount) {
-            CompoundKeyInfo target = new CompoundKeyInfo();
-            target.setPartitionId(id);
-            targets.add(target);
-            id++;
-        }
-        return targets;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/dispatcher/partitioner/CompoundKeyInfo.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/dispatcher/partitioner/CompoundKeyInfo.java b/s4-core/src/main/java/io/s4/dispatcher/partitioner/CompoundKeyInfo.java
deleted file mode 100644
index ba25d1a..0000000
--- a/s4-core/src/main/java/io/s4/dispatcher/partitioner/CompoundKeyInfo.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.dispatcher.partitioner;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Result of partitioning an event: the partition id the event was assigned
- * to, plus the key (a list of {@link KeyInfo}) and the compound value that
- * led to that assignment. The partition id is {@code -1} until set.
- */
-public class CompoundKeyInfo implements Serializable {
-    List<KeyInfo> keyInfoList = new ArrayList<KeyInfo>();
-    int partitionId = -1;
-    String compoundValue;
-    String compoundKey;
-
-    // True while compoundKey holds a value that getCompoundKey() derived from
-    // keyInfoList (as opposed to one supplied through setCompoundKey). Kept
-    // transient so the serialized form of the class is unchanged.
-    private transient boolean compoundKeyDerived = false;
-
-    public CompoundKeyInfo() {
-    }
-
-    /**
-     * Appends a key to this compound key. A compound key string previously
-     * derived by {@link #getCompoundKey()} is discarded so that it is rebuilt
-     * with the new key included; an explicitly set compound key is kept.
-     */
-    public void addKeyInfo(KeyInfo keyInfo) {
-        keyInfoList.add(keyInfo);
-        if (compoundKeyDerived) {
-            compoundKey = null;
-            compoundKeyDerived = false;
-        }
-    }
-
-    public void setPartitionId(int partitionId) {
-        this.partitionId = partitionId;
-    }
-
-    /**
-     * Sets the compound key explicitly. Passing {@code null} makes
-     * {@link #getCompoundKey()} derive the key from the key list again.
-     */
-    public void setCompoundKey(String compoundKey) {
-        this.compoundKey = compoundKey;
-        this.compoundKeyDerived = false;
-    }
-
-    public void setCompoundValue(String compoundValue) {
-        this.compoundValue = compoundValue;
-    }
-
-    public List<KeyInfo> getKeyInfoList() {
-        return keyInfoList;
-    }
-
-    public int getPartitionId() {
-        return partitionId;
-    }
-
-    /**
-     * Returns the compound key. When none has been set, it is built by
-     * joining the string form of every key in the key list with commas, and
-     * the result is cached until another key is added.
-     */
-    public String getCompoundKey() {
-        if (compoundKey == null) {
-            StringBuilder joined = new StringBuilder();
-            for (KeyInfo keyInfo : this.getKeyInfoList()) {
-                if (joined.length() > 0) {
-                    joined.append(",");
-                }
-                joined.append(keyInfo.toString());
-            }
-            compoundKey = joined.toString();
-            compoundKeyDerived = true;
-        }
-        return compoundKey;
-    }
-
-    public String getCompoundValue() {
-        return this.compoundValue;
-    }
-
-    /** Formats as <code>{key = value}:partitionId</code>. */
-    @Override
-    public String toString() {
-        return "{" + getCompoundKey() + " = " + getCompoundValue() + "}:"
-                + getPartitionId();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/dispatcher/partitioner/DefaultHasher.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/dispatcher/partitioner/DefaultHasher.java b/s4-core/src/main/java/io/s4/dispatcher/partitioner/DefaultHasher.java
deleted file mode 100644
index 3f053b8..0000000
--- a/s4-core/src/main/java/io/s4/dispatcher/partitioner/DefaultHasher.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.dispatcher.partitioner;
-
-/**
- * Hasher that hashes the string form of a key with a fixed
- * {@link HashAlgorithm} ({@code FNV1_64_HASH}).
- */
-public class DefaultHasher implements Hasher {
-
-    HashAlgorithm hashAlgorithm = HashAlgorithm.FNV1_64_HASH;
-
-    /**
-     * @param hashKey
-     *            key to hash; converted with {@code String.valueOf}, so a
-     *            {@code null} key is hashed as the text "null"
-     * @return the hash produced by the configured algorithm
-     */
-    @Override
-    public long hash(Object hashKey) {
-        String keyText = String.valueOf(hashKey);
-        return hashAlgorithm.hash(keyText);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/dispatcher/partitioner/DefaultPartitioner.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/dispatcher/partitioner/DefaultPartitioner.java b/s4-core/src/main/java/io/s4/dispatcher/partitioner/DefaultPartitioner.java
deleted file mode 100644
index d393cc8..0000000
--- a/s4-core/src/main/java/io/s4/dispatcher/partitioner/DefaultPartitioner.java
+++ /dev/null
@@ -1,351 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.dispatcher.partitioner;
-
-import io.s4.schema.Schema;
-import io.s4.schema.Schema.Property;
-import io.s4.schema.SchemaContainer;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.StringTokenizer;
-
-/**
- * Partitioner that assigns an event to a partition by hashing the values of
- * one or more key properties read from the event through its schema.
- * <p>
- * Keys are either the ones configured with {@link #setHashKey(String[])} or
- * the ones passed to the variable-key {@code partition} overload. Whether the
- * single-key fast path applies is decided on every call from the keys of that
- * call, so calls with different key sets do not influence each other.
- */
-public class DefaultPartitioner implements Partitioner, VariableKeyPartitioner {
-    private List<List<String>> keyNameTuple = new ArrayList<List<String>>();
-    private boolean debug = false;
-    private Hasher hasher;
-    private Set<String> streamNameSet;
-    private String delimiter = ":";
-    private SchemaContainer schemaContainer = new SchemaContainer();
-
-    /**
-     * @param delimiter
-     *            text placed between the individual values of a compound
-     *            value (default ":")
-     */
-    public void setDelimiter(String delimiter) {
-        this.delimiter = delimiter;
-    }
-
-    /**
-     * Adds the configured keys. Each string is a '/'-separated path of
-     * property names; the keys are appended to any already configured.
-     */
-    public void setHashKey(String[] simpleKeyStrings) {
-        for (String simpleKeyAsString : simpleKeyStrings) {
-            List<String> keyNameElements = new ArrayList<String>();
-            StringTokenizer st = new StringTokenizer(simpleKeyAsString, "/");
-            while (st.hasMoreTokens()) {
-                keyNameElements.add(st.nextToken());
-            }
-            keyNameTuple.add(keyNameElements);
-        }
-    }
-
-    /**
-     * Restricts this partitioner to the given streams; events on any other
-     * (non-null) stream name are not partitioned.
-     */
-    public void setStreamNames(String[] streamNames) {
-        streamNameSet = new HashSet<String>(streamNames.length);
-        for (String eventType : streamNames) {
-            streamNameSet.add(eventType);
-        }
-    }
-
-    public void setHasher(Hasher hasher) {
-        this.hasher = hasher;
-    }
-
-    /** Enables diagnostic output on standard out. */
-    public void setDebug(boolean debug) {
-        this.debug = debug;
-    }
-
-    /**
-     * Partitions the event on the keys configured with
-     * {@link #setHashKey(String[])}.
-     */
-    @Override
-    public List<CompoundKeyInfo> partition(String streamName, Object event,
-                                           int partitionCount) {
-        return partition(streamName, keyNameTuple, event, partitionCount);
-    }
-
-    /**
-     * Partitions the event on the given keys.
-     *
-     * @param streamName
-     *            stream the event belongs to
-     * @param compoundKeyNames
-     *            key paths to partition on; {@code null} selects a random
-     *            partition
-     * @param event
-     *            event to partition
-     * @param partitionCount
-     *            number of partitions
-     * @return the partition assignments, or {@code null} when the stream is
-     *         filtered out or a key property is missing or has a null value
-     */
-    public List<CompoundKeyInfo> partition(String streamName,
-                                           List<List<String>> compoundKeyNames,
-                                           Object event, int partitionCount) {
-
-        if (streamName != null && streamNameSet != null
-                && !streamNameSet.contains(streamName)) {
-            return null;
-        }
-
-        // Some event types that need special handling
-        if (event instanceof io.s4.message.Request) {
-            // construct key from request's target
-            io.s4.message.Request r = (io.s4.message.Request) event;
-            return r.partition(hasher, delimiter, partitionCount);
-
-        } else if (event instanceof io.s4.message.Response) {
-            // partition id is encoded in Response, so use it directly.
-            io.s4.message.Response r = (io.s4.message.Response) event;
-            return r.partition(partitionCount);
-
-        } else if (compoundKeyNames == null) {
-            // if compoundKeyNames is null, then assign to a random partition.
-            return partitionRandom(partitionCount);
-        }
-
-        // have to compute key value and
-        // partition based on hash of that value
-
-        Schema schema = schemaContainer.getSchema(event.getClass());
-
-        if (debug) {
-            System.out.println(schema);
-        }
-
-        // Decide per call: a remembered decision would send later calls with
-        // several keys (or with no keys at all) down the single-key path.
-        boolean singleTopLevelKey = compoundKeyNames.size() == 1
-                && compoundKeyNames.get(0).size() == 1;
-        if (singleTopLevelKey) {
-            return partitionOnSingleKey(compoundKeyNames.get(0).get(0),
-                                        schema,
-                                        event,
-                                        partitionCount);
-        }
-        return partitionOnCompoundKeys(compoundKeyNames,
-                                       schema,
-                                       event,
-                                       partitionCount);
-    }
-
-    // Fast path: partition on the value of one top-level property.
-    private List<CompoundKeyInfo> partitionOnSingleKey(String simpleKeyName,
-                                                       Schema schema,
-                                                       Object event,
-                                                       int partitionCount) {
-        if (debug) {
-            System.out.println("Using fast path!");
-        }
-        Property property = schema.getProperties().get(simpleKeyName);
-        if (property == null) {
-            return null;
-        }
-
-        Object value = null;
-        try {
-            value = property.getGetterMethod().invoke(event);
-        } catch (Exception e) {
-            // best effort: a failing getter is treated like a null value
-            if (debug) {
-                e.printStackTrace();
-            }
-        }
-
-        if (value == null) {
-            if (debug) {
-                System.out.println("Fast path: Null value encountered");
-            }
-            return null;
-        }
-
-        KeyInfo keyInfo = new KeyInfo();
-        keyInfo.addElementToPath(simpleKeyName);
-        String stringValue = String.valueOf(value);
-        keyInfo.setValue(stringValue);
-
-        CompoundKeyInfo partitionInfo = new CompoundKeyInfo();
-        partitionInfo.addKeyInfo(keyInfo);
-        int partitionId = (int) (hasher.hash(stringValue) % partitionCount);
-        partitionInfo.setPartitionId(partitionId);
-        partitionInfo.setCompoundValue(stringValue);
-        if (debug) {
-            System.out.printf("Value %s, partition id %d\n",
-                              stringValue,
-                              partitionInfo.getPartitionId());
-        }
-
-        List<CompoundKeyInfo> partitionInfoList = new ArrayList<CompoundKeyInfo>();
-        partitionInfoList.add(partitionInfo);
-        return partitionInfoList;
-    }
-
-    // General path: resolve every key path, then combine the i-th value of
-    // each key into one compound value per index and hash it.
-    private List<CompoundKeyInfo> partitionOnCompoundKeys(List<List<String>> compoundKeyNames,
-                                                          Schema schema,
-                                                          Object event,
-                                                          int partitionCount) {
-        List<List<KeyInfo>> valueLists = new ArrayList<List<KeyInfo>>();
-        int maxSize = 0;
-
-        for (List<String> simpleKeyPath : compoundKeyNames) {
-            List<KeyInfo> keyInfoList = getKeyValues(event,
-                                                     schema,
-                                                     simpleKeyPath,
-                                                     0,
-                                                     new ArrayList<KeyInfo>(),
-                                                     new KeyInfo());
-            if (keyInfoList == null || keyInfoList.size() == 0) {
-                if (debug) {
-                    System.out.println("Null value encountered");
-                }
-                return null; // do no partitioning if any simple key's value
-                             // resolves to null
-            }
-            valueLists.add(keyInfoList);
-            maxSize = Math.max(maxSize, keyInfoList.size());
-
-            if (debug) {
-                printKeyInfoList(keyInfoList);
-            }
-        }
-
-        List<CompoundKeyInfo> partitionInfoList = new ArrayList<CompoundKeyInfo>();
-        for (int i = 0; i < maxSize; i++) {
-            StringBuilder compoundValueBuilder = new StringBuilder();
-            CompoundKeyInfo partitionInfo = new CompoundKeyInfo();
-            for (List<KeyInfo> keyInfoList : valueLists) {
-                // a key with fewer values than the longest one repeats its
-                // last value for the remaining indexes
-                int index = Math.min(i, keyInfoList.size() - 1);
-                KeyInfo keyInfo = keyInfoList.get(index);
-                if (compoundValueBuilder.length() > 0) {
-                    compoundValueBuilder.append(delimiter);
-                }
-                compoundValueBuilder.append(keyInfo.getValue());
-                partitionInfo.addKeyInfo(keyInfo);
-            }
-            String compoundValue = compoundValueBuilder.toString();
-
-            // get the partition id
-            int partitionId = (int) (hasher.hash(compoundValue) % partitionCount);
-            partitionInfo.setPartitionId(partitionId);
-            partitionInfo.setCompoundValue(compoundValue);
-            partitionInfoList.add(partitionInfo);
-            if (debug) {
-                System.out.printf("Value %s, partition id %d\n",
-                                  compoundValue,
-                                  partitionInfo.getPartitionId());
-            }
-        }
-
-        return partitionInfoList;
-    }
-
-    // Assign to random partition
-    private List<CompoundKeyInfo> partitionRandom(int partitionCount) {
-        CompoundKeyInfo partitionInfo = new CompoundKeyInfo();
-
-        // choose a random int from [0, partitionCount-1]
-        int partitionId = (int) Math.min(partitionCount - 1,
-                                         Math.floor(Math.random()
-                                                 * partitionCount));
-
-        partitionInfo.setPartitionId(partitionId);
-        List<CompoundKeyInfo> partitionInfoList = new ArrayList<CompoundKeyInfo>();
-        partitionInfoList.add(partitionInfo);
-
-        return partitionInfoList;
-    }
-
-    private void printKeyInfoList(List<KeyInfo> keyInfoList) {
-        for (KeyInfo aKeyInfo : keyInfoList) {
-            System.out.printf("Path: %s; full path %s; value %s\n",
-                              aKeyInfo.toString(),
-                              aKeyInfo.toString(true),
-                              aKeyInfo.getValue());
-        }
-    }
-
-    /**
-     * Resolves the key path element at {@code elementIndex} on
-     * {@code record} and recurses into list entries and nested schemas.
-     * Every leaf value found is appended to {@code keyInfoList}.
-     *
-     * @return {@code keyInfoList}, or {@code null} when a property on the
-     *         path is unknown or a value on the path is null
-     */
-    private List<KeyInfo> getKeyValues(Object record, Schema schema,
-                                       List<String> keyNameElements,
-                                       int elementIndex,
-                                       List<KeyInfo> keyInfoList,
-                                       KeyInfo keyInfo) {
-        String keyElement = keyNameElements.get(elementIndex);
-        Property property = schema.getProperties().get(keyElement);
-        if (property == null) {
-            return null;
-        }
-
-        keyInfo.addElementToPath(keyElement);
-
-        Object value = null;
-        try {
-            value = property.getGetterMethod().invoke(record);
-        } catch (Exception e) {
-            // best effort: a failing getter is treated like a null value
-            if (debug) {
-                System.out.println("key element is " + keyElement);
-                e.printStackTrace();
-            }
-        }
-
-        if (value == null) {
-            return null; // return a null KeyInfo list if we hit a null value
-        }
-        if (property.isList()) {
-            List<?> list = (List<?>) value;
-            // TODO: handle case where key does not include property of
-            // component type
-            Schema componentSchema = property.getComponentProperty()
-                                             .getSchema();
-            int listLength = list.size();
-            for (int i = 0; i < listLength; i++) {
-                Object listEntry = list.get(i);
-                // each entry gets its own copy of the path so far, extended
-                // with the entry's index
-                KeyInfo keyInfoForListEntry = keyInfo.copy();
-                keyInfoForListEntry.addElementToPath(i);
-                List<KeyInfo> partialList = getKeyValues(listEntry,
-                                                         componentSchema,
-                                                         keyNameElements,
-                                                         elementIndex + 1,
-                                                         keyInfoList,
-                                                         keyInfoForListEntry);
-                if (partialList == null) {
-                    return null;
-                }
-            }
-        } else if (property.getSchema() != null) {
-            return getKeyValues(value,
-                                property.getSchema(),
-                                keyNameElements,
-                                elementIndex + 1,
-                                keyInfoList,
-                                keyInfo);
-        } else {
-            keyInfo.setValue(String.valueOf(value));
-            keyInfoList.add(keyInfo);
-        }
-
-        return keyInfoList;
-    }
-
-    /** Manual smoke test that prints the partitioning of two sample events. */
-    public static void main(String args[]) {
-        DefaultPartitioner dp1 = new DefaultPartitioner();
-        DefaultPartitioner dp2 = new DefaultPartitioner();
-        dp1.setDebug(true);
-        dp1.setHashKey(new String[] { "array1/val1", "array1/val2", "query" });
-        dp1.setHasher(new DefaultHasher());
-
-        dp2.setDebug(true);
-        dp2.setHashKey(new String[] { "user" });
-        dp2.setHasher(new DefaultHasher());
-
-        Map<String, Object> event = new HashMap<String, Object>();
-        event.put("user", "fred");
-        event.put("query", "timex watch");
-        List<Map<String, Object>> array1 = new ArrayList<Map<String, Object>>();
-        Map<String, Object> element = new HashMap<String, Object>();
-        element.put("val1", Long.valueOf(72));
-        element.put("val2", Long.valueOf(11));
-        array1.add(element);
-        element = new HashMap<String, Object>();
-        element.put("val1", Long.valueOf(21));
-        element.put("val2", Long.valueOf(12));
-        array1.add(element);
-        event.put("array1", array1);
-
-        dp1.partition("test", event, 4);
-        System.out.println("------------");
-        dp2.partition("test", event, 4);
-        System.out.println("------------");
-        event = new HashMap<String, Object>();
-
-        event.put("query", "timex watch");
-        array1 = new ArrayList<Map<String, Object>>();
-        element = new HashMap<String, Object>();
-        element.put("val1", Long.valueOf(72));
-        element.put("val2", Long.valueOf(11));
-        array1.add(element);
-        element = new HashMap<String, Object>();
-
-        element.put("val2", Long.valueOf(12));
-        array1.add(element);
-        event.put("array1", array1);
-
-        dp1.partition("test", event, 4);
-        System.out.println("------------");
-        dp2.partition("test", event, 4);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/dispatcher/partitioner/DummyPartitioner.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/dispatcher/partitioner/DummyPartitioner.java b/s4-core/src/main/java/io/s4/dispatcher/partitioner/DummyPartitioner.java
deleted file mode 100644
index 6036dc3..0000000
--- a/s4-core/src/main/java/io/s4/dispatcher/partitioner/DummyPartitioner.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.dispatcher.partitioner;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Partitioner that ignores the event and always assigns partition 0.
- */
-public class DummyPartitioner implements Partitioner {
-
-    /**
-     * @return a single-element list whose entry targets partition 0,
-     *         regardless of the arguments
-     */
-    @Override
-    public List<CompoundKeyInfo> partition(String streamName, Object event, int partitionCount) {
-        List<CompoundKeyInfo> assignments = new ArrayList<CompoundKeyInfo>();
-        CompoundKeyInfo firstPartition = new CompoundKeyInfo();
-        firstPartition.setPartitionId(0);
-        assignments.add(firstPartition);
-        return assignments;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/dispatcher/partitioner/HashAlgorithm.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/dispatcher/partitioner/HashAlgorithm.java b/s4-core/src/main/java/io/s4/dispatcher/partitioner/HashAlgorithm.java
deleted file mode 100644
index 05fd809..0000000
--- a/s4-core/src/main/java/io/s4/dispatcher/partitioner/HashAlgorithm.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.dispatcher.partitioner;
-
-import io.s4.util.KeyUtil;
-
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.zip.CRC32;
-
-/** from 
- * http://github.com/dustin/java-memcached-client/blob/master/src/main/java/net/spy/memcached/HashAlgorithm.java
- */
-
-/**
- * Known hashing algorithms for locating a server for a key. Note that all hash
- * algorithms return 64-bits of hash, but only the lower 32-bits are
- * significant. This allows a positive 32-bit number to be returned for all
- * cases.
- */
-public enum HashAlgorithm {
-
-    /**
-     * Native hash (String.hashCode()).
-     */
-    NATIVE_HASH,
-    /**
-     * CRC32_HASH as used by the perl API. This will be more consistent both
-     * across multiple API users as well as java versions, but is mostly likely
-     * significantly slower.
-     */
-    CRC32_HASH,
-    /**
-     * FNV hashes are designed to be fast while maintaining a low collision
-     * rate. The FNV speed allows one to quickly hash lots of data while
-     * maintaining a reasonable collision rate.
-     * 
-     * @see <a href="http://www.isthe.com/chongo/tech/comp/fnv/">fnv
-     *      comparisons</a>
-     * @see <a href="http://en.wikipedia.org/wiki/Fowler_Noll_Vo_hash">fnv at
-     *      wikipedia</a>
-     */
-    FNV1_64_HASH,
-    /**
-     * Variation of FNV.
-     */
-    FNV1A_64_HASH,
-    /**
-     * 32-bit FNV1.
-     */
-    FNV1_32_HASH,
-    /**
-     * 32-bit FNV1a.
-     */
-    FNV1A_32_HASH,
-    /**
-     * MD5-based hash algorithm used by ketama.
-     */
-    KETAMA_HASH;
-
-    private static final long FNV_64_INIT = 0xcbf29ce484222325L;
-    private static final long FNV_64_PRIME = 0x100000001b3L;
-
-    private static final long FNV_32_INIT = 2166136261L;
-    private static final long FNV_32_PRIME = 16777619;
-
-    /**
-     * Compute the hash for the given key.
-     * 
-     * @return a positive integer hash
-     */
-    public long hash(final String k) {
-        long rv = 0;
-        switch (this) {
-        case NATIVE_HASH:
-            rv = k.hashCode();
-            break;
-        case CRC32_HASH:
-            // return (crc32(shift) >> 16) & 0x7fff;
-            CRC32 crc32 = new CRC32();
-            crc32.update(KeyUtil.getKeyBytes(k));
-            rv = (crc32.getValue() >> 16) & 0x7fff;
-            break;
-        case FNV1_64_HASH: {
-            // Thanks to pierre@demartines.com for the pointer
-            rv = FNV_64_INIT;
-            int len = k.length();
-            for (int i = 0; i < len; i++) {
-                rv *= FNV_64_PRIME;
-                rv ^= k.charAt(i);
-            }
-        }
-            break;
-        case FNV1A_64_HASH: {
-            rv = FNV_64_INIT;
-            int len = k.length();
-            for (int i = 0; i < len; i++) {
-                rv ^= k.charAt(i);
-                rv *= FNV_64_PRIME;
-            }
-        }
-            break;
-        case FNV1_32_HASH: {
-            rv = FNV_32_INIT;
-            int len = k.length();
-            for (int i = 0; i < len; i++) {
-                rv *= FNV_32_PRIME;
-                rv ^= k.charAt(i);
-            }
-        }
-            break;
-        case FNV1A_32_HASH: {
-            rv = FNV_32_INIT;
-            int len = k.length();
-            for (int i = 0; i < len; i++) {
-                rv ^= k.charAt(i);
-                rv *= FNV_32_PRIME;
-            }
-        }
-            break;
-        case KETAMA_HASH:
-            byte[] bKey = computeMd5(k);
-            rv = ((long) (bKey[3] & 0xFF) << 24)
-                    | ((long) (bKey[2] & 0xFF) << 16)
-                    | ((long) (bKey[1] & 0xFF) << 8) | (bKey[0] & 0xFF);
-            break;
-        default:
-            assert false;
-        }
-        return rv & 0xffffffffL; /* Truncate to 32-bits */
-    }
-
-    /**
-     * Get the md5 of the given key.
-     */
-    public static byte[] computeMd5(String k) {
-        MessageDigest md5;
-        try {
-            md5 = MessageDigest.getInstance("MD5");
-        } catch (NoSuchAlgorithmException e) {
-            throw new RuntimeException("MD5 not supported", e);
-        }
-        md5.reset();
-        md5.update(KeyUtil.getKeyBytes(k));
-        return md5.digest();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/dispatcher/partitioner/Hasher.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/dispatcher/partitioner/Hasher.java b/s4-core/src/main/java/io/s4/dispatcher/partitioner/Hasher.java
deleted file mode 100644
index 6cf65b7..0000000
--- a/s4-core/src/main/java/io/s4/dispatcher/partitioner/Hasher.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.dispatcher.partitioner;
-
-public interface Hasher {
-    public long hash(Object hashKey);
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/dispatcher/partitioner/KeyInfo.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/dispatcher/partitioner/KeyInfo.java b/s4-core/src/main/java/io/s4/dispatcher/partitioner/KeyInfo.java
deleted file mode 100644
index 03e37fe..0000000
--- a/s4-core/src/main/java/io/s4/dispatcher/partitioner/KeyInfo.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.dispatcher.partitioner;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-public class KeyInfo implements Serializable {
-    List<KeyPathElement> keyPath = new ArrayList<KeyPathElement>();
-    String value;
-
-    public void setValue(String value) {
-        this.value = value;
-    }
-
-    public String getValue() {
-        return this.value;
-    }
-
-    public void addElementToPath(String keyName) {
-        keyPath.add(new KeyPathElementName(keyName));
-    }
-
-    public void addElementToPath(int index) {
-        keyPath.add(new KeyPathElementIndex(index));
-    }
-
-    private void addElementToPath(KeyPathElement keyPathElement) {
-        keyPath.add(keyPathElement);
-    }
-
-    public List<KeyPathElement> getKeyPath() {
-        return keyPath;
-    }
-
-    public static class KeyPathElement implements Serializable {
-        public enum PathElementType {
-            KEY_NAME, INDEX
-        }
-
-        PathElementType pathElementType;
-
-        public PathElementType getPathElementType() {
-            return pathElementType;
-        }
-
-    }
-
-    public static class KeyPathElementName extends KeyPathElement {
-        String keyName;
-
-        public KeyPathElementName() {
-
-        }
-
-        public KeyPathElementName(String keyName) {
-            pathElementType = PathElementType.KEY_NAME;
-            this.keyName = keyName;
-        }
-
-        public String getKeyName() {
-            return keyName;
-        }
-    }
-
-    public static class KeyPathElementIndex extends KeyPathElement {
-        int index;
-
-        public KeyPathElementIndex() {
-
-        }
-
-        public KeyPathElementIndex(int index) {
-            pathElementType = PathElementType.INDEX;
-            this.index = index;
-        }
-
-        public int getIndex() {
-            return index;
-        }
-    }
-
-    public KeyInfo copy() {
-        KeyInfo newKeyInfo = new KeyInfo();
-        for (KeyPathElement element : keyPath) {
-            newKeyInfo.addElementToPath(element);
-        }
-        return newKeyInfo;
-    }
-
-    public String toString() {
-        return toString(false);
-    }
-
-    public String toString(boolean showFull) {
-        StringBuffer sb = new StringBuffer();
-        for (KeyPathElement element : keyPath) {
-            if (element.getPathElementType() == KeyPathElement.PathElementType.KEY_NAME) {
-                if (sb.length() > 0) {
-                    sb.append("/");
-                }
-                sb.append(((KeyPathElementName) element).getKeyName());
-            } else if (showFull) {
-                sb.append("[")
-                  .append(((KeyPathElementIndex) element).getIndex())
-                  .append("]");
-            }
-        }
-        return sb.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/dispatcher/partitioner/LoopbackPartitioner.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/dispatcher/partitioner/LoopbackPartitioner.java b/s4-core/src/main/java/io/s4/dispatcher/partitioner/LoopbackPartitioner.java
deleted file mode 100644
index cdb6b15..0000000
--- a/s4-core/src/main/java/io/s4/dispatcher/partitioner/LoopbackPartitioner.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.dispatcher.partitioner;
-
-import io.s4.emitter.CommLayerEmitter;
-import io.s4.emitter.EventEmitter;
-import io.s4.listener.EventListener;
-import io.s4.processor.PEContainer;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * A partitioner that assigns events to the current partition, as given by the comm layer.
- * 
- */
-public class LoopbackPartitioner implements Partitioner, VariableKeyPartitioner {
-
-    CommLayerEmitter emitter;
-
-    @Override
-    public List<CompoundKeyInfo> partition(String streamName,
-            List<List<String>> compoundKeyNames, Object event,
-            int partitionCount) {
-        List<CompoundKeyInfo> partitionInfoList = new ArrayList<CompoundKeyInfo>();
-        CompoundKeyInfo partitionInfo = new CompoundKeyInfo();
-        StringBuilder compoundKeyBuilder = new StringBuilder();
-        // This partitioning ignores the values of the keyed attributes;
-        // it partitions to the current partition id of the pe container
-        partitionInfo.setPartitionId(emitter.getListener().getId());
-        for (List<String> keyNames : compoundKeyNames) {
-            for (String keyName : keyNames) {
-                compoundKeyBuilder.append(keyName);
-            }
-        }
-        partitionInfo.setCompoundKey(compoundKeyBuilder.toString());
-        partitionInfoList.add(partitionInfo);
-        return partitionInfoList;
-    }
-
-    @Override
-    public List<CompoundKeyInfo> partition(String streamName, Object event,
-            int partitionCount) {
-        return partition(streamName, new ArrayList<List<String>>(0), event,
-                partitionCount);
-    }
-
-    /**
-     * A reference on the emitter allows getting the current partition id from the comm layer 
-     * @param emitter comm layer emitter
-     */
-    public void setEventEmitter(CommLayerEmitter emitter) {
-        this.emitter = emitter;
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/dispatcher/partitioner/Partitioner.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/dispatcher/partitioner/Partitioner.java b/s4-core/src/main/java/io/s4/dispatcher/partitioner/Partitioner.java
deleted file mode 100644
index 38b6a4b..0000000
--- a/s4-core/src/main/java/io/s4/dispatcher/partitioner/Partitioner.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.dispatcher.partitioner;
-
-import java.util.List;
-
-public interface Partitioner {
-    public List<CompoundKeyInfo> partition(String streamName, Object event, int partitionCount);
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/dispatcher/partitioner/RoundRobinPartitioner.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/dispatcher/partitioner/RoundRobinPartitioner.java b/s4-core/src/main/java/io/s4/dispatcher/partitioner/RoundRobinPartitioner.java
deleted file mode 100644
index 162cf46..0000000
--- a/s4-core/src/main/java/io/s4/dispatcher/partitioner/RoundRobinPartitioner.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.dispatcher.partitioner;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-public class RoundRobinPartitioner implements Partitioner {
-    private int counter = 0;
-    private Set<String> streamNameSet;
-
-    public void setStreamNames(String[] streamNames) {
-        streamNameSet = new HashSet<String>(streamNames.length);
-        for (String eventType : streamNames) {
-            streamNameSet.add(eventType);
-        }
-    }
-
-    @Override
-    public List<CompoundKeyInfo> partition(String streamName, Object event,
-            int partitionCount) {
-
-        if (streamName != null && streamNameSet != null
-                && !streamNameSet.contains(streamName)) {
-            return null;
-        }
-
-        CompoundKeyInfo partitionInfo = new CompoundKeyInfo();
-        int partitionId = 0;
-
-        synchronized (this) {
-            counter++;
-            if (counter < 0) {
-                counter = 0;
-            }
-            partitionId = counter % partitionCount;
-        }
-
-        partitionInfo.setPartitionId(partitionId);
-        List<CompoundKeyInfo> partitionInfoList = new ArrayList<CompoundKeyInfo>();
-        partitionInfoList.add(partitionInfo);
-
-        return partitionInfoList;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/dispatcher/partitioner/TestDefaultPartitioner.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/dispatcher/partitioner/TestDefaultPartitioner.java b/s4-core/src/main/java/io/s4/dispatcher/partitioner/TestDefaultPartitioner.java
deleted file mode 100644
index f18d5bd..0000000
--- a/s4-core/src/main/java/io/s4/dispatcher/partitioner/TestDefaultPartitioner.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.dispatcher.partitioner;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class TestDefaultPartitioner {
-    public static void main(String[] args) {
-        DefaultPartitioner dp1 = new DefaultPartitioner();
-        DefaultPartitioner dp2 = new DefaultPartitioner();
-        dp1.setDebug(true);
-        dp1.setHashKey(new String[] { "list1/val1", "list1/val2", "query" });
-        dp1.setHasher(new DefaultHasher());
-
-        dp2.setDebug(true);
-        dp2.setHashKey(new String[] { "user" });
-        dp2.setHasher(new DefaultHasher());
-
-        TopLevel tl1 = new TopLevel();
-        tl1.setQuery("Hello there");
-        tl1.setUser("spitzer");
-
-        for (int i = 0; i < 4; i++) {
-            Nested n = new Nested();
-            n.setVal1(i + 77);
-            n.setVal2(i / 10.7);
-            tl1.addNested(n);
-        }
-
-        dp1.partition("test", tl1, 4);
-        dp2.partition("test", tl1, 4);
-
-    }
-
-    static class TopLevel {
-        private String query;
-        private List<Nested> list1 = new ArrayList<Nested>();
-        private String user;
-
-        public String getQuery() {
-            return query;
-        }
-
-        public void setQuery(String query) {
-            this.query = query;
-        }
-
-        public List<Nested> getList1() {
-            return list1;
-        }
-
-        public void setList1(List<Nested> list1) {
-            this.list1 = list1;
-        }
-
-        public String getUser() {
-            return user;
-        }
-
-        public void setUser(String user) {
-            this.user = user;
-        }
-
-        public void addNested(Nested nested) {
-            list1.add(nested);
-        }
-    }
-
-    static class Nested {
-        long val1;
-        double val2;
-
-        public Nested() {
-
-        }
-
-        public long getVal1() {
-            return val1;
-        }
-
-        public void setVal1(long val1) {
-            this.val1 = val1;
-        }
-
-        public double getVal2() {
-            return val2;
-        }
-
-        public void setVal2(double val2) {
-            this.val2 = val2;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/dispatcher/partitioner/VariableKeyPartitioner.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/dispatcher/partitioner/VariableKeyPartitioner.java b/s4-core/src/main/java/io/s4/dispatcher/partitioner/VariableKeyPartitioner.java
deleted file mode 100644
index 8746bc4..0000000
--- a/s4-core/src/main/java/io/s4/dispatcher/partitioner/VariableKeyPartitioner.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.dispatcher.partitioner;
-
-import java.util.List;
-
-public interface VariableKeyPartitioner {
-    public List<CompoundKeyInfo> partition(String streamName,
-                                           List<List<String>> keyNames,
-                                           Object event, int partitionCount);
-}


Mime
View raw message