helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kisho...@apache.org
Subject [23/47] Refactoring from com.linkedin.helix to org.apache.helix
Date Wed, 24 Oct 2012 22:26:41 GMT
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/store/ZNRecordJsonSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/store/ZNRecordJsonSerializer.java b/helix-core/src/main/java/com/linkedin/helix/store/ZNRecordJsonSerializer.java
deleted file mode 100644
index 0803877..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/store/ZNRecordJsonSerializer.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <opensource@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.store;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.manager.zk.ZNRecordSerializer;
-
-public class ZNRecordJsonSerializer implements PropertySerializer<ZNRecord>
-{
-  static private Logger LOG = Logger.getLogger(ZNRecordJsonSerializer.class);
-  private final ZNRecordSerializer _serializer = new ZNRecordSerializer();
-  
-  @Override
-  public byte[] serialize(ZNRecord data) throws PropertyStoreException
-  {
-    return _serializer.serialize(data);
-  }
-
-  @Override
-  public ZNRecord deserialize(byte[] bytes) throws PropertyStoreException
-  {
-    return (ZNRecord) _serializer.deserialize(bytes);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/store/file/FileHelixPropertyStore.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/store/file/FileHelixPropertyStore.java b/helix-core/src/main/java/com/linkedin/helix/store/file/FileHelixPropertyStore.java
deleted file mode 100644
index 7b3c959..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/store/file/FileHelixPropertyStore.java
+++ /dev/null
@@ -1,293 +0,0 @@
-package com.linkedin.helix.store.file;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.I0Itec.zkclient.DataUpdater;
-import org.I0Itec.zkclient.IZkChildListener;
-import org.I0Itec.zkclient.IZkDataListener;
-import org.apache.zookeeper.data.Stat;
-
-import com.linkedin.helix.store.HelixPropertyListener;
-import com.linkedin.helix.store.HelixPropertyStore;
-import com.linkedin.helix.store.PropertyChangeListener;
-import com.linkedin.helix.store.PropertyJsonComparator;
-import com.linkedin.helix.store.PropertySerializer;
-import com.linkedin.helix.store.PropertyStat;
-import com.linkedin.helix.store.PropertyStoreException;
-
-public class FileHelixPropertyStore<T> implements HelixPropertyStore<T>
-{
-  final FilePropertyStore<T> _store;
-
-  public FileHelixPropertyStore(final PropertySerializer<T> serializer,
-                                String rootNamespace,
-                                final PropertyJsonComparator<T> comparator)
-  {
-    _store = new FilePropertyStore<T>(serializer, rootNamespace, comparator);
-  }
-
-  @Override
-  public boolean create(String path, T record, int options)
-  {
-    return set(path, record, options);
-  }
-
-  @Override
-  public boolean set(String path, T record, int options)
-  {
-    try
-    {
-      _store.setProperty(path, record);
-      return true;
-    }
-    catch (PropertyStoreException e)
-    {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    }
-    return false;
-  }
-
-  @Override
-  public boolean update(String path, DataUpdater<T> updater, int options)
-  {
-    _store.updatePropertyUntilSucceed(path, updater);
-    return true;
-  }
-
-  @Override
-  public boolean remove(String path, int options)
-  {
-    try
-    {
-      _store.removeProperty(path);
-      return true;
-    }
-    catch (PropertyStoreException e)
-    {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    }
-    return false;
-  }
-
-  @Override
-  public boolean[] createChildren(List<String> paths, List<T> records, int options)
-  {
-    return setChildren(paths, records, options);
-  }
-
-  @Override
-  public boolean[] setChildren(List<String> paths, List<T> records, int options)
-  {
-    boolean[] success = new boolean[paths.size()];
-    for (int i = 0; i < paths.size(); i++)
-    {
-      success[i] = create(paths.get(i), records.get(i), options);
-    }
-    return success;
-  }
-
-  @Override
-  public boolean[] updateChildren(List<String> paths,
-                                  List<DataUpdater<T>> updaters,
-                                  int options)
-  {
-    boolean[] success = new boolean[paths.size()];
-    for (int i = 0; i < paths.size(); i++)
-    {
-      success[i] = update(paths.get(i), updaters.get(i), options);
-    }
-    return success;
-
-  }
-
-  @Override
-  public boolean[] remove(List<String> paths, int options)
-  {
-    boolean[] success = new boolean[paths.size()];
-    for (int i = 0; i < paths.size(); i++)
-    {
-      success[i] = remove(paths.get(i), options);
-    }
-    return success;
-  }
-
-  @Override
-  public T get(String path, Stat stat, int options)
-  {
-    PropertyStat propertyStat = new PropertyStat();
-    try
-    {
-      T value = _store.getProperty(path, propertyStat);
-      if (stat != null)
-      {
-        stat.setVersion(propertyStat.getVersion());
-        stat.setMtime(propertyStat.getLastModifiedTime());
-      }
-      return value;
-    }
-    catch (PropertyStoreException e)
-    {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    }
-    return null;
-  }
-
-  @Override
-  public List<T> get(List<String> paths, List<Stat> stats, int options)
-  {
-    List<T> values = new ArrayList<T>();
-    for (int i = 0; i < paths.size(); i++)
-    {
-      values.add(get(paths.get(i), stats.get(i), options));
-    }
-    return values;
-  }
-
-  @Override
-  public List<T> getChildren(String parentPath, List<Stat> stats, int options)
-  {
-    List<String> childNames = getChildNames(parentPath, options);
-    
-    if (childNames == null)
-    {
-      return null;
-    }
-
-    List<String> paths = new ArrayList<String>();
-    for (String childName : childNames)
-    {
-      String path = parentPath + "/" + childName;
-      paths.add(path);
-    }
-    
-    return get(paths, stats, options);
-  }
-
-  @Override
-  public List<String> getChildNames(String parentPath, int options)
-  {
-    try
-    {
-      return _store.getPropertyNames(parentPath);
-    }
-    catch (PropertyStoreException e)
-    {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    }
-    return Collections.emptyList();
-  }
-
-  @Override
-  public boolean exists(String path, int options)
-  {
-    return _store.exists(path);
-  }
-
-  @Override
-  public boolean[] exists(List<String> paths, int options)
-  {
-    boolean[] exists = new boolean[paths.size()];
-    for (int i = 0; i < paths.size(); i++)
-    {
-      exists[i] = exists(paths.get(i), options);
-    }
-    return exists;
-  }
-
-  @Override
-  public Stat[] getStats(List<String> paths, int options)
-  {
-    Stat[] stats = new Stat[paths.size()];
-    for (int i = 0; i < paths.size(); i++)
-    {
-      stats[i] = getStat(paths.get(i), options);
-    }
-    return stats;
-  }
-
-  @Override
-  public Stat getStat(String path, int options)
-  {
-    Stat stat = new Stat();
-    get(path, stat, options);
-    return stat;
-  }
-
-  @Override
-  public void subscribeDataChanges(String path, IZkDataListener listener)
-  {
-    throw new UnsupportedOperationException("subscribeDataChanges not supported");
-  }
-
-  @Override
-  public void unsubscribeDataChanges(String path, IZkDataListener listener)
-  {
-    throw new UnsupportedOperationException("unsubscribeDataChanges not supported");
-  }
-
-  @Override
-  public List<String> subscribeChildChanges(String path, IZkChildListener listener)
-  {
-    throw new UnsupportedOperationException("subscribeChildChanges not supported");
-  }
-
-  @Override
-  public void unsubscribeChildChanges(String path, IZkChildListener listener)
-  {
-    throw new UnsupportedOperationException("unsubscribeChildChanges not supported");
-  }
-
-  @Override
-  public void start()
-  {
-    _store.start();
-  }
-
-  @Override
-  public void stop()
-  {
-    _store.stop();
-  }
-
-  @Override
-  public void subscribe(String parentPath, final HelixPropertyListener listener)
-  {
-    try
-    {
-      _store.subscribeForPropertyChange(parentPath, new PropertyChangeListener<T>()
-      {
-
-        @Override
-        public void onPropertyChange(String key)
-        {
-          listener.onDataChange(key);
-        }
-      });
-    }
-    catch (PropertyStoreException e)
-    {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    }
-  }
-
-  @Override
-  public void unsubscribe(String parentPath, HelixPropertyListener listener)
-  {
-    throw new UnsupportedOperationException("unsubscribe not supported");
-  }
-
-  @Override
-  public void reset()
-  {
-    // TODO Auto-generated method stub
-    
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/store/file/FilePropertyStore.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/store/file/FilePropertyStore.java b/helix-core/src/main/java/com/linkedin/helix/store/file/FilePropertyStore.java
deleted file mode 100644
index 0ee1a29..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/store/file/FilePropertyStore.java
+++ /dev/null
@@ -1,942 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <opensource@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.store.file;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.channels.FileChannel;
-import java.nio.channels.FileLock;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.I0Itec.zkclient.DataUpdater;
-import org.apache.commons.io.DirectoryWalker;
-import org.apache.commons.io.FileUtils;
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.manager.file.FileCallbackHandler;
-import com.linkedin.helix.store.PropertyChangeListener;
-import com.linkedin.helix.store.PropertyJsonComparator;
-import com.linkedin.helix.store.PropertySerializer;
-import com.linkedin.helix.store.PropertyStat;
-import com.linkedin.helix.store.PropertyStore;
-import com.linkedin.helix.store.PropertyStoreException;
-
-/**
- *
- * property store that built upon a file system
- * since file systems usually have sophisticated cache mechanisms
- * there is no need for another cache for file property store
- *
- * NOTES:
- * lastModified timestamp provided by java file io has only second level precision
- * so it is possible that files have been modified without changing its lastModified timestamp
- * the solution is to use a map that caches the files changed in last second
- * and in the next round of refresh, check against this map to figure out whether a file
- * has been changed/created in the last second
- * @link http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6939260
- *
- * @author zzhang
- * @param <T>
- */
-
-public class FilePropertyStore<T> implements PropertyStore<T>
-{
-  private static Logger logger = Logger.getLogger(FilePropertyStore.class);
-
-  private final String ROOT = "/";
-  private final long TIMEOUT = 30L;
-  private final long REFRESH_PERIOD = 1000; // ms
-  private final int _id = new Random().nextInt();
-  private final String _rootNamespace;
-  private PropertySerializer<T> _serializer;
-  private final PropertyJsonComparator<T> _comparator;
-
-  private Thread _refreshThread;
-  private final AtomicBoolean _stopRefreshThread;
-  private final CountDownLatch _firstRefreshCounter;
-  private final ReadWriteLock _readWriteLock;
-
-  private Map<String, T> _lastModifiedFiles = new HashMap<String, T>();
-  private Map<String, T> _curModifiedFiles = new HashMap<String, T>();
-
-  private final Map< String, CopyOnWriteArraySet<PropertyChangeListener<T> > > _fileChangeListeners; // map key to listener
-
-  private class FilePropertyStoreRefreshThread implements Runnable
-  {
-    private final PropertyStoreDirWalker _dirWalker;
-
-    public class PropertyStoreDirWalker extends DirectoryWalker
-    {
-      private final File _propertyStoreRootDir;
-      private long _lastNotifiedTime = 0;
-      private long _currentHighWatermark;
-
-      public PropertyStoreDirWalker(String rootNamespace, ReadWriteLock readWriteLock)
-      {
-        _propertyStoreRootDir = new File(rootNamespace);
-      }
-
-      @SuppressWarnings({ "rawtypes", "unchecked" })
-      @Override
-      protected void handleFile(File file, int depth, Collection results) throws IOException
-      {
-        if (file.lastModified() < _lastNotifiedTime)
-        {
-          return;
-        }
-
-        String path = getRelativePath(file.getAbsolutePath());
-        T newValue = null;
-        try
-        {
-          newValue = getProperty(path);
-        } catch (PropertyStoreException e)
-        {
-          logger.error("fail to get property, path:" + path, e);
-        }
-
-        if (file.lastModified() == _lastNotifiedTime && _lastModifiedFiles.containsKey(path))
-        {
-          T value = _lastModifiedFiles.get(path);
-
-          if (_comparator.compare(value, newValue) == 0)
-          {
-            if (file.lastModified() == _currentHighWatermark)
-            {
-              _curModifiedFiles.put(path, newValue);
-            }
-            return;
-          }
-        }
-
-        if (file.lastModified() > _currentHighWatermark)
-        {
-          _currentHighWatermark = file.lastModified();
-
-          _curModifiedFiles.clear();
-          _curModifiedFiles.put(path, newValue);
-        }
-        else if (file.lastModified() == _currentHighWatermark)
-        {
-          _curModifiedFiles.put(path, newValue);
-        }
-
-        // debug
-//        logger.error("file: " + file.getAbsolutePath() + " changed@" + file.lastModified() + " (" +
-//            new Date(file.lastModified()) + ")");
-        results.add(file);
-      }
-
-      @Override
-      protected boolean handleDirectory(File dir, int depth, Collection results) throws IOException
-      {
-        if (dir.lastModified() < _lastNotifiedTime)
-        {
-          return true;
-        }
-
-        String path = getRelativePath(dir.getAbsolutePath());
-        T newValue = null;
-        try
-        {
-          newValue = getProperty(path);
-        }
-        catch (PropertyStoreException e)
-        {
-          logger.error("fail to get property, path:" + path, e);
-        }
-
-        if (dir.lastModified() == _lastNotifiedTime && _lastModifiedFiles.containsKey(path))
-        {
-          T value = _lastModifiedFiles.get(path);
-          if (_comparator.compare(value, newValue) == 0)
-          {
-            if (dir.lastModified() == _currentHighWatermark)
-            {
-              _curModifiedFiles.put(path, newValue);
-            }
-            return true;
-          }
-        }
-        _curModifiedFiles.put(path, newValue);
-
-        if (dir.lastModified() > _currentHighWatermark)
-        {
-          _currentHighWatermark = dir.lastModified();
-
-          _curModifiedFiles.clear();
-          _curModifiedFiles.put(path, newValue);
-        }
-        else if (dir.lastModified() == _currentHighWatermark)
-        {
-          _curModifiedFiles.put(path, newValue);
-        }
-
-        logger.debug("dir: " + dir.getAbsolutePath() + " changed@" + dir.lastModified() +
-            " (" + new Date(dir.lastModified()) + ")");
-        results.add(dir);
-
-        return true;
-      }
-
-      public void walk()
-      {
-        HashSet<File> files = new HashSet<File>();
-
-        try
-        {
-          _currentHighWatermark = _lastNotifiedTime;
-          _readWriteLock.readLock().lock();
-          super.walk(_propertyStoreRootDir, files);
-        }
-        catch (IOException e)
-        {
-          logger.error("IO exception when walking through dir:" + _propertyStoreRootDir, e);
-        }
-        finally
-        {
-          _lastNotifiedTime = _currentHighWatermark;
-          _lastModifiedFiles.clear();
-
-          Map<String, T> temp = _lastModifiedFiles;
-          _lastModifiedFiles = _curModifiedFiles;
-          _curModifiedFiles = temp;
-          _readWriteLock.readLock().unlock();
-        }
-
-        // TODO see if we can use DirectoryFileComparator.DIRECTORY_COMPARATOR.sort()
-        File[] fileArray = new File[files.size()];
-        fileArray = files.toArray(fileArray);
-        Arrays.sort(fileArray, new Comparator<File>() {
-
-          @Override
-          public int compare(File file1, File file2)
-          {
-            return file1.getAbsoluteFile().compareTo(file2.getAbsoluteFile());
-          }
-
-        });
-
-
-        // notify listeners
-        for (int i = 0; i < fileArray.length; i++)
-        {
-          File file = fileArray[i];
-
-          // debug
-//          logger.error("Before send notification of " + file.getAbsolutePath() + " to listeners " + _fileChangeListeners);
-
-          for (Map.Entry< String, CopyOnWriteArraySet<PropertyChangeListener<T> > > entry : _fileChangeListeners.entrySet())
-          {
-            String absPath = file.getAbsolutePath();
-            if (absPath.startsWith(entry.getKey()))
-            {
-              for (PropertyChangeListener<T> listener : entry.getValue())
-              {
-                if (listener instanceof FileCallbackHandler)
-                {
-                  FileCallbackHandler handler = (FileCallbackHandler) listener;
-
-                  // debug
-//                  logger.error("Send notification of " + file.getAbsolutePath() + " to listener:" + handler.getListener());
-                }
-                listener.onPropertyChange(getRelativePath(absPath));
-              }
-            }
-          }
-        }
-      }
-    }
-
-    public FilePropertyStoreRefreshThread(ReadWriteLock readWriteLock)
-    {
-      _dirWalker = new PropertyStoreDirWalker(_rootNamespace, readWriteLock);
-    }
-
-    @Override
-    public void run()
-    {
-      while (!_stopRefreshThread.get())
-      {
-        _dirWalker.walk();
-        _firstRefreshCounter.countDown();
-
-        try
-        {
-          Thread.sleep(REFRESH_PERIOD);
-//          System.out.println("refresh thread is running");
-        }
-        catch (InterruptedException ie)
-        {
-          // do nothing
-        }
-      }
-
-      logger.info("Quitting file property store refresh thread");
-
-    }
-
-  }
-
-//  public FilePropertyStore(final PropertySerializer<T> serializer)
-//  {
-//    this(serializer, System.getProperty("java.io.tmpdir"));
-//  }
-
-  public FilePropertyStore(final PropertySerializer<T> serializer, String rootNamespace,
-      final PropertyJsonComparator<T> comparator)
-  {
-    _serializer = serializer;
-    _comparator = comparator;
-    _stopRefreshThread = new AtomicBoolean(false);
-    _firstRefreshCounter = new CountDownLatch(1);
-    _readWriteLock = new ReentrantReadWriteLock();
-
-    _fileChangeListeners = new ConcurrentHashMap< String, CopyOnWriteArraySet<PropertyChangeListener<T> > >();
-
-    // Strip off leading slash
-    while (rootNamespace.startsWith("/"))
-    {
-      // rootNamespace = rootNamespace.substring(1, rootNamespace.length());
-      rootNamespace = rootNamespace.substring(1);
-    }
-    _rootNamespace = "/" + rootNamespace;
-
-    this.createRootNamespace();
-  }
-
-
-  @Override
-  public boolean start()
-  {
-    // check if start has already been invoked
-    if (_firstRefreshCounter.getCount() == 0)
-      return true;
-
-    logger.debug("starting file property store polling thread, id=" + _id);
-
-    _stopRefreshThread.set(false);
-    _refreshThread = new Thread(new FilePropertyStoreRefreshThread(_readWriteLock),
-                                "FileRefreshThread_" + _id);
-    // _refreshThread.setDaemon(true);
-    _refreshThread.start();
-
-    try
-    {
-      boolean timeout = !_firstRefreshCounter.await(TIMEOUT, TimeUnit.SECONDS);
-      if (timeout)
-      {
-        throw new Exception("Timeout while waiting for the first refresh to complete");
-      }
-    }
-    catch (InterruptedException e)
-    {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    }
-    catch (Exception e)
-    {
-      e.printStackTrace();
-    }
-
-    return true;
-  }
-
-  @Override
-  public boolean stop()
-  {
-    if (_stopRefreshThread.compareAndSet(false, true))
-    {
-      // _stopRefreshThread.set(true);
-      if (_refreshThread != null)
-      {
-        try
-        {
-          _refreshThread.join();
-        }
-        catch (InterruptedException e)
-        {
-        }
-      }
-    }
-    return true;
-  }
-
-  private String getPath(String key)
-  {
-    // Strip off leading slash
-    while (key.startsWith("/"))
-    {
-      // key = key.substring(1, key.length());
-      key = key.substring(1);
-    }
-
-    // String path = key.equals(ROOT) ? _rootNamespace : (_rootNamespace + "/" + key);
-    String path = key.equals("") ? _rootNamespace : (_rootNamespace + "/" + key);
-    return path;
-  }
-
-  private String getRelativePath(String path)
-  {
-    // strip off rootPath from path
-    if (!path.startsWith(_rootNamespace))
-    {
-      logger.warn("path does NOT start with: " + _rootNamespace);
-      return path;
-    }
-
-    if (path.equals(_rootNamespace))
-      return ROOT;
-
-    // path = path.substring(_rootNamespace.length() + 1);
-    path = path.substring(_rootNamespace.length());
-
-    return path;
-  }
-
-  public void createRootNamespace()
-  {
-    createPropertyNamespace(ROOT);
-  }
-
-  @Override
-  public void createPropertyNamespace(String prefix)
-  {
-    String path = getPath(prefix);
-    File dir = new File(path);
-    try
-    {
-      _readWriteLock.writeLock().lock();
-      if (dir.exists())
-      {
-        logger.warn(path + " already exists");
-      }
-      else
-      {
-        if (!dir.mkdirs())
-        {
-          logger.warn("Failed to create: " + path);
-        }
-      }
-    }
-    catch (Exception e)
-    {
-      logger.error("Failed to create dir: " + path + "\nexception:" + e);
-    }
-    finally
-    {
-      _readWriteLock.writeLock().unlock();
-    }
-
-  }
-
-  @Override
-  public void setProperty(String key, T value) throws PropertyStoreException
-  {
-    String path = getPath(key);
-    File file = new File(path);  // null;
-    // FileLock fLock = null;
-    FileOutputStream fout = null;
-
-    // TODO create non-exist dirs recursively
-    try
-    {
-      _readWriteLock.writeLock().lock();
-      // file = new File(path);
-      if (!file.exists())
-      {
-        FileUtils.touch(file);
-      }
-
-      fout = new FileOutputStream(file);
-      // FileChannel fChannel = fout.getChannel();
-
-      // TODO need a timeout on lock operation
-      // fLock = fChannel.lock();
-
-      byte[] bytes = _serializer.serialize(value);
-      fout.write(bytes);
-    }
-//    catch (FileNotFoundException e)
-//    {
-//      logger.error("fail to set property, key:" + key +
-//          "\nfile not found exception:" + e);
-//    }
-    catch (IOException e)
-    {
-      logger.error("fail to set property. key:" + key +
-          "value:" + value, e);
-    }
-    finally
-    {
-      _readWriteLock.writeLock().unlock();
-      try
-      {
-        // if (fLock != null && fLock.isValid())
-        //   fLock.release();
-
-        if (fout != null)
-        {
-          fout.close();
-        }
-      }
-      catch (IOException e)
-      {
-        logger.error("fail to close file. key:" + key, e);
-      }
-
-    }
-
-  }
-
-  @Override
-  public T getProperty(String key) throws PropertyStoreException
-  {
-    return this.getProperty(key, null);
-  }
-
-  @Override
-  public T getProperty(String key, PropertyStat propertyStat) throws PropertyStoreException
-  {
-
-    String path = getPath(key);
-    File file = null;
-    // FileLock fLock = null;
-    FileInputStream fin = null;
-
-    try
-    {
-      // TODO need a timeout on lock operation
-      _readWriteLock.readLock().lock();
-
-      file = new File(path);
-      if (!file.exists())
-      {
-        return null;
-      }
-
-      fin = new FileInputStream(file);
-      // FileChannel fChannel = fin.getChannel();
-      // fLock = fChannel.lock(0L, Long.MAX_VALUE, true);
-
-      int availableBytes = fin.available();
-      if (availableBytes == 0)
-      {
-        return null;
-      }
-
-      byte[] bytes = new byte[availableBytes];
-      fin.read(bytes);
-
-      if (propertyStat != null)
-      {
-        propertyStat.setLastModifiedTime(file.lastModified());
-      }
-
-      return _serializer.deserialize(bytes);
-    }
-    catch (FileNotFoundException e)
-    {
-      return null;
-    }
-    catch (IOException e)
-    {
-      logger.error("fail to get property. key:" + key, e);
-    }
-    finally
-    {
-      _readWriteLock.readLock().unlock();
-      try
-      {
-        // if (fLock != null && fLock.isValid())
-        //   fLock.release();
-        if (fin != null)
-        {
-          fin.close();
-        }
-      }
-      catch (IOException e)
-      {
-        logger.error("fail to close file. key:" + key, e);
-      }
-
-    }
-
-    return null;
-  }
-
-  @Override
-  public void removeProperty(String key) throws PropertyStoreException
-  {
-    String path = getPath(key);
-    File file = new File(path);
-
-    try
-    {
-      _readWriteLock.writeLock().lock();
-      if (!file.exists())
-      {
-        return;
-      }
-
-      boolean success = file.delete();
-      if (!success)
-      {
-        logger.error("fail to remove file. path:" + path);
-      }
-    }
-    catch (Exception e)
-    {
-      logger.error("fail to remove file. path:" + path, e);
-    }
-    finally
-    {
-      _readWriteLock.writeLock().unlock();
-    }
-
-
-  }
-
-  public void removeRootNamespace() throws PropertyStoreException
-  {
-    removeNamespace(ROOT);
-  }
-
-  @Override
-  public void removeNamespace(String prefix) throws PropertyStoreException
-  {
-    String path = getPath(prefix);
-
-    try
-    {
-      _readWriteLock.writeLock().lock();
-      FileUtils.deleteDirectory(new File(path));
-    }
-    catch (IOException e)
-    {
-      logger.error("fail to remove namespace, path:" + path, e);
-    }
-    finally
-    {
-      _readWriteLock.writeLock().unlock();
-    }
-  }
-
-  private void doGetPropertyNames(String path, List<String> leafNodes)
-  throws PropertyStoreException
-  {
-    File file = new File(path);
-    if (!file.exists())
-    {
-      return;
-    }
-
-    // List<String> childs = _zkClient.getChildren(path);
-    if (file.isDirectory())
-    {
-      String[] childs = file.list();
-      if (childs == null || childs.length == 0)
-      {
-        return;
-      }
-      for (String child : childs)
-      {
-        String pathToChild = path + "/" + child;
-        doGetPropertyNames(pathToChild, leafNodes);
-      }
-    }
-    else if (file.isFile())
-    {
-      // getProperty(getRelativePath(path));
-      leafNodes.add(getRelativePath(path));
-      return;
-    }
-  }
-
-
-  @Override
-  public List<String> getPropertyNames(String prefix) throws PropertyStoreException
-  {
-    String path = getPath(prefix);
-    List<String> propertyNames = new ArrayList<String>();
-
-    try
-    {
-      _readWriteLock.readLock().lock();
-      doGetPropertyNames(path, propertyNames);
-    }
-    finally
-    {
-      _readWriteLock.readLock().unlock();
-    }
-
-    // sort it to get deterministic order
-    Collections.sort(propertyNames);
-
-    return propertyNames;
-  }
-
-  @Override
-  public void setPropertyDelimiter(String delimiter) throws PropertyStoreException
-  {
-    throw new UnsupportedOperationException(
-        "setPropertyDelimiter() is NOT supported by FilePropertyStore");
-  }
-
-  @Override
-  public void subscribeForPropertyChange(String prefix, PropertyChangeListener<T> listener)
-  throws PropertyStoreException
-  {
-    if (null != listener)
-    {
-      String path = getPath(prefix);
-      synchronized (_fileChangeListeners)
-      {
-        CopyOnWriteArraySet<PropertyChangeListener <T> > listeners = _fileChangeListeners.get(path);
-        if (listeners == null) {
-            listeners = new CopyOnWriteArraySet<PropertyChangeListener <T> >();
-            _fileChangeListeners.put(path, listeners);
-        }
-        listeners.add(listener);
-      }
-    }
-
-  }
-
-  @Override
-  public void unsubscribeForPropertyChange(String prefix, PropertyChangeListener<T> listener)
-  throws PropertyStoreException
-  {
-    if (null != listener)
-    {
-      String path = getPath(prefix);
-      synchronized (_fileChangeListeners)
-      {
-        final Set<PropertyChangeListener<T> > listeners = _fileChangeListeners.get(path);
-        if (listeners != null)
-        {
-            listeners.remove(listener);
-        }
-        if (listeners == null || listeners.isEmpty())
-        {
-            _fileChangeListeners.remove(path);
-        }
-      }
-    }
-
-  }
-
-  @Override
-  public boolean canParentStoreData()
-  {
-    // TODO Auto-generated method stub
-    return false;
-  }
-
-  @Override
-  public String getPropertyRootNamespace()
-  {
-    return _rootNamespace;
-  }
-
-  @Override
-  public void updatePropertyUntilSucceed(String key, DataUpdater<T> updater)
-  {
-    updatePropertyUntilSucceed(key, updater, true);
-  }
-
-  @Override
-  public void updatePropertyUntilSucceed(String key, DataUpdater<T> updater,
-      boolean createIfAbsent)
-  {
-    String path = getPath(key);
-    File file = new File(path);
-    RandomAccessFile raFile = null;
-    FileLock fLock = null;
-
-    try
-    {
-      _readWriteLock.writeLock().lock();
-      if (!file.exists())
-      {
-        FileUtils.touch(file);
-      }
-
-      raFile = new RandomAccessFile(file, "rw");
-      FileChannel fChannel = raFile.getChannel();
-      fLock = fChannel.lock();
-
-      T current = getProperty(key);
-      T update = updater.update(current);
-      setProperty(key, update);
-    }
-    catch (Exception e)
-    {
-      logger.error("fail to updatePropertyUntilSucceed, path:" + path, e);
-    }
-    finally
-    {
-      _readWriteLock.writeLock().unlock();
-      try
-      {
-        if (fLock != null && fLock.isValid())
-        {
-           fLock.release();
-        }
-
-        if (raFile != null)
-        {
-          raFile.close();
-        }
-      }
-      catch (IOException e)
-      {
-        logger.error("fail to close file, path:" + path, e);
-      }
-    }
-  }
-
-
-  @Override
-  public void setPropertySerializer(PropertySerializer<T> serializer)
-  {
-    _readWriteLock.writeLock().lock();
-    _serializer = serializer;
-    _readWriteLock.writeLock().unlock();
-  }
-
-  @Override
-  public boolean compareAndSet(String key, T expected, T update, Comparator<T> comparator)
-  {
-    return compareAndSet(key, expected, update, comparator, false);
-  }
-
-  @Override
-  public boolean compareAndSet(String key, T expected, T update, Comparator<T> comparator,
-                               boolean createIfAbsent)
-  {
-    String path = getPath(key);
-    File file = new File(path);
-//    FileInputStream fin = null;
-//    FileOutputStream fout = null;
-    RandomAccessFile raFile = null;
-    FileLock fLock = null;
-
-    try
-    {
-      _readWriteLock.writeLock().lock();
-
-      if (createIfAbsent)
-      {
-        file.createNewFile();
-      }
-
-//      fin = new FileInputStream(file);
-//      FileChannel fChannel = fin.getChannel();
-      raFile = new RandomAccessFile(file, "rw");
-      FileChannel fChannel = raFile.getChannel();
-      fLock = fChannel.lock();
-
-      T current = getProperty(key);
-      if (comparator.compare(current, expected) == 0)
-      {
-//        fout = new FileOutputStream(file);
-//
-//        byte[] bytes = _serializer.serialize(update);
-//        fout.write(bytes);
-        setProperty(key, update);
-        return true;
-      }
-
-      return false;
-    }
-    catch (FileNotFoundException e)
-    {
-      logger.error("fail to compareAndSet. path:" + path, e);
-      return false;
-    }
-    catch (Exception e)
-    {
-      logger.error("fail to compareAndSet. path:" + path, e);
-      return false;
-    }
-    finally
-    {
-      _readWriteLock.writeLock().unlock();
-      try
-      {
-        if (fLock != null && fLock.isValid())
-        {
-           fLock.release();
-        }
-
-        if (raFile != null)
-        {
-          raFile.close();
-        }
-
-//        if (fin != null)
-//        {
-//          fin.close();
-//        }
-//
-//        if (fout != null)
-//        {
-//          fout.close();
-//        }
-      }
-      catch (IOException e)
-      {
-        logger.error("fail to close file. path:" + path, e);
-      }
-    }
-
-  }
-
-  @Override
-  public boolean exists(String key)
-  {
-    String path = getPath(key);
-    File file = new File(path);
-    _readWriteLock.readLock().lock();
-
-    boolean ret = file.exists();
-    _readWriteLock.readLock().unlock();
-    return ret;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/store/file/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/store/file/package-info.java b/helix-core/src/main/java/com/linkedin/helix/store/file/package-info.java
deleted file mode 100644
index 1dab611..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/store/file/package-info.java
+++ /dev/null
@@ -1,5 +0,0 @@
-/**
- * Helix implementation of file-based property store (Deprecated)
- * 
- */
-package com.linkedin.helix.store.file;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/store/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/store/package-info.java b/helix-core/src/main/java/com/linkedin/helix/store/package-info.java
deleted file mode 100644
index 4417993..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/store/package-info.java
+++ /dev/null
@@ -1,5 +0,0 @@
-/**
- * Helix application property store classes
- * 
- */
-package com.linkedin.helix.store;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/store/zk/PropertyItem.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/store/zk/PropertyItem.java b/helix-core/src/main/java/com/linkedin/helix/store/zk/PropertyItem.java
deleted file mode 100644
index f89bfc6..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/store/zk/PropertyItem.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <opensource@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.store.zk;
-
-import org.apache.zookeeper.data.Stat;
-
-public class PropertyItem
-{
-  byte[] _value;
-  Stat _stat;
-  
-  public PropertyItem(byte[] value, Stat stat)
-  {
-    _value = value;
-    _stat = stat;
-  }
-  
-  public byte[] getBytes()
-  {
-    return _value;
-  }
-  
-  public int getVersion()
-  {
-    return _stat.getVersion();
-  }
-  
-  public long getLastModifiedTime()
-  {
-    return _stat.getMtime();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/store/zk/ZKPropertyStore.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/store/zk/ZKPropertyStore.java b/helix-core/src/main/java/com/linkedin/helix/store/zk/ZKPropertyStore.java
deleted file mode 100644
index 4cfc54f..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/store/zk/ZKPropertyStore.java
+++ /dev/null
@@ -1,735 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <opensource@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.store.zk;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.I0Itec.zkclient.DataUpdater;
-import org.I0Itec.zkclient.IZkDataListener;
-import org.I0Itec.zkclient.IZkStateListener;
-import org.I0Itec.zkclient.ZkConnection;
-import org.I0Itec.zkclient.exception.ZkBadVersionException;
-import org.I0Itec.zkclient.exception.ZkNoNodeException;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
-
-import com.linkedin.helix.manager.zk.ZkClient;
-import com.linkedin.helix.store.PropertyChangeListener;
-import com.linkedin.helix.store.PropertySerializer;
-import com.linkedin.helix.store.PropertyStat;
-import com.linkedin.helix.store.PropertyStore;
-import com.linkedin.helix.store.PropertyStoreException;
-
-@Deprecated
-public class ZKPropertyStore<T> implements
-    PropertyStore<T>,
-    IZkStateListener,
-    IZkDataListener // , IZkChildListener,
-{
-  private static Logger LOG = Logger.getLogger(ZKPropertyStore.class);
-
-  class ByteArrayUpdater implements DataUpdater<byte[]>
-  {
-    final DataUpdater<T> _updater;
-    final PropertySerializer<T> _serializer;
-
-    ByteArrayUpdater(DataUpdater<T> updater, PropertySerializer<T> serializer)
-    {
-      _updater = updater;
-      _serializer = serializer;
-    }
-
-    @Override
-    public byte[] update(byte[] current)
-    {
-      try
-      {
-        T currentValue = null;
-        if (current != null)
-        {
-          currentValue = _serializer.deserialize(current);
-        }
-        T updateValue = _updater.update(currentValue);
-        return _serializer.serialize(updateValue);
-      }
-      catch (PropertyStoreException e)
-      {
-        LOG.error("Exception in update. Updater: " + _updater, e);
-      }
-      return null;
-    }
-  }
-
-  private volatile boolean _isConnected = false;
-  private volatile boolean _hasSessionExpired = false;
-
-  protected final ZkClient _zkClient;
-  protected PropertySerializer<T> _serializer;
-  protected final String _root;
-
-  // zookeeperPath->userCallbak->zkCallback
-  private final Map<String, Map<PropertyChangeListener<T>, ZkCallbackHandler<T>>> _callbackMap =
-      new HashMap<String, Map<PropertyChangeListener<T>, ZkCallbackHandler<T>>>();
-
-  // TODO cache capacity should be bounded
-  private final Map<String, PropertyItem> _cache =
-      new ConcurrentHashMap<String, PropertyItem>();
-
-  /**
-   * The given zkClient is assumed to serialize and deserialize raw byte[]
-   * for the given root and its descendants.
-   */
-  public ZKPropertyStore(ZkClient zkClient, final PropertySerializer<T> serializer,
-                         String root)
-  {
-    if (zkClient == null || serializer == null || root == null)
-    {
-      throw new IllegalArgumentException("zkClient|serializer|root can't be null");
-    }
-
-    _root = normalizeKey(root);
-    _zkClient = zkClient;
-
-    setPropertySerializer(serializer);
-
-    _zkClient.createPersistent(_root, true);
-    _zkClient.subscribeStateChanges(this);
-  }
-
-  // key is normalized if it has exactly 1 leading slash
-  private String normalizeKey(String key)
-  {
-    if (key == null)
-    {
-      LOG.error("Key can't be null");
-      throw new IllegalArgumentException("Key can't be null");
-    }
-
-    // strip off leading slash
-    while (key.startsWith("/"))
-    {
-      key = key.substring(1);
-    }
-
-    return "/" + key;
-  }
-
-  private String getAbsolutePath(String key)
-  {
-    key = normalizeKey(key);
-    if (key.equals("/"))
-    {
-      return _root;
-    }
-    else
-    {
-      return _root + key;
-    }
-  }
-
-  // always a return normalized key
-  String getRelativePath(String path)
-  {
-    if (!path.startsWith(_root))
-    {
-      String errMsg = path + "does NOT start with property store's root: " + _root;
-      LOG.error(errMsg);
-      throw new IllegalArgumentException(errMsg);
-    }
-
-    if (path.equals(_root))
-    {
-      return "/";
-    }
-    else
-    {
-      return path.substring(_root.length());
-    }
-  }
-
-  @Override
-  public void createPropertyNamespace(String prefix) throws PropertyStoreException
-  {
-    String path = getAbsolutePath(prefix);
-    try
-    {
-      if (!_zkClient.exists(path))
-      {
-        _zkClient.createPersistent(path, true);
-      }
-    }
-    catch (Exception e)
-    {
-      LOG.error("Exception in creatPropertyNamespace(" + prefix + ")", e);
-      throw new PropertyStoreException(e.toString());
-    }
-  }
-
-  @Override
-  public void setProperty(String key, final T value) throws PropertyStoreException
-  {
-    String path = getAbsolutePath(key);
-
-    try
-    {
-      if (!_zkClient.exists(path))
-      {
-        _zkClient.createPersistent(path, true);
-      }
-
-      // serializer should handle value == null
-      byte[] valueBytes = _serializer.serialize(value);
-      _zkClient.writeData(path, valueBytes);
-
-      // update cache
-      // getProperty(key);
-
-    }
-    catch (Exception e)
-    {
-      LOG.error("Exception when setProperty(" + key + ", " + value + ")", e);
-      throw new PropertyStoreException(e.toString());
-    }
-  }
-
-  @Override
-  public T getProperty(String key) throws PropertyStoreException
-  {
-    return getProperty(key, null);
-  }
-
-  // bytes and stat are not null
-  private T getValueAndStat(byte[] bytes, Stat stat, PropertyStat propertyStat) throws PropertyStoreException
-  {
-    T value = _serializer.deserialize(bytes);
-
-    if (propertyStat != null)
-    {
-      propertyStat.setLastModifiedTime(stat.getMtime());
-      propertyStat.setVersion(stat.getVersion());
-    }
-    return value;
-  }
-
-  @Override
-  public T getProperty(String key, PropertyStat propertyStat) throws PropertyStoreException
-  {
-    String normalizedKey = normalizeKey(key);
-    String path = getAbsolutePath(normalizedKey);
-    Stat stat = new Stat();
-
-    T value = null;
-    try
-    {
-      synchronized (_cache)
-      {
-        PropertyItem item = _cache.get(normalizedKey);
-        _zkClient.subscribeDataChanges(path, this);
-        if (item != null)
-        {
-          // cache hit
-          stat = _zkClient.getStat(path);
-          if (stat != null)
-          {
-            if (item._stat.getCzxid() != stat.getCzxid()
-                || item.getVersion() < stat.getVersion())
-            {
-              // stale data in cache
-              byte[] bytes = _zkClient.readDataAndStat(path, stat, true);
-              if (bytes != null)
-              {
-                value = getValueAndStat(bytes, stat, propertyStat);
-                _cache.put(normalizedKey, new PropertyItem(bytes, stat));
-              }
-            }
-            else
-            {
-              // valid data in cache
-              // item.getBytes() should not be null
-              value = getValueAndStat(item.getBytes(), stat, propertyStat);
-            }
-          }
-        }
-        else
-        {
-          // cache miss
-          byte[] bytes = _zkClient.readDataAndStat(path, stat, true);
-          if (bytes != null)
-          {
-            value = getValueAndStat(bytes, stat, propertyStat);
-            _cache.put(normalizedKey, new PropertyItem(bytes, stat));
-          }
-        }
-      }
-      return value;
-    }
-    catch (Exception e)
-    {
-      LOG.error("Exception in getProperty(" + key + ")", e);
-      throw (new PropertyStoreException(e.toString()));
-    }
-  }
-
-  @Override
-  public void removeProperty(String key) throws PropertyStoreException
-  {
-    String normalizedKey = normalizeKey(key);
-    String path = getAbsolutePath(normalizedKey);
-
-    try
-    {
-      // if (_zkClient.exists(path))
-      // {
-      _zkClient.delete(path);
-      // }
-      // _cache.remove(normalizedKey);
-
-    }
-    catch (ZkNoNodeException e)
-    {
-      // OK
-    }
-    catch (Exception e)
-    {
-      LOG.error("Exception in removeProperty(" + key + ")", e);
-      throw (new PropertyStoreException(e.toString()));
-    }
-  }
-
-  @Override
-  public String getPropertyRootNamespace()
-  {
-    return _root;
-  }
-
-  @Override
-  public void removeNamespace(String prefix) throws PropertyStoreException
-  {
-    String path = getAbsolutePath(prefix);
-
-    try
-    {
-      // if (_zkClient.exists(path))
-      // {
-      _zkClient.deleteRecursive(path);
-      // }
-
-      // update cache
-      // childs are all normalized keys
-      // List<String> childs = getPropertyNames(prefix);
-      // for (String child : childs)
-      // {
-      // _cache.remove(child);
-      // }
-    }
-    catch (ZkNoNodeException e)
-    {
-      // OK
-    }
-    catch (Exception e)
-    {
-      LOG.error("Exception in removeProperty(" + prefix + ")", e);
-      throw (new PropertyStoreException(e.toString()));
-    }
-  }
-
-  // prefix is always normalized
-  private void doGetPropertyNames(String prefix, List<String> leafNodes) throws PropertyStoreException
-  {
-    String path = getAbsolutePath(prefix);
-
-    if (!_zkClient.exists(path))
-    {
-      return;
-    }
-
-    List<String> childs = _zkClient.getChildren(path);
-    if (childs == null)
-    {
-      return;
-    }
-
-    if (childs.size() == 0)
-    {
-      // add leaf node to cache
-      // getProperty(prefix);
-      leafNodes.add(prefix);
-      return;
-    }
-
-    for (String child : childs)
-    {
-      String childPath = prefix.equals("/") ? prefix + child : prefix + "/" + child;
-      doGetPropertyNames(childPath, leafNodes);
-    }
-  }
-
-  @Override
-  public List<String> getPropertyNames(String prefix) throws PropertyStoreException
-  {
-    String normalizedKey = normalizeKey(prefix);
-    List<String> propertyNames = new ArrayList<String>();
-    doGetPropertyNames(normalizedKey, propertyNames);
-
-    // sort it to get deterministic order
-    if (propertyNames.size() > 1)
-    {
-      Collections.sort(propertyNames);
-    }
-
-    return propertyNames;
-  }
-
-  @Override
-  public void setPropertyDelimiter(String delimiter) throws PropertyStoreException
-  {
-    throw new PropertyStoreException("setPropertyDelimiter() not implemented for ZKPropertyStore");
-  }
-
-  // put data/child listeners on prefix and all childs
-  @Override
-  public void subscribeForPropertyChange(String prefix,
-                                         final PropertyChangeListener<T> listener) throws PropertyStoreException
-  {
-    if (listener == null)
-    {
-      throw new IllegalArgumentException("listener can't be null. Prefix: " + prefix);
-    }
-
-    String path = getAbsolutePath(prefix);
-
-    ZkCallbackHandler<T> callback = null;
-    synchronized (_callbackMap)
-    {
-      Map<PropertyChangeListener<T>, ZkCallbackHandler<T>> callbacks;
-      if (!_callbackMap.containsKey(path))
-      {
-        _callbackMap.put(path,
-                         new HashMap<PropertyChangeListener<T>, ZkCallbackHandler<T>>());
-      }
-      callbacks = _callbackMap.get(path);
-
-      if (!callbacks.containsKey(listener))
-      {
-        callback = new ZkCallbackHandler<T>(_zkClient, this, prefix, listener);
-        callbacks.put(listener, callback);
-      }
-    }
-
-    try
-    {
-      if (callback != null)
-      {
-        // a newly added callback
-        _zkClient.subscribeDataChanges(path, callback);
-        _zkClient.subscribeChildChanges(path, callback);
-
-        // do initial invocation
-        callback.handleChildChange(path, _zkClient.getChildren(path));
-
-        LOG.debug("Subscribed changes for " + path);
-      }
-    }
-    catch (Exception e)
-    {
-      LOG.error("Exception in subscribeForPropertyChange(" + prefix + ")", e);
-      throw (new PropertyStoreException(e.toString()));
-    }
-  }
-
-  // prefix is always a normalized key
-  private void doUnsubscribeForPropertyChange(String prefix, ZkCallbackHandler<T> callback)
-  {
-    String path = getAbsolutePath(prefix);
-
-    _zkClient.unsubscribeDataChanges(path, callback);
-    _zkClient.unsubscribeChildChanges(path, callback);
-
-    List<String> childs = _zkClient.getChildren(path);
-    if (childs == null || childs.size() == 0)
-    {
-      return;
-    }
-
-    for (String child : childs)
-    {
-      doUnsubscribeForPropertyChange(prefix + "/" + child, callback);
-    }
-  }
-
-  @Override
-  public void unsubscribeForPropertyChange(String prefix,
-                                           PropertyChangeListener<T> listener) throws PropertyStoreException
-  {
-    if (listener == null)
-    {
-      throw new IllegalArgumentException("listener can't be null. Prefix: " + prefix);
-    }
-
-    String path = getAbsolutePath(prefix);
-    ZkCallbackHandler<T> callback = null;
-
-    synchronized (_callbackMap)
-    {
-      if (_callbackMap.containsKey(path))
-      {
-        Map<PropertyChangeListener<T>, ZkCallbackHandler<T>> callbacks =
-            _callbackMap.get(path);
-        callback = callbacks.remove(listener);
-
-        if (callbacks == null || callbacks.isEmpty())
-        {
-          _callbackMap.remove(path);
-        }
-      }
-    }
-
-    if (callback != null)
-    {
-      doUnsubscribeForPropertyChange(prefix, callback);
-      LOG.debug("Unsubscribed changes for " + path);
-    }
-  }
-
-  @Override
-  public boolean canParentStoreData()
-  {
-    return false;
-  }
-
-  @Override
-  public void setPropertySerializer(final PropertySerializer<T> serializer)
-  {
-    if (serializer == null)
-    {
-      throw new IllegalArgumentException("serializer can't be null");
-    }
-
-    _serializer = serializer;
-  }
-
-  @Override
-  public void updatePropertyUntilSucceed(String key, DataUpdater<T> updater) throws PropertyStoreException
-  {
-    updatePropertyUntilSucceed(key, updater, true);
-  }
-
-  @Override
-  public void updatePropertyUntilSucceed(String key,
-                                         DataUpdater<T> updater,
-                                         boolean createIfAbsent) throws PropertyStoreException
-  {
-    String path = getAbsolutePath(key);
-    try
-    {
-      if (!_zkClient.exists(path))
-      {
-        if (!createIfAbsent)
-        {
-          throw new PropertyStoreException("Can't update " + key
-              + " since no node exists");
-        }
-        else
-        {
-          _zkClient.createPersistent(path, true);
-        }
-      }
-
-      _zkClient.updateDataSerialized(path, new ByteArrayUpdater(updater, _serializer));
-    }
-    catch (Exception e)
-    {
-      LOG.error("Exception in updatePropertyUntilSucceed(" + key + ", " + createIfAbsent
-          + ")", e);
-      throw (new PropertyStoreException(e.toString()));
-    }
-
-    // update cache
-    // getProperty(key);
-  }
-
-  @Override
-  public boolean compareAndSet(String key, T expected, T update, Comparator<T> comparator)
-  {
-    return compareAndSet(key, expected, update, comparator, true);
-  }
-
-  @Override
-  public boolean compareAndSet(String key,
-                               T expected,
-                               T update,
-                               Comparator<T> comparator,
-                               boolean createIfAbsent)
-  {
-    String path = getAbsolutePath(key);
-
-    // if two threads call with createIfAbsent=true
-    // one thread creates the node, the other just goes through
-    // when wirteData() one thread writes the other gets ZkBadVersionException
-    if (!_zkClient.exists(path))
-    {
-      if (createIfAbsent)
-      {
-        _zkClient.createPersistent(path, true);
-      }
-      else
-      {
-        return false;
-      }
-    }
-
-    try
-    {
-      Stat stat = new Stat();
-      byte[] currentBytes = _zkClient.readDataAndStat(path, stat, true);
-      T current = null;
-      if (currentBytes != null)
-      {
-        current = _serializer.deserialize(currentBytes);
-      }
-
-      if (comparator.compare(current, expected) == 0)
-      {
-        byte[] valueBytes = _serializer.serialize(update);
-        _zkClient.writeData(path, valueBytes, stat.getVersion());
-
-        // update cache
-        // getProperty(key);
-
-        return true;
-      }
-    }
-    catch (ZkBadVersionException e)
-    {
-      LOG.warn("Get BadVersion when writing to zookeeper. Mostly Ignorable due to contention");
-    }
-    catch (Exception e)
-    {
-      LOG.error("Exception when compareAndSet(" + key + ")", e);
-    }
-
-    return false;
-  }
-
-  @Override
-  public boolean exists(String key)
-  {
-    String path = getAbsolutePath(key);
-    return _zkClient.exists(path);
-  }
-
-  @Override
-  public void handleStateChanged(KeeperState state) throws Exception
-  {
-    LOG.info("KeeperState:" + state);
-    switch (state)
-    {
-    case SyncConnected:
-      _isConnected = true;
-      break;
-    case Disconnected:
-      _isConnected = false;
-      break;
-    case Expired:
-      _isConnected = false;
-      _hasSessionExpired = true;
-      break;
-    }
-  }
-
-  @Override
-  public void handleNewSession() throws Exception
-  {
-    ZkConnection connection = ((ZkConnection) _zkClient.getConnection());
-    ZooKeeper zookeeper = connection.getZookeeper();
-    LOG.info("handleNewSession: " + zookeeper.getSessionId());
-
-    synchronized (_callbackMap)
-    {
-      for (String path : _callbackMap.keySet())
-      {
-        Map<PropertyChangeListener<T>, ZkCallbackHandler<T>> callbacks =
-            _callbackMap.get(path);
-        if (callbacks == null || callbacks.size() == 0)
-        {
-          LOG.error("Get a null callback map. Remove it. Path: " + path);
-          _callbackMap.remove(path);
-          continue;
-        }
-
-        for (PropertyChangeListener<T> listener : callbacks.keySet())
-        {
-          ZkCallbackHandler<T> callback = callbacks.get(listener);
-
-          if (callback == null)
-          {
-            LOG.error("Get a null callback. Remove it. Path: " + path + ", listener: "
-                + listener);
-            callbacks.remove(listener);
-            continue;
-          }
-          _zkClient.subscribeDataChanges(path, callback);
-          _zkClient.subscribeChildChanges(path, callback);
-
-          // do initial invocation
-          callback.handleChildChange(path, _zkClient.getChildren(path));
-        }
-      }
-    }
-  }
-
-  @Override
-  public boolean start()
-  {
-    // TODO Auto-generated method stub
-    return false;
-  }
-
-  @Override
-  public boolean stop()
-  {
-    _zkClient.close();
-    return true;
-  }
-
-  @Override
-  public void handleDataChange(String dataPath, Object data) throws Exception
-  {
-    // TODO Auto-generated method stub
-
-  }
-
-  @Override
-  public void handleDataDeleted(String dataPath) throws Exception
-  {
-    // TODO Auto-generated method stub
-    String key = getRelativePath(dataPath);
-    synchronized (_cache)
-    {
-      _zkClient.unsubscribeDataChanges(dataPath, this);
-      _cache.remove(key);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/store/zk/ZNode.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/store/zk/ZNode.java b/helix-core/src/main/java/com/linkedin/helix/store/zk/ZNode.java
deleted file mode 100644
index 555e517..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/store/zk/ZNode.java
+++ /dev/null
@@ -1,110 +0,0 @@
-package com.linkedin.helix.store.zk;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.zookeeper.data.Stat;
-
-public class ZNode
-{
-  // used for a newly created item, because zkclient.create() doesn't return stat
-  // or used for places where we don't care about stat
-  public static final Stat ZERO_STAT = new Stat();
-  
-  final String _zkPath;
-  private Stat _stat;
-  Object _data;
-  Set<String> _childSet;
-
-  public ZNode(String zkPath, Object data, Stat stat)
-  {
-    _zkPath = zkPath;
-    _childSet = Collections.<String>emptySet(); // new HashSet<String>();
-    _data = data;
-    _stat = stat;
-  }
-
-  public void removeChild(String child)
-  {
-    if (_childSet != Collections.<String>emptySet())
-    {
-      _childSet.remove(child);
-    }
-  }
-  
-  public void addChild(String child)
-  {
-    if (_childSet == Collections.<String>emptySet())
-    {
-      _childSet = new HashSet<String>();
-    }
-    
-    _childSet.add(child);
-  }
-  
-  public void addChildren(List<String> children)
-  {
-    if (children != null && !children.isEmpty())
-    {
-      if (_childSet == Collections.<String>emptySet())
-      {
-        _childSet = new HashSet<String>();
-      }
-
-      _childSet.addAll(children);
-    }
-  }
-
-  public boolean hasChild(String child)
-  {
-    return _childSet.contains(child);
-  }
-
-  public Set<String> getChildSet()
-  {
-    return _childSet;
-  }
-  
-  public void setData(Object data)
-  {
-//    System.out.println("setData: " + _zkPath + ", data: " + data);
-    _data= data;    
-  }
-  
-  public Object getData()
-  {
-    return _data;
-  }
-  
-  public void setStat(Stat stat)
-  {
-    _stat = stat;
-  }
-  
-  public Stat getStat()
-  {
-    return _stat;
-  }
-  
-  public void setChildSet(List<String> childNames)
-  {
-    if (childNames != null && !childNames.isEmpty())
-    {
-      if (_childSet == Collections.<String>emptySet())
-      {
-        _childSet = new HashSet<String>();
-      }
-
-      _childSet.clear();
-      _childSet.addAll(childNames);
-    }
-  }
-  
-  @Override
-  public String toString()
-  {
-    return _zkPath + ", " + _data + ", " + _childSet + ", " + _stat;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/store/zk/ZkCallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/store/zk/ZkCallbackHandler.java b/helix-core/src/main/java/com/linkedin/helix/store/zk/ZkCallbackHandler.java
deleted file mode 100644
index 48bb0d2..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/store/zk/ZkCallbackHandler.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <opensource@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.store.zk;
-
-import java.util.List;
-
-import org.I0Itec.zkclient.IZkChildListener;
-import org.I0Itec.zkclient.IZkDataListener;
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.manager.zk.ZkClient;
-import com.linkedin.helix.store.PropertyChangeListener;
-
-class ZkCallbackHandler<T> implements IZkChildListener, IZkDataListener
-{
-  private static Logger LOG = Logger.getLogger(ZkCallbackHandler.class);
-
-  private final ZkClient _zkClient;
-  private final ZKPropertyStore<T> _store;
-
-  // listen on prefix and all its childs
-  private final String _prefix;
-  private final PropertyChangeListener<T> _listener;
-
-  public ZkCallbackHandler(ZkClient client, ZKPropertyStore<T> store, String prefix,
-                           PropertyChangeListener<T> listener)
-  {
-    _zkClient = client;
-    _store = store;
-    _prefix = prefix;
-    _listener = listener;
-  }
-
-  @Override
-  public void handleDataChange(String path, Object data) throws Exception
-  {
-    LOG.debug("Data changed @ " + path + " to " + data);
-    String key = _store.getRelativePath(path);
-    _listener.onPropertyChange(key);
-  }
-
-  @Override
-  public void handleDataDeleted(String dataPath) throws Exception
-  {
-    LOG.debug("Data deleted @ " + dataPath);
-  }
-
-  @Override
-  public void handleChildChange(String path, List<String> currentChilds) throws Exception
-  {
-    LOG.debug("childs changed @ " + path + " to " + currentChilds);
-    // System.out.println("childs changed @ " + path + " to " + currentChilds);
-
-
-    if (currentChilds == null)
-    {
-      /**
-       * When a node with a child change watcher is deleted
-       * a child change is triggered on the deleted node
-       * and in this case, the currentChilds is null
-       */
-      return;
-//    } else if (currentChilds.size() == 0)
-//    {
-//      String key = _store.getRelativePath(path);
-//      _listener.onPropertyChange(key);
-    }
-    else
-    {
-      String key = _store.getRelativePath(path);
-      _listener.onPropertyChange(key);
-
-      for (String child : currentChilds)
-      {
-        String childPath = path.endsWith("/") ? path + child : path + "/" + child;
-        _zkClient.subscribeDataChanges(childPath, this);
-        _zkClient.subscribeChildChanges(childPath, this);
-
-        // recursive call
-        handleChildChange(childPath, _zkClient.getChildren(childPath));
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/store/zk/ZkHelixPropertyStore.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/store/zk/ZkHelixPropertyStore.java b/helix-core/src/main/java/com/linkedin/helix/store/zk/ZkHelixPropertyStore.java
deleted file mode 100644
index 785efea..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/store/zk/ZkHelixPropertyStore.java
+++ /dev/null
@@ -1,31 +0,0 @@
-package com.linkedin.helix.store.zk;
-
-import java.util.List;
-
-import org.I0Itec.zkclient.serialize.ZkSerializer;
-
-import com.linkedin.helix.manager.zk.ZkBaseDataAccessor;
-import com.linkedin.helix.manager.zk.ZkCacheBaseDataAccessor;
-
-public class ZkHelixPropertyStore<T> extends ZkCacheBaseDataAccessor<T>
-{
-  public ZkHelixPropertyStore(ZkBaseDataAccessor<T> accessor,
-                              String root,
-                              List<String> subscribedPaths)
-  {
-    super(accessor, root, null, subscribedPaths);
-  }
-
-  public ZkHelixPropertyStore(String zkAddress,
-                              ZkSerializer serializer,
-                              String chrootPath,
-                              List<String> zkCachePaths)
-  {
-    super(zkAddress, serializer, chrootPath, null, zkCachePaths);
-  }
-
-  public ZkHelixPropertyStore(String zkAddress, ZkSerializer serializer, String chrootPath)
-  {
-    super(zkAddress, serializer, chrootPath, null, null);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/store/zk/ZkListener.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/store/zk/ZkListener.java b/helix-core/src/main/java/com/linkedin/helix/store/zk/ZkListener.java
deleted file mode 100644
index c9b264a..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/store/zk/ZkListener.java
+++ /dev/null
@@ -1,10 +0,0 @@
-package com.linkedin.helix.store.zk;
-
-public interface ZkListener
-{
-  void handleDataChange(String path);
-  
-  void handleNodeCreate(String path);
-  
-  void handleNodeDelete(String path);
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/store/zk/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/store/zk/package-info.java b/helix-core/src/main/java/com/linkedin/helix/store/zk/package-info.java
deleted file mode 100644
index cd9735c..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/store/zk/package-info.java
+++ /dev/null
@@ -1,5 +0,0 @@
-/**
- * Helix implementation of zookeeper-based property store (Deprecated)
- * 
- */
-package com.linkedin.helix.store.zk;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/tools/CLMLogFileAppender.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/tools/CLMLogFileAppender.java b/helix-core/src/main/java/com/linkedin/helix/tools/CLMLogFileAppender.java
deleted file mode 100644
index 6783046..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/tools/CLMLogFileAppender.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <opensource@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.tools;
-
-import java.io.File;
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.lang.management.RuntimeMXBean;
-import java.text.SimpleDateFormat;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.List;
-
-import org.apache.log4j.FileAppender;
-import org.apache.log4j.Layout;
-import org.apache.log4j.spi.ErrorCode;
-
-public class CLMLogFileAppender extends FileAppender
-{
-  public CLMLogFileAppender()
-  {
-  }
-
-  public CLMLogFileAppender(Layout layout, String filename, boolean append,
-      boolean bufferedIO, int bufferSize) throws IOException
-  {
-    super(layout, filename, append, bufferedIO, bufferSize);
-  }
-
-  public CLMLogFileAppender(Layout layout, String filename, boolean append)
-      throws IOException
-  {
-    super(layout, filename, append);
-  }
-
-  public CLMLogFileAppender(Layout layout, String filename) throws IOException
-  {
-    super(layout, filename);
-  }
-
-  public void activateOptions()
-  {
-    if (fileName != null)
-    {
-      try
-      {
-        fileName = getNewLogFileName();
-        setFile(fileName, fileAppend, bufferedIO, bufferSize);
-      } catch (Exception e)
-      {
-        errorHandler.error("Error while activating log options", e,
-            ErrorCode.FILE_OPEN_FAILURE);
-      }
-    }
-  }
-
-  private String getNewLogFileName()
-  {
-    Calendar cal = Calendar.getInstance();
-    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-SSS");
-    String time = sdf.format(cal.getTime());
-
-    StackTraceElement[] stack = Thread.currentThread().getStackTrace();
-    StackTraceElement main = stack[stack.length - 1];
-    String mainClass = main.getClassName();
-
-    return System.getProperty("user.home") + "/EspressoLogs/" + mainClass + "_"
-        + time + ".txt";
-  }
-}


Mime
View raw message