helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kisho...@apache.org
Subject [20/42] Refactoring the package names and removing jsql parser
Date Wed, 24 Oct 2012 23:14:58 GMT
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/store/file/FileHelixPropertyStore.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/store/file/FileHelixPropertyStore.java b/helix-core/src/main/java/org/apache/helix/store/file/FileHelixPropertyStore.java
new file mode 100644
index 0000000..f69dbfc
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/store/file/FileHelixPropertyStore.java
@@ -0,0 +1,293 @@
+package org.apache.helix.store.file;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.apache.helix.store.HelixPropertyListener;
+import org.apache.helix.store.HelixPropertyStore;
+import org.apache.helix.store.PropertyChangeListener;
+import org.apache.helix.store.PropertyJsonComparator;
+import org.apache.helix.store.PropertySerializer;
+import org.apache.helix.store.PropertyStat;
+import org.apache.helix.store.PropertyStoreException;
+import org.apache.zookeeper.data.Stat;
+
+
+public class FileHelixPropertyStore<T> implements HelixPropertyStore<T>
+{
+  final FilePropertyStore<T> _store;
+
+  public FileHelixPropertyStore(final PropertySerializer<T> serializer,
+                                String rootNamespace,
+                                final PropertyJsonComparator<T> comparator)
+  {
+    _store = new FilePropertyStore<T>(serializer, rootNamespace, comparator);
+  }
+
+  @Override
+  public boolean create(String path, T record, int options)
+  {
+    return set(path, record, options);
+  }
+
+  @Override
+  public boolean set(String path, T record, int options)
+  {
+    try
+    {
+      _store.setProperty(path, record);
+      return true;
+    }
+    catch (PropertyStoreException e)
+    {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+    return false;
+  }
+
+  @Override
+  public boolean update(String path, DataUpdater<T> updater, int options)
+  {
+    _store.updatePropertyUntilSucceed(path, updater);
+    return true;
+  }
+
+  @Override
+  public boolean remove(String path, int options)
+  {
+    try
+    {
+      _store.removeProperty(path);
+      return true;
+    }
+    catch (PropertyStoreException e)
+    {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+    return false;
+  }
+
+  @Override
+  public boolean[] createChildren(List<String> paths, List<T> records, int options)
+  {
+    return setChildren(paths, records, options);
+  }
+
+  @Override
+  public boolean[] setChildren(List<String> paths, List<T> records, int options)
+  {
+    boolean[] success = new boolean[paths.size()];
+    for (int i = 0; i < paths.size(); i++)
+    {
+      success[i] = create(paths.get(i), records.get(i), options);
+    }
+    return success;
+  }
+
+  @Override
+  public boolean[] updateChildren(List<String> paths,
+                                  List<DataUpdater<T>> updaters,
+                                  int options)
+  {
+    boolean[] success = new boolean[paths.size()];
+    for (int i = 0; i < paths.size(); i++)
+    {
+      success[i] = update(paths.get(i), updaters.get(i), options);
+    }
+    return success;
+
+  }
+
+  @Override
+  public boolean[] remove(List<String> paths, int options)
+  {
+    boolean[] success = new boolean[paths.size()];
+    for (int i = 0; i < paths.size(); i++)
+    {
+      success[i] = remove(paths.get(i), options);
+    }
+    return success;
+  }
+
+  @Override
+  public T get(String path, Stat stat, int options)
+  {
+    PropertyStat propertyStat = new PropertyStat();
+    try
+    {
+      T value = _store.getProperty(path, propertyStat);
+      if (stat != null)
+      {
+        stat.setVersion(propertyStat.getVersion());
+        stat.setMtime(propertyStat.getLastModifiedTime());
+      }
+      return value;
+    }
+    catch (PropertyStoreException e)
+    {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+    return null;
+  }
+
+  @Override
+  public List<T> get(List<String> paths, List<Stat> stats, int options)
+  {
+    List<T> values = new ArrayList<T>();
+    for (int i = 0; i < paths.size(); i++)
+    {
+      values.add(get(paths.get(i), stats.get(i), options));
+    }
+    return values;
+  }
+
+  @Override
+  public List<T> getChildren(String parentPath, List<Stat> stats, int options)
+  {
+    List<String> childNames = getChildNames(parentPath, options);
+    
+    if (childNames == null)
+    {
+      return null;
+    }
+
+    List<String> paths = new ArrayList<String>();
+    for (String childName : childNames)
+    {
+      String path = parentPath + "/" + childName;
+      paths.add(path);
+    }
+    
+    return get(paths, stats, options);
+  }
+
+  @Override
+  public List<String> getChildNames(String parentPath, int options)
+  {
+    try
+    {
+      return _store.getPropertyNames(parentPath);
+    }
+    catch (PropertyStoreException e)
+    {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+    return Collections.emptyList();
+  }
+
+  @Override
+  public boolean exists(String path, int options)
+  {
+    return _store.exists(path);
+  }
+
+  @Override
+  public boolean[] exists(List<String> paths, int options)
+  {
+    boolean[] exists = new boolean[paths.size()];
+    for (int i = 0; i < paths.size(); i++)
+    {
+      exists[i] = exists(paths.get(i), options);
+    }
+    return exists;
+  }
+
+  @Override
+  public Stat[] getStats(List<String> paths, int options)
+  {
+    Stat[] stats = new Stat[paths.size()];
+    for (int i = 0; i < paths.size(); i++)
+    {
+      stats[i] = getStat(paths.get(i), options);
+    }
+    return stats;
+  }
+
+  @Override
+  public Stat getStat(String path, int options)
+  {
+    Stat stat = new Stat();
+    get(path, stat, options);
+    return stat;
+  }
+
+  @Override
+  public void subscribeDataChanges(String path, IZkDataListener listener)
+  {
+    throw new UnsupportedOperationException("subscribeDataChanges not supported");
+  }
+
+  @Override
+  public void unsubscribeDataChanges(String path, IZkDataListener listener)
+  {
+    throw new UnsupportedOperationException("unsubscribeDataChanges not supported");
+  }
+
+  @Override
+  public List<String> subscribeChildChanges(String path, IZkChildListener listener)
+  {
+    throw new UnsupportedOperationException("subscribeChildChanges not supported");
+  }
+
+  @Override
+  public void unsubscribeChildChanges(String path, IZkChildListener listener)
+  {
+    throw new UnsupportedOperationException("unsubscribeChildChanges not supported");
+  }
+
+  @Override
+  public void start()
+  {
+    _store.start();
+  }
+
+  @Override
+  public void stop()
+  {
+    _store.stop();
+  }
+
+  @Override
+  public void subscribe(String parentPath, final HelixPropertyListener listener)
+  {
+    try
+    {
+      _store.subscribeForPropertyChange(parentPath, new PropertyChangeListener<T>()
+      {
+
+        @Override
+        public void onPropertyChange(String key)
+        {
+          listener.onDataChange(key);
+        }
+      });
+    }
+    catch (PropertyStoreException e)
+    {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+  }
+
+  @Override
+  public void unsubscribe(String parentPath, HelixPropertyListener listener)
+  {
+    throw new UnsupportedOperationException("unsubscribe not supported");
+  }
+
+  @Override
+  public void reset()
+  {
+    // TODO Auto-generated method stub
+    
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/store/file/FilePropertyStore.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/store/file/FilePropertyStore.java b/helix-core/src/main/java/org/apache/helix/store/file/FilePropertyStore.java
new file mode 100644
index 0000000..1b50613
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/store/file/FilePropertyStore.java
@@ -0,0 +1,942 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <opensource@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.store.file;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.apache.commons.io.DirectoryWalker;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.manager.file.FileCallbackHandler;
+import org.apache.helix.store.PropertyChangeListener;
+import org.apache.helix.store.PropertyJsonComparator;
+import org.apache.helix.store.PropertySerializer;
+import org.apache.helix.store.PropertyStat;
+import org.apache.helix.store.PropertyStore;
+import org.apache.helix.store.PropertyStoreException;
+import org.apache.log4j.Logger;
+
+
+/**
+ *
+ * A property store built on top of a file system.
+ * Since file systems usually have sophisticated cache mechanisms,
+ * there is no need for another cache in the file property store.
+ *
+ * NOTES:
+ * The lastModified timestamp provided by Java file I/O has only second-level precision,
+ * so a file may be modified without its lastModified timestamp changing.
+ * The solution is to use a map that caches the files changed in the last second
+ * and, in the next round of refresh, check against this map to figure out whether a file
+ * has been changed/created in the last second.
+ * @link http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6939260
+ *
+ * @author zzhang
+ * @param <T>
+ */
+
+public class FilePropertyStore<T> implements PropertyStore<T>
+{
+  private static Logger logger = Logger.getLogger(FilePropertyStore.class);
+
+  private final String ROOT = "/";
+  private final long TIMEOUT = 30L;
+  private final long REFRESH_PERIOD = 1000; // ms
+  private final int _id = new Random().nextInt();
+  private final String _rootNamespace;
+  private PropertySerializer<T> _serializer;
+  private final PropertyJsonComparator<T> _comparator;
+
+  private Thread _refreshThread;
+  private final AtomicBoolean _stopRefreshThread;
+  private final CountDownLatch _firstRefreshCounter;
+  private final ReadWriteLock _readWriteLock;
+
+  private Map<String, T> _lastModifiedFiles = new HashMap<String, T>();
+  private Map<String, T> _curModifiedFiles = new HashMap<String, T>();
+
+  private final Map< String, CopyOnWriteArraySet<PropertyChangeListener<T> > > _fileChangeListeners; // map key to listener
+
+  private class FilePropertyStoreRefreshThread implements Runnable
+  {
+    private final PropertyStoreDirWalker _dirWalker;
+
+    public class PropertyStoreDirWalker extends DirectoryWalker
+    {
+      private final File _propertyStoreRootDir;
+      private long _lastNotifiedTime = 0;
+      private long _currentHighWatermark;
+
+      /**
+       * Creates a walker that scans the given root directory.
+       *
+       * @param rootNamespace path of the directory to walk
+       * @param readWriteLock not used by this constructor
+       */
+      public PropertyStoreDirWalker(String rootNamespace, ReadWriteLock readWriteLock)
+      {
+        this._propertyStoreRootDir = new File(rootNamespace);
+      }
+
+      @SuppressWarnings({ "rawtypes", "unchecked" })
+      @Override
+      protected void handleFile(File file, int depth, Collection results) throws IOException  // walker callback per file; adds the file to results when it is considered changed
+      {
+        if (file.lastModified() < _lastNotifiedTime)  // older than the newest timestamp of the previous scan: treated as unchanged
+        {
+          return;
+        }
+
+        String path = getRelativePath(file.getAbsolutePath());
+        T newValue = null;
+        try
+        {
+          newValue = getProperty(path);  // current content; stays null if the read fails
+        } catch (PropertyStoreException e)
+        {
+          logger.error("fail to get property, path:" + path, e);
+        }
+
+        if (file.lastModified() == _lastNotifiedTime && _lastModifiedFiles.containsKey(path))  // same timestamp as last scan and recorded back then
+        {
+          T value = _lastModifiedFiles.get(path);
+
+          if (_comparator.compare(value, newValue) == 0)  // content equals what was recorded: no notification
+          {
+            if (file.lastModified() == _currentHighWatermark)
+            {
+              _curModifiedFiles.put(path, newValue);  // keep the entry so the next scan can compare again
+            }
+            return;
+          }
+        }
+
+        if (file.lastModified() > _currentHighWatermark)  // newest timestamp seen so far in this scan
+        {
+          _currentHighWatermark = file.lastModified();
+
+          _curModifiedFiles.clear();  // entries recorded for the older watermark are dropped
+          _curModifiedFiles.put(path, newValue);
+        }
+        else if (file.lastModified() == _currentHighWatermark)
+        {
+          _curModifiedFiles.put(path, newValue);
+        }
+
+        // debug
+//        logger.error("file: " + file.getAbsolutePath() + " changed@" + file.lastModified() + " (" +
+//            new Date(file.lastModified()) + ")");
+        results.add(file);  // reported as changed
+      }
+
+      @Override
+      protected boolean handleDirectory(File dir, int depth, Collection results) throws IOException  // walker callback per directory; always returns true
+      {
+        if (dir.lastModified() < _lastNotifiedTime)  // older than the newest timestamp of the previous scan: treated as unchanged
+        {
+          return true;
+        }
+
+        String path = getRelativePath(dir.getAbsolutePath());
+        T newValue = null;
+        try
+        {
+          newValue = getProperty(path);  // stays null if nothing could be read for this path
+        }
+        catch (PropertyStoreException e)
+        {
+          logger.error("fail to get property, path:" + path, e);
+        }
+
+        if (dir.lastModified() == _lastNotifiedTime && _lastModifiedFiles.containsKey(path))  // same timestamp as last scan and recorded back then
+        {
+          T value = _lastModifiedFiles.get(path);
+          if (_comparator.compare(value, newValue) == 0)  // value equals what was recorded: no notification
+          {
+            if (dir.lastModified() == _currentHighWatermark)
+            {
+              _curModifiedFiles.put(path, newValue);  // keep the entry so the next scan can compare again
+            }
+            return true;
+          }
+        }
+        _curModifiedFiles.put(path, newValue);  // recorded unconditionally; the branch below may clear the map and re-add it
+
+        if (dir.lastModified() > _currentHighWatermark)  // newest timestamp seen so far in this scan
+        {
+          _currentHighWatermark = dir.lastModified();
+
+          _curModifiedFiles.clear();  // entries recorded for the older watermark are dropped
+          _curModifiedFiles.put(path, newValue);
+        }
+        else if (dir.lastModified() == _currentHighWatermark)
+        {
+          _curModifiedFiles.put(path, newValue);
+        }
+
+        logger.debug("dir: " + dir.getAbsolutePath() + " changed@" + dir.lastModified() +
+            " (" + new Date(dir.lastModified()) + ")");
+        results.add(dir);  // reported as changed
+
+        return true;
+      }
+
+      /**
+       * Performs one scan of the property store directory and then notifies listeners.
+       * <p>
+       * The scan runs under the store's read lock and collects every file/directory that the
+       * handle callbacks added to the result set. Afterwards the high watermark becomes the
+       * new "last notified" time and the two modified-file maps are swapped. Listeners are
+       * called outside the lock, in ascending path order, for each changed path that starts
+       * with the path they subscribed to.
+       */
+      public void walk()
+      {
+        HashSet<File> files = new HashSet<File>();
+
+        _currentHighWatermark = _lastNotifiedTime;
+        // take the lock before entering try so that finally never unlocks a lock that is not held
+        _readWriteLock.readLock().lock();
+        try
+        {
+          super.walk(_propertyStoreRootDir, files);
+        }
+        catch (IOException e)
+        {
+          logger.error("IO exception when walking through dir:" + _propertyStoreRootDir, e);
+        }
+        finally
+        {
+          _lastNotifiedTime = _currentHighWatermark;
+          _lastModifiedFiles.clear();
+
+          // swap: what this scan recorded becomes the reference for the next scan,
+          // and the (now empty) old map is reused for recording
+          Map<String, T> temp = _lastModifiedFiles;
+          _lastModifiedFiles = _curModifiedFiles;
+          _curModifiedFiles = temp;
+          _readWriteLock.readLock().unlock();
+        }
+
+        // TODO see if we can use DirectoryFileComparator.DIRECTORY_COMPARATOR.sort()
+        File[] fileArray = files.toArray(new File[files.size()]);
+        Arrays.sort(fileArray, new Comparator<File>() {
+
+          @Override
+          public int compare(File file1, File file2)
+          {
+            return file1.getAbsoluteFile().compareTo(file2.getAbsoluteFile());
+          }
+
+        });
+
+        // notify listeners
+        for (File file : fileArray)
+        {
+          String absPath = file.getAbsolutePath();
+
+          for (Map.Entry< String, CopyOnWriteArraySet<PropertyChangeListener<T> > > entry : _fileChangeListeners.entrySet())
+          {
+            if (!absPath.startsWith(entry.getKey()))
+            {
+              continue;
+            }
+
+            for (PropertyChangeListener<T> listener : entry.getValue())
+            {
+              listener.onPropertyChange(getRelativePath(absPath));
+            }
+          }
+        }
+      }
+    }
+
+    /**
+     * Creates the refresh task together with its directory walker, which is rooted at the
+     * store's root namespace.
+     *
+     * @param readWriteLock handed on to the walker's constructor
+     */
+    public FilePropertyStoreRefreshThread(ReadWriteLock readWriteLock)
+    {
+      this._dirWalker = new PropertyStoreDirWalker(_rootNamespace, readWriteLock);
+    }
+
+    @Override
+    public void run()
+    {
+      while (!_stopRefreshThread.get())
+      {
+        _dirWalker.walk();
+        _firstRefreshCounter.countDown();
+
+        try
+        {
+          Thread.sleep(REFRESH_PERIOD);
+//          System.out.println("refresh thread is running");
+        }
+        catch (InterruptedException ie)
+        {
+          // do nothing
+        }
+      }
+
+      logger.info("Quitting file property store refresh thread");
+
+    }
+
+  }
+
+//  public FilePropertyStore(final PropertySerializer<T> serializer)
+//  {
+//    this(serializer, System.getProperty("java.io.tmpdir"));
+//  }
+
+  /**
+   * Creates a store rooted at {@code rootNamespace} and creates the root directory.
+   * The root namespace is normalized to carry exactly one leading slash.
+   *
+   * @param serializer    converts values to and from bytes
+   * @param rootNamespace root directory of the store; leading slashes are optional
+   * @param comparator    used by the refresh thread to compare old and new values
+   */
+  public FilePropertyStore(final PropertySerializer<T> serializer, String rootNamespace,
+      final PropertyJsonComparator<T> comparator)
+  {
+    _serializer = serializer;
+    _comparator = comparator;
+    _stopRefreshThread = new AtomicBoolean(false);
+    _firstRefreshCounter = new CountDownLatch(1);
+    _readWriteLock = new ReentrantReadWriteLock();
+    _fileChangeListeners = new ConcurrentHashMap< String, CopyOnWriteArraySet<PropertyChangeListener<T> > >();
+
+    // skip every leading slash, then put a single one back
+    int firstNonSlash = 0;
+    while (firstNonSlash < rootNamespace.length() && rootNamespace.charAt(firstNonSlash) == '/')
+    {
+      firstNonSlash++;
+    }
+    _rootNamespace = "/" + rootNamespace.substring(firstNonSlash);
+
+    createRootNamespace();
+  }
+
+
+  @Override
+  public boolean start()
+  {
+    // check if start has already been invoked
+    if (_firstRefreshCounter.getCount() == 0)
+      return true;
+
+    logger.debug("starting file property store polling thread, id=" + _id);
+
+    _stopRefreshThread.set(false);
+    _refreshThread = new Thread(new FilePropertyStoreRefreshThread(_readWriteLock),
+                                "FileRefreshThread_" + _id);
+    // _refreshThread.setDaemon(true);
+    _refreshThread.start();
+
+    try
+    {
+      boolean timeout = !_firstRefreshCounter.await(TIMEOUT, TimeUnit.SECONDS);
+      if (timeout)
+      {
+        throw new Exception("Timeout while waiting for the first refresh to complete");
+      }
+    }
+    catch (InterruptedException e)
+    {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+    catch (Exception e)
+    {
+      e.printStackTrace();
+    }
+
+    return true;
+  }
+
+  /**
+   * Asks the polling thread to stop and waits for it to finish. Only the call that flips the
+   * stop flag waits; later calls return immediately.
+   * <p>
+   * If the wait is interrupted, the calling thread's interrupt flag is restored and the
+   * method returns without waiting further.
+   *
+   * @return always {@code true}
+   */
+  @Override
+  public boolean stop()
+  {
+    if (_stopRefreshThread.compareAndSet(false, true) && _refreshThread != null)
+    {
+      try
+      {
+        _refreshThread.join();
+      }
+      catch (InterruptedException e)
+      {
+        // keep the interrupt visible to the caller instead of swallowing it
+        Thread.currentThread().interrupt();
+        logger.warn("Interrupted while waiting for the refresh thread to quit", e);
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Maps a property key to its path under the root namespace.
+   *
+   * @param key property key; any number of leading slashes is ignored
+   * @return the root namespace itself for an empty key, otherwise root namespace + "/" + key
+   */
+  private String getPath(String key)
+  {
+    // skip every leading slash
+    int firstNonSlash = 0;
+    while (firstNonSlash < key.length() && key.charAt(firstNonSlash) == '/')
+    {
+      firstNonSlash++;
+    }
+    String relative = key.substring(firstNonSlash);
+
+    if (relative.length() == 0)
+    {
+      return _rootNamespace;
+    }
+    return _rootNamespace + "/" + relative;
+  }
+
+  /**
+   * Converts a path under the root namespace back into a property key.
+   *
+   * @param path path to convert
+   * @return {@code ROOT} for the root namespace itself, the remainder after the root
+   *         namespace prefix otherwise; paths that do not start with the root namespace are
+   *         returned unchanged after a warning is logged
+   */
+  private String getRelativePath(String path)
+  {
+    if (!path.startsWith(_rootNamespace))
+    {
+      logger.warn("path does NOT start with: " + _rootNamespace);
+      return path;
+    }
+
+    return path.equals(_rootNamespace) ? ROOT : path.substring(_rootNamespace.length());
+  }
+
+  /** Creates the directory of the root namespace if it does not exist yet. */
+  public void createRootNamespace()
+  {
+    this.createPropertyNamespace(ROOT);
+  }
+
+  /**
+   * Creates the directory (including missing parents) for the given namespace while holding
+   * the write lock. An existing directory or a failed creation is only logged.
+   *
+   * @param prefix namespace key, relative to the root namespace
+   */
+  @Override
+  public void createPropertyNamespace(String prefix)
+  {
+    final String path = getPath(prefix);
+    final File dir = new File(path);
+    try
+    {
+      _readWriteLock.writeLock().lock();
+      if (!dir.exists())
+      {
+        boolean created = dir.mkdirs();
+        if (!created)
+        {
+          logger.warn("Failed to create: " + path);
+        }
+      }
+      else
+      {
+        logger.warn(path + " already exists");
+      }
+    }
+    catch (Exception e)
+    {
+      logger.error("Failed to create dir: " + path + "\nexception:" + e);
+    }
+    finally
+    {
+      _readWriteLock.writeLock().unlock();
+    }
+
+  }
+
+  /**
+   * Serializes {@code value} and writes it to the file for {@code key}, replacing any
+   * previous content. The file is created first if it does not exist.
+   * <p>
+   * The whole operation, including closing the stream, happens under the write lock so that
+   * no reader is admitted while the file is still open for writing. I/O failures are logged
+   * and not propagated.
+   *
+   * @param key   property key, relative to the root namespace
+   * @param value value to store
+   * @throws PropertyStoreException declared by the interface; may come from the serializer
+   */
+  @Override
+  public void setProperty(String key, T value) throws PropertyStoreException
+  {
+    String path = getPath(key);
+    File file = new File(path);
+    FileOutputStream fout = null;
+
+    // TODO create non-exist dirs recursively
+    // take the lock before entering try so that finally never unlocks a lock that is not held
+    _readWriteLock.writeLock().lock();
+    try
+    {
+      if (!file.exists())
+      {
+        FileUtils.touch(file);
+      }
+
+      fout = new FileOutputStream(file);
+
+      byte[] bytes = _serializer.serialize(value);
+      fout.write(bytes);
+    }
+    catch (IOException e)
+    {
+      logger.error("fail to set property. key:" + key +
+          "value:" + value, e);
+    }
+    finally
+    {
+      try
+      {
+        if (fout != null)
+        {
+          fout.close();
+        }
+      }
+      catch (IOException e)
+      {
+        logger.error("fail to close file. key:" + key, e);
+      }
+      finally
+      {
+        // released last, after the stream is closed
+        _readWriteLock.writeLock().unlock();
+      }
+    }
+
+  }
+
+  /**
+   * Reads the value stored for {@code key} without collecting any stat information.
+   *
+   * @return the value, or {@code null} if none could be read
+   */
+  @Override
+  public T getProperty(String key) throws PropertyStoreException
+  {
+    return getProperty(key, null);
+  }
+
+  /**
+   * Reads and deserializes the value stored for {@code key} under the read lock.
+   *
+   * @param key          property key, relative to the root namespace
+   * @param propertyStat if non-null, receives the file's last-modified time
+   * @return the deserialized value, or {@code null} if the file does not exist, cannot be
+   *         opened, is empty, or an I/O error occurs (the error is logged)
+   * @throws PropertyStoreException declared by the interface; may come from the serializer
+   */
+  @Override
+  public T getProperty(String key, PropertyStat propertyStat) throws PropertyStoreException
+  {
+
+    String path = getPath(key);
+    FileInputStream fin = null;
+
+    // TODO need a timeout on lock operation
+    // take the lock before entering try so that finally never unlocks a lock that is not held
+    _readWriteLock.readLock().lock();
+    try
+    {
+      File file = new File(path);
+      if (!file.exists())
+      {
+        return null;
+      }
+
+      fin = new FileInputStream(file);
+
+      int availableBytes = fin.available();
+      if (availableBytes == 0)
+      {
+        return null;
+      }
+
+      // a single read() may deliver fewer bytes than requested, so keep reading until the
+      // buffer is full or the stream ends
+      byte[] bytes = new byte[availableBytes];
+      int filled = 0;
+      while (filled < bytes.length)
+      {
+        int count = fin.read(bytes, filled, bytes.length - filled);
+        if (count < 0)
+        {
+          break;
+        }
+        filled += count;
+      }
+      if (filled == 0)
+      {
+        return null;
+      }
+      if (filled < bytes.length)
+      {
+        // never hand zero padding to the serializer
+        bytes = Arrays.copyOf(bytes, filled);
+      }
+
+      if (propertyStat != null)
+      {
+        propertyStat.setLastModifiedTime(file.lastModified());
+      }
+
+      return _serializer.deserialize(bytes);
+    }
+    catch (FileNotFoundException e)
+    {
+      return null;
+    }
+    catch (IOException e)
+    {
+      logger.error("fail to get property. key:" + key, e);
+    }
+    finally
+    {
+      try
+      {
+        if (fin != null)
+        {
+          fin.close();
+        }
+      }
+      catch (IOException e)
+      {
+        logger.error("fail to close file. key:" + key, e);
+      }
+      finally
+      {
+        // released last, after the stream is closed
+        _readWriteLock.readLock().unlock();
+      }
+    }
+
+    return null;
+  }
+
+  /**
+   * Deletes the file for {@code key} under the write lock. A missing file is not an error;
+   * a failed deletion is only logged.
+   *
+   * @param key property key, relative to the root namespace
+   */
+  @Override
+  public void removeProperty(String key) throws PropertyStoreException
+  {
+    final String path = getPath(key);
+    final File file = new File(path);
+
+    try
+    {
+      _readWriteLock.writeLock().lock();
+      if (file.exists() && !file.delete())
+      {
+        logger.error("fail to remove file. path:" + path);
+      }
+    }
+    catch (Exception e)
+    {
+      logger.error("fail to remove file. path:" + path, e);
+    }
+    finally
+    {
+      _readWriteLock.writeLock().unlock();
+    }
+
+
+  }
+
+  /** Deletes the root namespace directory and everything below it. */
+  public void removeRootNamespace() throws PropertyStoreException
+  {
+    this.removeNamespace(ROOT);
+  }
+
+  /**
+   * Recursively deletes the directory of the given namespace under the write lock.
+   * I/O failures are logged and not propagated.
+   *
+   * @param prefix namespace key, relative to the root namespace
+   */
+  @Override
+  public void removeNamespace(String prefix) throws PropertyStoreException
+  {
+    final String path = getPath(prefix);
+
+    try
+    {
+      _readWriteLock.writeLock().lock();
+      File namespaceDir = new File(path);
+      FileUtils.deleteDirectory(namespaceDir);
+    }
+    catch (IOException e)
+    {
+      logger.error("fail to remove namespace, path:" + path, e);
+    }
+    finally
+    {
+      _readWriteLock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Recursively collects the keys of all regular files at or below {@code path}.
+   *
+   * @param path      path to inspect (file or directory)
+   * @param leafNodes receives the root-relative key of every regular file found
+   */
+  private void doGetPropertyNames(String path, List<String> leafNodes)
+  throws PropertyStoreException
+  {
+    File file = new File(path);
+    if (!file.exists())
+    {
+      return;
+    }
+
+    if (file.isDirectory())
+    {
+      String[] childs = file.list();
+      if (childs != null)
+      {
+        for (String child : childs)
+        {
+          doGetPropertyNames(path + "/" + child, leafNodes);
+        }
+      }
+      return;
+    }
+
+    if (file.isFile())
+    {
+      leafNodes.add(getRelativePath(path));
+    }
+  }
+
+
+  /**
+   * Lists the keys of all regular files at or below the given namespace.
+   *
+   * @param prefix namespace key, relative to the root namespace
+   * @return the root-relative keys in sorted order; empty if nothing is found
+   */
+  @Override
+  public List<String> getPropertyNames(String prefix) throws PropertyStoreException
+  {
+    final String path = getPath(prefix);
+    final List<String> names = new ArrayList<String>();
+
+    try
+    {
+      _readWriteLock.readLock().lock();
+      doGetPropertyNames(path, names);
+    }
+    finally
+    {
+      _readWriteLock.readLock().unlock();
+    }
+
+    // sorted so that callers see a deterministic order
+    Collections.sort(names);
+    return names;
+  }
+
+  /**
+   * Not supported by this store.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public void setPropertyDelimiter(String delimiter) throws PropertyStoreException
+  {
+    String message = "setPropertyDelimiter() is NOT supported by FilePropertyStore";
+    throw new UnsupportedOperationException(message);
+  }
+
+  /**
+   * Registers {@code listener} for changes at or below the given namespace.
+   * A {@code null} listener is ignored.
+   *
+   * @param prefix   namespace key, relative to the root namespace
+   * @param listener listener to add
+   */
+  @Override
+  public void subscribeForPropertyChange(String prefix, PropertyChangeListener<T> listener)
+  throws PropertyStoreException
+  {
+    if (listener == null)
+    {
+      return;
+    }
+
+    final String path = getPath(prefix);
+    synchronized (_fileChangeListeners)
+    {
+      CopyOnWriteArraySet<PropertyChangeListener <T> > registered = _fileChangeListeners.get(path);
+      if (registered == null)
+      {
+        // first listener for this path
+        registered = new CopyOnWriteArraySet<PropertyChangeListener <T> >();
+        _fileChangeListeners.put(path, registered);
+      }
+      registered.add(listener);
+    }
+
+  }
+
+  /**
+   * Removes {@code listener} from the given namespace and drops the namespace entry once no
+   * listener is left. A {@code null} listener is ignored.
+   *
+   * @param prefix   namespace key, relative to the root namespace
+   * @param listener listener to remove
+   */
+  @Override
+  public void unsubscribeForPropertyChange(String prefix, PropertyChangeListener<T> listener)
+  throws PropertyStoreException
+  {
+    if (listener == null)
+    {
+      return;
+    }
+
+    final String path = getPath(prefix);
+    synchronized (_fileChangeListeners)
+    {
+      final Set<PropertyChangeListener<T> > registered = _fileChangeListeners.get(path);
+      if (registered == null)
+      {
+        _fileChangeListeners.remove(path);
+        return;
+      }
+
+      registered.remove(listener);
+      if (registered.isEmpty())
+      {
+        _fileChangeListeners.remove(path);
+      }
+    }
+
+  }
+
+  /**
+   * @return always {@code false}
+   */
+  @Override
+  public boolean canParentStoreData()
+  {
+    final boolean parentCanStoreData = false;
+    return parentCanStoreData;
+  }
+
+  /**
+   * @return the normalized root namespace of this store
+   */
+  @Override
+  public String getPropertyRootNamespace()
+  {
+    return this._rootNamespace;
+  }
+
+  /**
+   * Updates the value for {@code key} through {@code updater}, with {@code createIfAbsent}
+   * set to {@code true}.
+   */
+  @Override
+  public void updatePropertyUntilSucceed(String key, DataUpdater<T> updater)
+  {
+    final boolean createIfAbsent = true;
+    updatePropertyUntilSucceed(key, updater, createIfAbsent);
+  }
+
+  /**
+   * Reads the current value for {@code key}, passes it to {@code updater} and stores the
+   * result, all under the write lock and an exclusive file lock.
+   * <p>
+   * Any failure is logged and not propagated.
+   *
+   * @param key            property key, relative to the root namespace
+   * @param updater        computes the new value from the current one (which may be
+   *                       {@code null})
+   * @param createIfAbsent if {@code true}, a missing file is created before the update; if
+   *                       {@code false}, a missing file is left absent and the update is
+   *                       skipped with a warning
+   */
+  @Override
+  public void updatePropertyUntilSucceed(String key, DataUpdater<T> updater,
+      boolean createIfAbsent)
+  {
+    String path = getPath(key);
+    File file = new File(path);
+    RandomAccessFile raFile = null;
+    FileLock fLock = null;
+
+    // take the lock before entering try so that finally never unlocks a lock that is not held
+    _readWriteLock.writeLock().lock();
+    try
+    {
+      if (!file.exists())
+      {
+        if (!createIfAbsent)
+        {
+          // opening the file in "rw" mode below would create it, so stop here
+          logger.warn("skip updatePropertyUntilSucceed, property does not exist, path:" + path);
+          return;
+        }
+        FileUtils.touch(file);
+      }
+
+      raFile = new RandomAccessFile(file, "rw");
+      FileChannel fChannel = raFile.getChannel();
+      fLock = fChannel.lock();
+
+      T current = getProperty(key);
+      T update = updater.update(current);
+      setProperty(key, update);
+    }
+    catch (Exception e)
+    {
+      logger.error("fail to updatePropertyUntilSucceed, path:" + path, e);
+    }
+    finally
+    {
+      try
+      {
+        if (fLock != null && fLock.isValid())
+        {
+           fLock.release();
+        }
+
+        if (raFile != null)
+        {
+          raFile.close();
+        }
+      }
+      catch (IOException e)
+      {
+        logger.error("fail to close file, path:" + path, e);
+      }
+      finally
+      {
+        // released last, after the file lock and the file handle are gone
+        _readWriteLock.writeLock().unlock();
+      }
+    }
+  }
+
+
+  /**
+   * Replaces the serializer used for reading and writing values; the swap happens under the
+   * write lock.
+   *
+   * @param serializer the new serializer
+   */
+  @Override
+  public void setPropertySerializer(PropertySerializer<T> serializer)
+  {
+    _readWriteLock.writeLock().lock();
+    try
+    {
+      _serializer = serializer;
+    }
+    finally
+    {
+      _readWriteLock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Compare-and-set with {@code createIfAbsent} set to {@code false}.
+   *
+   * @return {@code true} if the stored value matched {@code expected} and was replaced
+   */
+  @Override
+  public boolean compareAndSet(String key, T expected, T update, Comparator<T> comparator)
+  {
+    final boolean createIfAbsent = false;
+    return compareAndSet(key, expected, update, comparator, createIfAbsent);
+  }
+
+  /**
+   * Stores {@code update} for {@code key} only if the currently stored value compares equal
+   * to {@code expected}. Runs under the write lock and an exclusive file lock.
+   *
+   * @param key            property key, relative to the root namespace
+   * @param expected       value the store is expected to hold
+   * @param update         value to store on a match
+   * @param comparator     decides whether current and expected value match (result 0)
+   * @param createIfAbsent if {@code true}, the file is created up front when missing
+   * @return {@code true} if the value matched and was replaced; {@code false} on a mismatch
+   *         or on any failure (which is logged)
+   */
+  @Override
+  public boolean compareAndSet(String key, T expected, T update, Comparator<T> comparator,
+                               boolean createIfAbsent)
+  {
+    final String path = getPath(key);
+    final File file = new File(path);
+    RandomAccessFile raFile = null;
+    FileLock fLock = null;
+
+    try
+    {
+      _readWriteLock.writeLock().lock();
+
+      if (createIfAbsent)
+      {
+        file.createNewFile();
+      }
+
+      raFile = new RandomAccessFile(file, "rw");
+      fLock = raFile.getChannel().lock();
+
+      boolean matches = comparator.compare(getProperty(key), expected) == 0;
+      if (matches)
+      {
+        setProperty(key, update);
+      }
+      return matches;
+    }
+    catch (Exception e)
+    {
+      // FileNotFoundException is handled here as well; it is logged the same way
+      logger.error("fail to compareAndSet. path:" + path, e);
+      return false;
+    }
+    finally
+    {
+      _readWriteLock.writeLock().unlock();
+      try
+      {
+        if (fLock != null && fLock.isValid())
+        {
+           fLock.release();
+        }
+
+        if (raFile != null)
+        {
+          raFile.close();
+        }
+      }
+      catch (IOException e)
+      {
+        logger.error("fail to close file. path:" + path, e);
+      }
+    }
+
+  }
+
+  /**
+   * Tells whether a file or directory exists for {@code key}. The check runs under the read
+   * lock, which is released even if the check throws.
+   *
+   * @param key property key, relative to the root namespace
+   * @return {@code true} if the path exists
+   */
+  @Override
+  public boolean exists(String key)
+  {
+    String path = getPath(key);
+    File file = new File(path);
+
+    _readWriteLock.readLock().lock();
+    try
+    {
+      return file.exists();
+    }
+    finally
+    {
+      _readWriteLock.readLock().unlock();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/store/file/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/store/file/package-info.java b/helix-core/src/main/java/org/apache/helix/store/file/package-info.java
new file mode 100644
index 0000000..33d87d1
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/store/file/package-info.java
@@ -0,0 +1,5 @@
+/**
+ * Helix implementation of file-based property store (Deprecated)
+ * 
+ */
+package org.apache.helix.store.file;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/store/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/store/package-info.java b/helix-core/src/main/java/org/apache/helix/store/package-info.java
new file mode 100644
index 0000000..62c1fad
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/store/package-info.java
@@ -0,0 +1,5 @@
+/**
+ * Helix application property store classes
+ * 
+ */
+package org.apache.helix.store;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/store/zk/PropertyItem.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/store/zk/PropertyItem.java b/helix-core/src/main/java/org/apache/helix/store/zk/PropertyItem.java
new file mode 100644
index 0000000..8133bc8
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/store/zk/PropertyItem.java
@@ -0,0 +1,45 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <opensource@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.store.zk;
+
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Holder that pairs a property's raw (serialized) bytes with the
+ * {@link Stat} that was supplied together with them.
+ */
+public class PropertyItem
+{
+  // raw, still-serialized value
+  byte[] _value;
+  // stat given to the constructor; version and mtime are read from it
+  Stat _stat;
+  
+  /**
+   * @param value raw bytes of the property
+   * @param stat  stat associated with the bytes; getVersion() and
+   *              getLastModifiedTime() dereference it without a null check
+   */
+  public PropertyItem(byte[] value, Stat stat)
+  {
+    _value = value;
+    _stat = stat;
+  }
+  
+  /** @return the raw bytes passed to the constructor (not copied) */
+  public byte[] getBytes()
+  {
+    return _value;
+  }
+  
+  /** @return the version recorded in the stat */
+  public int getVersion()
+  {
+    return _stat.getVersion();
+  }
+  
+  /** @return the modification time (mtime) recorded in the stat */
+  public long getLastModifiedTime()
+  {
+    return _stat.getMtime();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/store/zk/ZKPropertyStore.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/store/zk/ZKPropertyStore.java b/helix-core/src/main/java/org/apache/helix/store/zk/ZKPropertyStore.java
new file mode 100644
index 0000000..b5092b6
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/store/zk/ZKPropertyStore.java
@@ -0,0 +1,735 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <opensource@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.store.zk;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.I0Itec.zkclient.IZkStateListener;
+import org.I0Itec.zkclient.ZkConnection;
+import org.I0Itec.zkclient.exception.ZkBadVersionException;
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.store.PropertyChangeListener;
+import org.apache.helix.store.PropertySerializer;
+import org.apache.helix.store.PropertyStat;
+import org.apache.helix.store.PropertyStore;
+import org.apache.helix.store.PropertyStoreException;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+
+@Deprecated
+public class ZKPropertyStore<T> implements
+    PropertyStore<T>,
+    IZkStateListener,
+    IZkDataListener // , IZkChildListener,
+{
+  private static Logger LOG = Logger.getLogger(ZKPropertyStore.class);
+
+  class ByteArrayUpdater implements DataUpdater<byte[]>
+  {
+    final DataUpdater<T> _updater;
+    final PropertySerializer<T> _serializer;
+
+    ByteArrayUpdater(DataUpdater<T> updater, PropertySerializer<T> serializer)
+    {
+      _updater = updater;
+      _serializer = serializer;
+    }
+
+    @Override
+    public byte[] update(byte[] current)
+    {
+      try
+      {
+        T currentValue = null;
+        if (current != null)
+        {
+          currentValue = _serializer.deserialize(current);
+        }
+        T updateValue = _updater.update(currentValue);
+        return _serializer.serialize(updateValue);
+      }
+      catch (PropertyStoreException e)
+      {
+        LOG.error("Exception in update. Updater: " + _updater, e);
+      }
+      return null;
+    }
+  }
+
+  private volatile boolean _isConnected = false;
+  private volatile boolean _hasSessionExpired = false;
+
+  protected final ZkClient _zkClient;
+  protected PropertySerializer<T> _serializer;
+  protected final String _root;
+
+  // zookeeperPath->userCallback->zkCallback
+  private final Map<String, Map<PropertyChangeListener<T>, ZkCallbackHandler<T>>> _callbackMap =
+      new HashMap<String, Map<PropertyChangeListener<T>, ZkCallbackHandler<T>>>();
+
+  // TODO cache capacity should be bounded
+  private final Map<String, PropertyItem> _cache =
+      new ConcurrentHashMap<String, PropertyItem>();
+
+  /**
+   * Creates a property store rooted at the given zookeeper path.
+   *
+   * The given zkClient is assumed to serialize and deserialize raw byte[]
+   * for the given root and its descendants.
+   *
+   * @param zkClient   client used for all zookeeper access
+   * @param serializer converts property values to and from byte[]
+   * @param root       root path of the store; normalized to exactly one leading slash
+   * @throws IllegalArgumentException if any argument is null
+   */
+  public ZKPropertyStore(ZkClient zkClient, final PropertySerializer<T> serializer,
+                         String root)
+  {
+    if (zkClient == null || serializer == null || root == null)
+    {
+      throw new IllegalArgumentException("zkClient|serializer|root can't be null");
+    }
+
+    _root = normalizeKey(root);
+    _zkClient = zkClient;
+
+    setPropertySerializer(serializer);
+
+    // make sure the root node is there, then watch connection state changes
+    _zkClient.createPersistent(_root, true);
+    _zkClient.subscribeStateChanges(this);
+  }
+
+  /**
+   * Normalizes a key so that it has exactly one leading slash.
+   *
+   * @param key key to normalize
+   * @return "/" followed by the key with all of its leading slashes removed
+   * @throws IllegalArgumentException if key is null
+   */
+  private String normalizeKey(String key)
+  {
+    if (key == null)
+    {
+      LOG.error("Key can't be null");
+      throw new IllegalArgumentException("Key can't be null");
+    }
+
+    // skip over every leading slash
+    int firstNonSlash = 0;
+    while (firstNonSlash < key.length() && key.charAt(firstNonSlash) == '/')
+    {
+      firstNonSlash++;
+    }
+
+    return "/" + key.substring(firstNonSlash);
+  }
+
+  /**
+   * Maps a store-relative key to its absolute zookeeper path.
+   *
+   * @param key key relative to the store root; normalized before use
+   * @return the root itself for the key "/", otherwise root + normalized key
+   */
+  private String getAbsolutePath(String key)
+  {
+    String normalized = normalizeKey(key);
+    return normalized.equals("/") ? _root : _root + normalized;
+  }
+
+  /**
+   * Maps an absolute zookeeper path back to a key relative to the store root.
+   *
+   * @param path absolute path; must start with the store root
+   * @return "/" when path equals the root, otherwise path with the root prefix removed
+   * @throws IllegalArgumentException if path does not start with the root
+   */
+  String getRelativePath(String path)
+  {
+    if (!path.startsWith(_root))
+    {
+      // note the leading space: without it the path and the text run together
+      String errMsg = path + " does NOT start with property store's root: " + _root;
+      LOG.error(errMsg);
+      throw new IllegalArgumentException(errMsg);
+    }
+
+    if (path.equals(_root))
+    {
+      return "/";
+    }
+    else
+    {
+      return path.substring(_root.length());
+    }
+  }
+
+  /**
+   * Creates the node for the given prefix if it does not exist yet.
+   *
+   * @param prefix namespace relative to the store root
+   * @throws PropertyStoreException if the zookeeper client throws; the
+   *         message is the original exception's toString()
+   */
+  @Override
+  public void createPropertyNamespace(String prefix) throws PropertyStoreException
+  {
+    String path = getAbsolutePath(prefix);
+    try
+    {
+      if (!_zkClient.exists(path))
+      {
+        _zkClient.createPersistent(path, true);
+      }
+    }
+    catch (Exception e)
+    {
+      // log message now spells the method name correctly
+      LOG.error("Exception in createPropertyNamespace(" + prefix + ")", e);
+      throw new PropertyStoreException(e.toString());
+    }
+  }
+
+  /**
+   * Writes the serialized value to the node for the given key, creating the
+   * node first when it does not exist.
+   *
+   * @param key   key relative to the store root
+   * @param value value to store; handed to the serializer as-is
+   * @throws PropertyStoreException if serialization or the zookeeper client
+   *         throws; the message is the original exception's toString()
+   */
+  @Override
+  public void setProperty(String key, final T value) throws PropertyStoreException
+  {
+    String path = getAbsolutePath(key);
+
+    try
+    {
+      if (!_zkClient.exists(path))
+      {
+        _zkClient.createPersistent(path, true);
+      }
+
+      // serializer should handle value == null
+      byte[] valueBytes = _serializer.serialize(value);
+      _zkClient.writeData(path, valueBytes);
+
+      // the cache is not refreshed here (refresh call is disabled):
+      // update cache
+      // getProperty(key);
+
+    }
+    catch (Exception e)
+    {
+      LOG.error("Exception when setProperty(" + key + ", " + value + ")", e);
+      throw new PropertyStoreException(e.toString());
+    }
+  }
+
+  /**
+   * Reads a property without reporting its stat; delegates to
+   * getProperty(key, null).
+   */
+  @Override
+  public T getProperty(String key) throws PropertyStoreException
+  {
+    return getProperty(key, null);
+  }
+
+  /**
+   * Deserializes the bytes and, when requested, copies mtime and version
+   * from the zookeeper stat into the caller's PropertyStat.
+   *
+   * @param bytes        serialized value; expected to be non-null
+   * @param stat         zookeeper stat; expected to be non-null
+   * @param propertyStat receives mtime and version; may be null to skip
+   * @return the deserialized value
+   */
+  // bytes and stat are not null
+  private T getValueAndStat(byte[] bytes, Stat stat, PropertyStat propertyStat) throws PropertyStoreException
+  {
+    // deserialize first: if this throws, propertyStat is left untouched
+    T value = _serializer.deserialize(bytes);
+
+    if (propertyStat != null)
+    {
+      propertyStat.setLastModifiedTime(stat.getMtime());
+      propertyStat.setVersion(stat.getVersion());
+    }
+    return value;
+  }
+
+  /**
+   * Reads a property, serving it from the local cache when the cached copy
+   * is still current and re-reading it from zookeeper otherwise.
+   *
+   * @param key          key relative to the store root
+   * @param propertyStat if non-null, receives mtime and version of the value returned
+   * @return the deserialized value, or null when no bytes could be obtained
+   * @throws PropertyStoreException if the zookeeper client or the serializer
+   *         throws; the message is the original exception's toString()
+   */
+  @Override
+  public T getProperty(String key, PropertyStat propertyStat) throws PropertyStoreException
+  {
+    String normalizedKey = normalizeKey(key);
+    String path = getAbsolutePath(normalizedKey);
+    Stat stat = new Stat();
+
+    T value = null;
+    try
+    {
+      // cache lookup, watch registration and cache update happen as one unit
+      synchronized (_cache)
+      {
+        PropertyItem item = _cache.get(normalizedKey);
+        // this store is the data listener; handleDataDeleted() evicts the entry
+        _zkClient.subscribeDataChanges(path, this);
+        if (item != null)
+        {
+          // cache hit
+          stat = _zkClient.getStat(path);
+          // a null stat leaves value == null and the cache entry untouched
+          if (stat != null)
+          {
+            // stale when the node was re-created (different czxid) or
+            // the cached version is older than the current one
+            if (item._stat.getCzxid() != stat.getCzxid()
+                || item.getVersion() < stat.getVersion())
+            {
+              // stale data in cache
+              byte[] bytes = _zkClient.readDataAndStat(path, stat, true);
+              if (bytes != null)
+              {
+                value = getValueAndStat(bytes, stat, propertyStat);
+                _cache.put(normalizedKey, new PropertyItem(bytes, stat));
+              }
+            }
+            else
+            {
+              // valid data in cache
+              // item.getBytes() should not be null
+              value = getValueAndStat(item.getBytes(), stat, propertyStat);
+            }
+          }
+        }
+        else
+        {
+          // cache miss
+          byte[] bytes = _zkClient.readDataAndStat(path, stat, true);
+          if (bytes != null)
+          {
+            value = getValueAndStat(bytes, stat, propertyStat);
+            _cache.put(normalizedKey, new PropertyItem(bytes, stat));
+          }
+        }
+      }
+      return value;
+    }
+    catch (Exception e)
+    {
+      LOG.error("Exception in getProperty(" + key + ")", e);
+      throw (new PropertyStoreException(e.toString()));
+    }
+  }
+
+  /**
+   * Deletes the node for the given key. A missing node is not an error.
+   *
+   * @param key key relative to the store root
+   * @throws PropertyStoreException if the zookeeper client throws anything
+   *         other than ZkNoNodeException
+   */
+  @Override
+  public void removeProperty(String key) throws PropertyStoreException
+  {
+    String normalizedKey = normalizeKey(key);
+    String path = getAbsolutePath(normalizedKey);
+
+    try
+    {
+      // if (_zkClient.exists(path))
+      // {
+      _zkClient.delete(path);
+      // }
+      // the cache entry is not removed here (removal is disabled):
+      // _cache.remove(normalizedKey);
+
+    }
+    catch (ZkNoNodeException e)
+    {
+      // OK: the node is already gone
+    }
+    catch (Exception e)
+    {
+      LOG.error("Exception in removeProperty(" + key + ")", e);
+      throw (new PropertyStoreException(e.toString()));
+    }
+  }
+
+  /** @return the normalized root path this store was constructed with */
+  @Override
+  public String getPropertyRootNamespace()
+  {
+    return _root;
+  }
+
+  /**
+   * Recursively deletes the node for the given prefix. A missing node is
+   * not an error.
+   *
+   * @param prefix namespace relative to the store root
+   * @throws PropertyStoreException if the zookeeper client throws anything
+   *         other than ZkNoNodeException
+   */
+  @Override
+  public void removeNamespace(String prefix) throws PropertyStoreException
+  {
+    String path = getAbsolutePath(prefix);
+
+    try
+    {
+      // if (_zkClient.exists(path))
+      // {
+      _zkClient.deleteRecursive(path);
+      // }
+
+      // update cache
+      // childs are all normalized keys
+      // List<String> childs = getPropertyNames(prefix);
+      // for (String child : childs)
+      // {
+      // _cache.remove(child);
+      // }
+    }
+    catch (ZkNoNodeException e)
+    {
+      // OK: the node is already gone
+    }
+    catch (Exception e)
+    {
+      // name this method in the log, not removeProperty()
+      LOG.error("Exception in removeNamespace(" + prefix + ")", e);
+      throw (new PropertyStoreException(e.toString()));
+    }
+  }
+
+  /**
+   * Depth-first walk that collects the keys of all leaf nodes under prefix.
+   *
+   * @param prefix    normalized key of the subtree to walk
+   * @param leafNodes receives the keys of nodes that have no children
+   */
+  private void doGetPropertyNames(String prefix, List<String> leafNodes) throws PropertyStoreException
+  {
+    String path = getAbsolutePath(prefix);
+
+    if (!_zkClient.exists(path))
+    {
+      return;
+    }
+
+    List<String> children = _zkClient.getChildren(path);
+    if (children == null)
+    {
+      return;
+    }
+
+    if (children.isEmpty())
+    {
+      // no children: this node is itself a property
+      leafNodes.add(prefix);
+      return;
+    }
+
+    // avoid a double slash when walking down from the root key
+    String parentWithSlash = prefix.equals("/") ? prefix : prefix + "/";
+    for (String childName : children)
+    {
+      doGetPropertyNames(parentWithSlash + childName, leafNodes);
+    }
+  }
+
+  /**
+   * Lists the keys of all leaf nodes under the given prefix.
+   *
+   * @param prefix namespace relative to the store root
+   * @return leaf keys in sorted order; empty when the prefix does not exist
+   */
+  @Override
+  public List<String> getPropertyNames(String prefix) throws PropertyStoreException
+  {
+    List<String> leafKeys = new ArrayList<String>();
+    doGetPropertyNames(normalizeKey(prefix), leafKeys);
+
+    // sorting gives callers a deterministic order (no-op for 0 or 1 entries)
+    Collections.sort(leafKeys);
+    return leafKeys;
+  }
+
+  /**
+   * Not supported by this store.
+   *
+   * @throws PropertyStoreException always
+   */
+  @Override
+  public void setPropertyDelimiter(String delimiter) throws PropertyStoreException
+  {
+    throw new PropertyStoreException("setPropertyDelimiter() not implemented for ZKPropertyStore");
+  }
+
+  /**
+   * Registers a listener for the given prefix. A zookeeper callback is
+   * created and subscribed only the first time a given listener is seen for
+   * a path; repeated registrations are ignored.
+   *
+   * @param prefix   namespace relative to the store root
+   * @param listener listener to notify; must not be null
+   * @throws IllegalArgumentException if listener is null
+   * @throws PropertyStoreException   if subscribing or the initial invocation throws
+   */
+  @Override
+  public void subscribeForPropertyChange(String prefix,
+                                         final PropertyChangeListener<T> listener) throws PropertyStoreException
+  {
+    if (listener == null)
+    {
+      throw new IllegalArgumentException("listener can't be null. Prefix: " + prefix);
+    }
+
+    String path = getAbsolutePath(prefix);
+
+    ZkCallbackHandler<T> newCallback = null;
+    synchronized (_callbackMap)
+    {
+      if (!_callbackMap.containsKey(path))
+      {
+        _callbackMap.put(path,
+                         new HashMap<PropertyChangeListener<T>, ZkCallbackHandler<T>>());
+      }
+      Map<PropertyChangeListener<T>, ZkCallbackHandler<T>> perPath = _callbackMap.get(path);
+
+      if (!perPath.containsKey(listener))
+      {
+        newCallback = new ZkCallbackHandler<T>(_zkClient, this, prefix, listener);
+        perPath.put(listener, newCallback);
+      }
+    }
+
+    if (newCallback == null)
+    {
+      // listener already registered for this path: nothing to subscribe
+      return;
+    }
+
+    try
+    {
+      _zkClient.subscribeDataChanges(path, newCallback);
+      _zkClient.subscribeChildChanges(path, newCallback);
+
+      // initial invocation; the callback recurses into the current children
+      newCallback.handleChildChange(path, _zkClient.getChildren(path));
+
+      LOG.debug("Subscribed changes for " + path);
+    }
+    catch (Exception e)
+    {
+      LOG.error("Exception in subscribeForPropertyChange(" + prefix + ")", e);
+      throw (new PropertyStoreException(e.toString()));
+    }
+  }
+
+  /**
+   * Removes the callback's data and child subscriptions from the node for
+   * prefix and, recursively, from every node currently below it.
+   *
+   * @param prefix   key of the subtree root (callers pass a normalized key)
+   * @param callback callback to unsubscribe
+   */
+  private void doUnsubscribeForPropertyChange(String prefix, ZkCallbackHandler<T> callback)
+  {
+    String path = getAbsolutePath(prefix);
+
+    _zkClient.unsubscribeDataChanges(path, callback);
+    _zkClient.unsubscribeChildChanges(path, callback);
+
+    List<String> children = _zkClient.getChildren(path);
+    if (children == null)
+    {
+      return;
+    }
+
+    // an empty list simply ends the recursion
+    for (String childName : children)
+    {
+      doUnsubscribeForPropertyChange(prefix + "/" + childName, callback);
+    }
+  }
+
+  /**
+   * Unregisters a listener from the given prefix and removes the matching
+   * zookeeper subscriptions. Unknown prefix/listener pairs are ignored.
+   *
+   * @param prefix   namespace relative to the store root
+   * @param listener listener to remove; must not be null
+   * @throws IllegalArgumentException if listener is null
+   */
+  @Override
+  public void unsubscribeForPropertyChange(String prefix,
+                                           PropertyChangeListener<T> listener) throws PropertyStoreException
+  {
+    if (listener == null)
+    {
+      throw new IllegalArgumentException("listener can't be null. Prefix: " + prefix);
+    }
+
+    String path = getAbsolutePath(prefix);
+    ZkCallbackHandler<T> callback = null;
+
+    synchronized (_callbackMap)
+    {
+      Map<PropertyChangeListener<T>, ZkCallbackHandler<T>> callbacks =
+          _callbackMap.get(path);
+
+      // check for null BEFORE touching the map of callbacks
+      if (callbacks != null)
+      {
+        callback = callbacks.remove(listener);
+      }
+
+      // drop the per-path entry once nothing is registered under it
+      if (callbacks == null || callbacks.isEmpty())
+      {
+        _callbackMap.remove(path);
+      }
+    }
+
+    if (callback != null)
+    {
+      doUnsubscribeForPropertyChange(prefix, callback);
+      LOG.debug("Unsubscribed changes for " + path);
+    }
+  }
+
+  /** @return always false for this store */
+  @Override
+  public boolean canParentStoreData()
+  {
+    return false;
+  }
+
+  /**
+   * Replaces the serializer used to convert values to and from byte[].
+   *
+   * @param serializer new serializer; must not be null
+   * @throws IllegalArgumentException if serializer is null
+   */
+  @Override
+  public void setPropertySerializer(final PropertySerializer<T> serializer)
+  {
+    if (serializer == null)
+    {
+      throw new IllegalArgumentException("serializer can't be null");
+    }
+
+    _serializer = serializer;
+  }
+
+  /**
+   * Same as the three-argument overload with createIfAbsent = true.
+   */
+  @Override
+  public void updatePropertyUntilSucceed(String key, DataUpdater<T> updater) throws PropertyStoreException
+  {
+    updatePropertyUntilSucceed(key, updater, true);
+  }
+
+  /**
+   * Applies the updater to the property through
+   * ZkClient.updateDataSerialized(), optionally creating the node first.
+   *
+   * @param key            key relative to the store root
+   * @param updater        computes the new value from the current one
+   * @param createIfAbsent when true a missing node is created; when false a
+   *                       missing node is an error
+   * @throws PropertyStoreException if the node is missing and createIfAbsent
+   *         is false, or if the zookeeper client throws
+   */
+  @Override
+  public void updatePropertyUntilSucceed(String key,
+                                         DataUpdater<T> updater,
+                                         boolean createIfAbsent) throws PropertyStoreException
+  {
+    String path = getAbsolutePath(key);
+    try
+    {
+      if (!_zkClient.exists(path))
+      {
+        if (!createIfAbsent)
+        {
+          throw new PropertyStoreException("Can't update " + key
+              + " since no node exists");
+        }
+        _zkClient.createPersistent(path, true);
+      }
+
+      _zkClient.updateDataSerialized(path, new ByteArrayUpdater(updater, _serializer));
+    }
+    catch (PropertyStoreException e)
+    {
+      // already the right type with a precise message: log it and rethrow
+      // unchanged instead of wrapping its toString() in a second exception
+      LOG.error("Exception in updatePropertyUntilSucceed(" + key + ", " + createIfAbsent
+          + ")", e);
+      throw e;
+    }
+    catch (Exception e)
+    {
+      LOG.error("Exception in updatePropertyUntilSucceed(" + key + ", " + createIfAbsent
+          + ")", e);
+      throw (new PropertyStoreException(e.toString()));
+    }
+
+    // update cache
+    // getProperty(key);
+  }
+
+  /**
+   * Same as the five-argument overload with createIfAbsent = true.
+   */
+  @Override
+  public boolean compareAndSet(String key, T expected, T update, Comparator<T> comparator)
+  {
+    return compareAndSet(key, expected, update, comparator, true);
+  }
+
+  /**
+   * Writes update only if the current value compares equal to expected.
+   * The write is conditional on the version read together with the current
+   * value, so a concurrent modification makes the call return false.
+   *
+   * @param key            key relative to the store root
+   * @param expected       value the property is expected to hold
+   * @param update         value to write on a match
+   * @param comparator     decides equality; compare(current, expected) == 0 is a match
+   * @param createIfAbsent when true a missing node is created before comparing;
+   *                       when false a missing node yields false
+   * @return true if the value was written; false on mismatch, version
+   *         conflict, missing node, or any exception in the read/compare/write step
+   */
+  @Override
+  public boolean compareAndSet(String key,
+                               T expected,
+                               T update,
+                               Comparator<T> comparator,
+                               boolean createIfAbsent)
+  {
+    String path = getAbsolutePath(key);
+
+    // if two threads call with createIfAbsent=true
+    // one thread creates the node, the other just goes through
+    // when writeData() one thread writes the other gets ZkBadVersionException
+    if (!_zkClient.exists(path))
+    {
+      if (createIfAbsent)
+      {
+        _zkClient.createPersistent(path, true);
+      }
+      else
+      {
+        return false;
+      }
+    }
+
+    try
+    {
+      Stat stat = new Stat();
+      byte[] currentBytes = _zkClient.readDataAndStat(path, stat, true);
+      // a node without data is compared as a null value
+      T current = null;
+      if (currentBytes != null)
+      {
+        current = _serializer.deserialize(currentBytes);
+      }
+
+      if (comparator.compare(current, expected) == 0)
+      {
+        byte[] valueBytes = _serializer.serialize(update);
+        // conditional write: pass the version that was read above
+        _zkClient.writeData(path, valueBytes, stat.getVersion());
+
+        // update cache
+        // getProperty(key);
+
+        return true;
+      }
+    }
+    catch (ZkBadVersionException e)
+    {
+      LOG.warn("Get BadVersion when writing to zookeeper. Mostly Ignorable due to contention");
+    }
+    catch (Exception e)
+    {
+      // failures are reported through the false return value, not rethrown
+      LOG.error("Exception when compareAndSet(" + key + ")", e);
+    }
+
+    return false;
+  }
+
+  /**
+   * @param key key relative to the store root
+   * @return whether the zookeeper client reports a node at the key's absolute path
+   */
+  @Override
+  public boolean exists(String key)
+  {
+    return _zkClient.exists(getAbsolutePath(key));
+  }
+
+  /**
+   * Tracks the connection state reported by the zookeeper client in the
+   * _isConnected and _hasSessionExpired flags. States other than the three
+   * handled below only get logged.
+   *
+   * @param state new keeper state
+   */
+  @Override
+  public void handleStateChanged(KeeperState state) throws Exception
+  {
+    LOG.info("KeeperState:" + state);
+    switch (state)
+    {
+    case SyncConnected:
+      _isConnected = true;
+      break;
+    case Disconnected:
+      _isConnected = false;
+      break;
+    case Expired:
+      _isConnected = false;
+      // NOTE(review): nothing in this class resets this flag once set
+      _hasSessionExpired = true;
+      break;
+    }
+  }
+
+  /**
+   * Re-registers every known callback after a new zookeeper session has
+   * been established and re-runs its initial invocation.
+   *
+   * Empty per-path maps and null callbacks are logged and removed. Removal
+   * is deferred until after the respective iteration has finished, because
+   * removing from a HashMap while iterating over its key set throws
+   * ConcurrentModificationException.
+   */
+  @Override
+  public void handleNewSession() throws Exception
+  {
+    ZkConnection connection = ((ZkConnection) _zkClient.getConnection());
+    ZooKeeper zookeeper = connection.getZookeeper();
+    LOG.info("handleNewSession: " + zookeeper.getSessionId());
+
+    synchronized (_callbackMap)
+    {
+      // paths whose callback map turned out to be null or empty
+      List<String> emptyPaths = new ArrayList<String>();
+
+      for (Map.Entry<String, Map<PropertyChangeListener<T>, ZkCallbackHandler<T>>> pathEntry
+          : _callbackMap.entrySet())
+      {
+        String path = pathEntry.getKey();
+        Map<PropertyChangeListener<T>, ZkCallbackHandler<T>> callbacks =
+            pathEntry.getValue();
+        if (callbacks == null || callbacks.size() == 0)
+        {
+          LOG.error("Get a null callback map. Remove it. Path: " + path);
+          emptyPaths.add(path);
+          continue;
+        }
+
+        // listeners of this path that are mapped to a null callback
+        List<PropertyChangeListener<T>> deadListeners =
+            new ArrayList<PropertyChangeListener<T>>();
+
+        for (Map.Entry<PropertyChangeListener<T>, ZkCallbackHandler<T>> callbackEntry
+            : callbacks.entrySet())
+        {
+          PropertyChangeListener<T> listener = callbackEntry.getKey();
+          ZkCallbackHandler<T> callback = callbackEntry.getValue();
+
+          if (callback == null)
+          {
+            LOG.error("Get a null callback. Remove it. Path: " + path + ", listener: "
+                + listener);
+            deadListeners.add(listener);
+            continue;
+          }
+          _zkClient.subscribeDataChanges(path, callback);
+          _zkClient.subscribeChildChanges(path, callback);
+
+          // do initial invocation
+          callback.handleChildChange(path, _zkClient.getChildren(path));
+        }
+
+        for (PropertyChangeListener<T> listener : deadListeners)
+        {
+          callbacks.remove(listener);
+        }
+      }
+
+      for (String path : emptyPaths)
+      {
+        _callbackMap.remove(path);
+      }
+    }
+  }
+
+  /**
+   * Not implemented: performs no work.
+   *
+   * @return always false
+   */
+  @Override
+  public boolean start()
+  {
+    // TODO Auto-generated method stub
+    return false;
+  }
+
+  /**
+   * Closes the zookeeper client this store was constructed with.
+   *
+   * @return always true
+   */
+  @Override
+  public boolean stop()
+  {
+    _zkClient.close();
+    return true;
+  }
+
+  /**
+   * Data-change notification for paths this store subscribed to in
+   * getProperty(). Currently a no-op: the cache entry is not refreshed here.
+   */
+  @Override
+  public void handleDataChange(String dataPath, Object data) throws Exception
+  {
+    // TODO Auto-generated method stub
+
+  }
+
+  /**
+   * Evicts the cache entry of a deleted node and stops listening for data
+   * changes on its path.
+   *
+   * @param dataPath absolute path of the deleted node; must lie under the store root
+   */
+  @Override
+  public void handleDataDeleted(String dataPath) throws Exception
+  {
+    // cache keys are store-relative, so map the absolute path back first
+    String key = getRelativePath(dataPath);
+    synchronized (_cache)
+    {
+      _zkClient.unsubscribeDataChanges(dataPath, this);
+      _cache.remove(key);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/store/zk/ZNode.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/store/zk/ZNode.java b/helix-core/src/main/java/org/apache/helix/store/zk/ZNode.java
new file mode 100644
index 0000000..481b5d1
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/store/zk/ZNode.java
@@ -0,0 +1,110 @@
+package org.apache.helix.store.zk;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * In-memory record of a zookeeper node: its path, data, stat and the names
+ * of its children. The child set starts out as the shared immutable empty
+ * set and is replaced by a mutable HashSet on the first addition.
+ */
+public class ZNode
+{
+  // used for a newly created item, because zkclient.create() doesn't return stat
+  // or used for places where we don't care about stat
+  public static final Stat ZERO_STAT = new Stat();
+  
+  final String _zkPath;
+  private Stat _stat;
+  Object _data;
+  Set<String> _childSet;
+
+  public ZNode(String zkPath, Object data, Stat stat)
+  {
+    _zkPath = zkPath;
+    _data = data;
+    _stat = stat;
+    // placeholder until a child is actually added
+    _childSet = Collections.<String>emptySet();
+  }
+
+  // true while _childSet is still the shared immutable placeholder
+  private boolean hasPlaceholderChildSet()
+  {
+    return _childSet == Collections.<String>emptySet();
+  }
+
+  // swaps the placeholder for a mutable set before the first write
+  private void ensureMutableChildSet()
+  {
+    if (hasPlaceholderChildSet())
+    {
+      _childSet = new HashSet<String>();
+    }
+  }
+
+  /** Removes a child name; does nothing while no child was ever added. */
+  public void removeChild(String child)
+  {
+    if (!hasPlaceholderChildSet())
+    {
+      _childSet.remove(child);
+    }
+  }
+  
+  /** Adds a single child name. */
+  public void addChild(String child)
+  {
+    ensureMutableChildSet();
+    _childSet.add(child);
+  }
+  
+  /** Adds all given child names; a null or empty list is ignored. */
+  public void addChildren(List<String> children)
+  {
+    if (children == null || children.isEmpty())
+    {
+      return;
+    }
+
+    ensureMutableChildSet();
+    _childSet.addAll(children);
+  }
+
+  public boolean hasChild(String child)
+  {
+    return _childSet.contains(child);
+  }
+
+  /** @return the live child set (not a copy) */
+  public Set<String> getChildSet()
+  {
+    return _childSet;
+  }
+  
+  public void setData(Object data)
+  {
+    _data = data;
+  }
+  
+  public Object getData()
+  {
+    return _data;
+  }
+  
+  public void setStat(Stat stat)
+  {
+    _stat = stat;
+  }
+  
+  public Stat getStat()
+  {
+    return _stat;
+  }
+  
+  /**
+   * Replaces the child set with the given names. A null or empty list is
+   * ignored, i.e. the existing children are kept in that case.
+   */
+  public void setChildSet(List<String> childNames)
+  {
+    if (childNames == null || childNames.isEmpty())
+    {
+      return;
+    }
+
+    ensureMutableChildSet();
+    _childSet.clear();
+    _childSet.addAll(childNames);
+  }
+  
+  @Override
+  public String toString()
+  {
+    return _zkPath + ", " + _data + ", " + _childSet + ", " + _stat;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/store/zk/ZkCallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/store/zk/ZkCallbackHandler.java b/helix-core/src/main/java/org/apache/helix/store/zk/ZkCallbackHandler.java
new file mode 100644
index 0000000..30b1feb
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/store/zk/ZkCallbackHandler.java
@@ -0,0 +1,97 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <opensource@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.store.zk;
+
+import java.util.List;
+
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.store.PropertyChangeListener;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Bridges zookeeper data/child notifications to a single
+ * {@link PropertyChangeListener}. Absolute zookeeper paths are translated
+ * to store-relative keys through the owning {@link ZKPropertyStore}.
+ */
+class ZkCallbackHandler<T> implements IZkChildListener, IZkDataListener
+{
+  private static Logger LOG = Logger.getLogger(ZkCallbackHandler.class);
+
+  private final ZkClient _zkClient;
+  private final ZKPropertyStore<T> _store;
+
+  // listen on prefix and all its childs
+  private final String _prefix;
+  private final PropertyChangeListener<T> _listener;
+
+  /**
+   * @param client   zookeeper client used to subscribe to child nodes
+   * @param store    store used to map absolute paths to relative keys
+   * @param prefix   prefix this handler was registered for
+   * @param listener listener that receives onPropertyChange(key)
+   */
+  public ZkCallbackHandler(ZkClient client, ZKPropertyStore<T> store, String prefix,
+                           PropertyChangeListener<T> listener)
+  {
+    _zkClient = client;
+    _store = store;
+    _prefix = prefix;
+    _listener = listener;
+  }
+
+  /** Forwards a data change to the listener as a change of the relative key. */
+  @Override
+  public void handleDataChange(String path, Object data) throws Exception
+  {
+    LOG.debug("Data changed @ " + path + " to " + data);
+    String key = _store.getRelativePath(path);
+    _listener.onPropertyChange(key);
+  }
+
+  /** Deletions are only logged; the listener is not notified here. */
+  @Override
+  public void handleDataDeleted(String dataPath) throws Exception
+  {
+    LOG.debug("Data deleted @ " + dataPath);
+  }
+
+  /**
+   * Notifies the listener about the changed node, then subscribes this
+   * handler to every current child and recurses into it, so the whole
+   * subtree below path ends up being watched.
+   *
+   * @param path          absolute path whose children changed
+   * @param currentChilds current child names, or null if the node was deleted
+   */
+  @Override
+  public void handleChildChange(String path, List<String> currentChilds) throws Exception
+  {
+    LOG.debug("childs changed @ " + path + " to " + currentChilds);
+    // System.out.println("childs changed @ " + path + " to " + currentChilds);
+
+
+    if (currentChilds == null)
+    {
+      /**
+       * When a node with a child change watcher is deleted
+       * a child change is triggered on the deleted node
+       * and in this case, the currentChilds is null
+       */
+      return;
+//    } else if (currentChilds.size() == 0)
+//    {
+//      String key = _store.getRelativePath(path);
+//      _listener.onPropertyChange(key);
+    }
+    else
+    {
+      String key = _store.getRelativePath(path);
+      _listener.onPropertyChange(key);
+
+      for (String child : currentChilds)
+      {
+        // avoid a double slash when path already ends with one
+        String childPath = path.endsWith("/") ? path + child : path + "/" + child;
+        _zkClient.subscribeDataChanges(childPath, this);
+        _zkClient.subscribeChildChanges(childPath, this);
+
+        // recursive call
+        handleChildChange(childPath, _zkClient.getChildren(childPath));
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/store/zk/ZkHelixPropertyStore.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/store/zk/ZkHelixPropertyStore.java b/helix-core/src/main/java/org/apache/helix/store/zk/ZkHelixPropertyStore.java
new file mode 100644
index 0000000..56e636a
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/store/zk/ZkHelixPropertyStore.java
@@ -0,0 +1,31 @@
+package org.apache.helix.store.zk;
+
+import java.util.List;
+
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkCacheBaseDataAccessor;
+
+
+/**
+ * Property store built on {@link ZkCacheBaseDataAccessor}. The class adds
+ * no behavior of its own; each constructor only forwards to the matching
+ * super constructor.
+ */
+public class ZkHelixPropertyStore<T> extends ZkCacheBaseDataAccessor<T>
+{
+  /**
+   * Wraps an existing accessor.
+   * NOTE(review): the third super argument is passed as null; its meaning
+   * is defined by ZkCacheBaseDataAccessor - confirm there.
+   */
+  public ZkHelixPropertyStore(ZkBaseDataAccessor<T> accessor,
+                              String root,
+                              List<String> subscribedPaths)
+  {
+    super(accessor, root, null, subscribedPaths);
+  }
+
+  /**
+   * Builds the store from a zookeeper address, serializer and chroot path,
+   * forwarding zkCachePaths as the last super argument.
+   */
+  public ZkHelixPropertyStore(String zkAddress,
+                              ZkSerializer serializer,
+                              String chrootPath,
+                              List<String> zkCachePaths)
+  {
+    super(zkAddress, serializer, chrootPath, null, zkCachePaths);
+  }
+
+  /**
+   * Same as the four-argument constructor with null passed in place of
+   * zkCachePaths.
+   */
+  public ZkHelixPropertyStore(String zkAddress, ZkSerializer serializer, String chrootPath)
+  {
+    super(zkAddress, serializer, chrootPath, null, null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/store/zk/ZkListener.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/store/zk/ZkListener.java b/helix-core/src/main/java/org/apache/helix/store/zk/ZkListener.java
new file mode 100644
index 0000000..3a561a4
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/store/zk/ZkListener.java
@@ -0,0 +1,10 @@
+package org.apache.helix.store.zk;
+
+/**
+ * Callback interface for node-level events, each identified by the path
+ * of the affected node.
+ */
+public interface ZkListener
+{
+  /** Called with the path of a node whose data changed. */
+  void handleDataChange(String path);
+  
+  /** Called with the path of a node that was created. */
+  void handleNodeCreate(String path);
+  
+  /** Called with the path of a node that was deleted. */
+  void handleNodeDelete(String path);
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/store/zk/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/store/zk/package-info.java b/helix-core/src/main/java/org/apache/helix/store/zk/package-info.java
new file mode 100644
index 0000000..0675eac
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/store/zk/package-info.java
@@ -0,0 +1,5 @@
+/**
+ * Helix implementation of zookeeper-based property store (Deprecated)
+ * 
+ */
+package org.apache.helix.store.zk;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/tools/CLMLogFileAppender.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/CLMLogFileAppender.java b/helix-core/src/main/java/org/apache/helix/tools/CLMLogFileAppender.java
new file mode 100644
index 0000000..125c877
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/CLMLogFileAppender.java
@@ -0,0 +1,83 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <opensource@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.tools;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.RuntimeMXBean;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.log4j.FileAppender;
+import org.apache.log4j.Layout;
+import org.apache.log4j.spi.ErrorCode;
+
+/**
+ * A log4j {@link FileAppender} that redirects its output to a freshly named
+ * file each time its options are activated.
+ * <p>
+ * If a file name has been set, {@link #activateOptions()} replaces it with
+ * <code>&lt;user.home&gt;/EspressoLogs/&lt;class&gt;_&lt;timestamp&gt;.txt</code>. The
+ * originally configured name is therefore only used as an on/off switch; its
+ * value is discarded.
+ */
+public class CLMLogFileAppender extends FileAppender
+{
+  /** Class-name placeholder used when the JVM reports no stack frames. */
+  private static final String UNKNOWN_CLASS = "unknown";
+
+  /** Creates an unconfigured appender; no file is opened. */
+  public CLMLogFileAppender()
+  {
+  }
+
+  /**
+   * Delegates to the matching {@link FileAppender} constructor.
+   *
+   * @throws IOException propagated from the superclass constructor
+   */
+  public CLMLogFileAppender(Layout layout, String filename, boolean append,
+      boolean bufferedIO, int bufferSize) throws IOException
+  {
+    super(layout, filename, append, bufferedIO, bufferSize);
+  }
+
+  /**
+   * Delegates to the matching {@link FileAppender} constructor.
+   *
+   * @throws IOException propagated from the superclass constructor
+   */
+  public CLMLogFileAppender(Layout layout, String filename, boolean append)
+      throws IOException
+  {
+    super(layout, filename, append);
+  }
+
+  /**
+   * Delegates to the matching {@link FileAppender} constructor.
+   *
+   * @throws IOException propagated from the superclass constructor
+   */
+  public CLMLogFileAppender(Layout layout, String filename) throws IOException
+  {
+    super(layout, filename);
+  }
+
+  /**
+   * Replaces the configured file name with a generated one and opens that file.
+   * <p>
+   * Does nothing when no file name has been set. Any exception raised while
+   * generating the name or opening the file is reported through the appender's
+   * error handler with {@link ErrorCode#FILE_OPEN_FAILURE} instead of being
+   * thrown.
+   */
+  @Override
+  public void activateOptions()
+  {
+    if (fileName == null)
+    {
+      return;
+    }
+
+    try
+    {
+      fileName = getNewLogFileName();
+      setFile(fileName, fileAppend, bufferedIO, bufferSize);
+    } catch (Exception e)
+    {
+      errorHandler.error("Error while activating log options", e,
+          ErrorCode.FILE_OPEN_FAILURE);
+    }
+  }
+
+  /**
+   * Builds the log file path from the user's home directory, the class name of
+   * the bottom-most frame of the calling thread's stack and the current time
+   * (millisecond resolution).
+   * <p>
+   * The bottom-most frame is the application's {@code main} class only when
+   * this runs on the main thread; on any other thread it is whatever class
+   * started that thread's execution.
+   *
+   * @return path of the form
+   *         <code>&lt;user.home&gt;/EspressoLogs/&lt;class&gt;_&lt;timestamp&gt;.txt</code>
+   */
+  private String getNewLogFileName()
+  {
+    String time = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-SSS").format(new Date());
+
+    // A JVM is allowed to return a zero-length stack trace; fall back to a
+    // placeholder rather than failing and leaving the appender without a file.
+    StackTraceElement[] stack = Thread.currentThread().getStackTrace();
+    String mainClass =
+        stack.length > 0 ? stack[stack.length - 1].getClassName() : UNKNOWN_CLASS;
+
+    return System.getProperty("user.home") + "/EspressoLogs/" + mainClass + "_"
+        + time + ".txt";
+  }
+}


Mime
View raw message