incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [1/2] git commit: Adding HDFS trace store
Date Thu, 05 Dec 2013 14:31:00 GMT
Updated Branches:
  refs/heads/apache-blur-0.2 6d1ef8bb1 -> 12ef492a4


Adding HDFS trace store


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/7072122c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/7072122c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/7072122c

Branch: refs/heads/apache-blur-0.2
Commit: 7072122c18a9648479104750a8687efec6aab004
Parents: 6d1ef8b
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Thu Dec 5 09:24:37 2013 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Thu Dec 5 09:24:37 2013 -0500

----------------------------------------------------------------------
 .../org/apache/blur/thrift/ThriftServer.java    |  16 +-
 .../blur/trace/hdfs/HdfsTraceStorage.java       | 184 +++++++++++++++++++
 .../blur/trace/hdfs/HdfsTraceStorageTest.java   | 127 +++++++++++++
 .../org/apache/blur/utils/BlurConstants.java    |   1 +
 .../src/main/resources/blur-default.properties  |   5 +-
 5 files changed, 329 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7072122c/blur-core/src/main/java/org/apache/blur/thrift/ThriftServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/ThriftServer.java b/blur-core/src/main/java/org/apache/blur/thrift/ThriftServer.java
index 48c0b27..caf8ab8 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/ThriftServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/ThriftServer.java
@@ -22,6 +22,7 @@ import static org.apache.blur.metrics.MetricsConstants.JVM;
 import static org.apache.blur.metrics.MetricsConstants.LOAD_AVERAGE;
 import static org.apache.blur.metrics.MetricsConstants.ORG_APACHE_BLUR;
 import static org.apache.blur.metrics.MetricsConstants.SYSTEM;
+import static org.apache.blur.utils.BlurConstants.BLUR_HDFS_TRACE_PATH;
 import static org.apache.blur.utils.BlurConstants.BLUR_ZOOKEEPER_TRACE_PATH;
 
 import java.io.BufferedReader;
@@ -57,6 +58,7 @@ import org.apache.blur.thrift.server.TThreadedSelectorServer.Args.AcceptPolicy;
 import org.apache.blur.trace.LogTraceStorage;
 import org.apache.blur.trace.TraceStorage;
 import org.apache.blur.trace.ZooKeeperTraceStorage;
+import org.apache.blur.trace.hdfs.HdfsTraceStorage;
 
 import com.yammer.metrics.Metrics;
 import com.yammer.metrics.core.Gauge;
@@ -99,11 +101,19 @@ public class ThriftServer {
   }
 
   public static TraceStorage setupTraceStorage(BlurConfiguration configuration) throws IOException {
-    String path = configuration.get(BLUR_ZOOKEEPER_TRACE_PATH);
-    if (path == null || path.isEmpty()) {
+    // Chooses the trace store: ZooKeeper when blur.zookeeper.trace.path is set, HDFS when
+    // blur.hdfs.trace.path is set, otherwise the log. Blank values count as unset (as the previous
+    // isEmpty() check did), since blur-default.properties ships an empty blur.hdfs.trace.path.
+    String zkPath = configuration.get(BLUR_ZOOKEEPER_TRACE_PATH);
+    String hdfsPath = configuration.get(BLUR_HDFS_TRACE_PATH);
+    boolean useZooKeeper = zkPath != null && !zkPath.isEmpty();
+    boolean useHdfs = hdfsPath != null && !hdfsPath.isEmpty();
+    if (useZooKeeper && useHdfs) {
+      // Only one distributed trace store may be active; treat both as a configuration error.
+      throw new IllegalArgumentException("Cannot have both [" + BLUR_ZOOKEEPER_TRACE_PATH + "] and [" + BLUR_HDFS_TRACE_PATH
+          + "] set.");
+    }
+    if (useZooKeeper) {
+      return new ZooKeeperTraceStorage(configuration);
+    } else if (useHdfs) {
+      return new HdfsTraceStorage(configuration);
+    } else {
       return new LogTraceStorage(configuration);
     }
-    return new ZooKeeperTraceStorage(configuration);
   }
 
   public static void printUlimits() throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7072122c/blur-core/src/main/java/org/apache/blur/trace/hdfs/HdfsTraceStorage.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/trace/hdfs/HdfsTraceStorage.java b/blur-core/src/main/java/org/apache/blur/trace/hdfs/HdfsTraceStorage.java
new file mode 100644
index 0000000..5cd9bf2
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/trace/hdfs/HdfsTraceStorage.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.trace.hdfs;
+
+import static org.apache.blur.utils.BlurConstants.BLUR_HDFS_TRACE_PATH;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.blur.BlurConfiguration;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.trace.Trace.TraceId;
+import org.apache.blur.trace.TraceCollector;
+import org.apache.blur.trace.TraceStorage;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * {@link TraceStorage} implementation that writes traces through the Hadoop {@link FileSystem} API
+ * (for example to HDFS) under the directory configured by {@code blur.hdfs.trace.path}.
+ * <p>
+ * Each trace is a directory named after its root id. Each request inside it is one file named
+ * {@code <requestId>_<random long>} that holds the collector's JSON. {@link #store(TraceCollector)}
+ * only enqueues; a single daemon thread performs the writes.
+ */
+public class HdfsTraceStorage extends TraceStorage {
+
+  private static final String UTF_8 = "UTF-8";
+
+  private final static Log LOG = LogFactory.getLog(HdfsTraceStorage.class);
+
+  private final Path _storePath;
+  // Unbounded queue: store() never blocks, but the queue can grow if writes fall behind.
+  private final BlockingQueue<TraceCollector> _queue = new LinkedBlockingQueue<TraceCollector>();
+  private final Thread _daemon;
+  private final Configuration _conf = new Configuration();
+  private final FileSystem _fileSystem;
+
+  /**
+   * Resolves the store path, creates it if missing, and starts the background writer thread.
+   *
+   * @param configuration Blur configuration; {@code blur.hdfs.trace.path} must be set.
+   * @throws IOException if the file system cannot be obtained or accessed.
+   */
+  public HdfsTraceStorage(BlurConfiguration configuration) throws IOException {
+    super(configuration);
+    _storePath = new Path(configuration.get(BLUR_HDFS_TRACE_PATH));
+    _fileSystem = _storePath.getFileSystem(_conf);
+    _fileSystem.mkdirs(_storePath);
+    _daemon = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        Random random = new Random();
+        while (true) {
+          TraceCollector collector;
+          try {
+            collector = _queue.take();
+          } catch (InterruptedException e) {
+            // close() interrupts this thread to stop it; collectors still queued are not written.
+            return;
+          }
+          try {
+            writeCollector(collector, random);
+          } catch (Throwable t) {
+            LOG.error("Unknown error while trying to write collector.", t);
+          }
+        }
+      }
+    });
+    _daemon.setDaemon(true);
+    _daemon.setName("HDFS Trace Queue Writer");
+    _daemon.start();
+  }
+
+  /**
+   * Queues the collector for asynchronous writing by the daemon thread.
+   */
+  @Override
+  public void store(TraceCollector collector) {
+    try {
+      _queue.put(collector);
+    } catch (InterruptedException e) {
+      // Restore the interrupt status so the calling thread can still observe it.
+      Thread.currentThread().interrupt();
+      LOG.error("Unknown error while trying to add collector on queue", e);
+    }
+  }
+
+  /** Writes one collector's JSON to {@code <storePath>/<rootId>/<requestId>_<random>}. */
+  private void writeCollector(TraceCollector collector, Random random) throws IOException {
+    TraceId id = collector.getId();
+    String storeId = id.getRootId();
+    String requestId = id.getRequestId();
+    if (requestId == null) {
+      // Stored with an empty prefix, i.e. a file name that starts with '_'.
+      requestId = "";
+    }
+    Path tracePath = getTracePath(storeId);
+    String json = collector.toJson();
+    storeJson(new Path(tracePath, getRequestIdPathName(requestId, random)), json);
+  }
+
+  /** Appends a random suffix so that repeated writes for the same request id do not collide. */
+  private String getRequestIdPathName(String requestId, Random random) {
+    return requestId + "_" + random.nextLong();
+  }
+
+  /**
+   * Writes {@code json}, encoded as UTF-8, to a new file at {@code storePath}.
+   *
+   * @throws IOException if the file already exists or the write fails.
+   */
+  public void storeJson(Path storePath, String json) throws IOException {
+    FSDataOutputStream outputStream = _fileSystem.create(storePath, false);
+    try {
+      outputStream.write(json.getBytes(UTF_8));
+    } finally {
+      // Close even when the write fails so the stream is not leaked.
+      outputStream.close();
+    }
+  }
+
+  /** Returns the directory that holds all requests of the given trace. */
+  private Path getTracePath(String traceId) {
+    return new Path(_storePath, traceId);
+  }
+
+  /**
+   * Parses the request id from a file name of the form {@code <requestId>_<random>}. An empty
+   * request id (a name starting with '_') is valid.
+   *
+   * @return the request id, or null if the name contains no '_'.
+   */
+  private static String parseRequestId(String fileName) {
+    int indexOf = fileName.lastIndexOf('_');
+    if (indexOf < 0) {
+      return null;
+    }
+    return fileName.substring(0, indexOf);
+  }
+
+  /**
+   * Lists the children of {@code path}. Some Hadoop versions (1.x) return null instead of throwing
+   * for a missing path; that case is mapped to an empty array here.
+   */
+  private FileStatus[] listStatusOrEmpty(Path path) throws IOException {
+    FileStatus[] listStatus = _fileSystem.listStatus(path);
+    return listStatus == null ? new FileStatus[] {} : listStatus;
+  }
+
+  /**
+   * Stops the writer thread, then closes the file system.
+   */
+  @Override
+  public void close() throws IOException {
+    // Stop the writer first so it does not keep writing to a closed file system.
+    _daemon.interrupt();
+    // NOTE(review): Path.getFileSystem() usually returns Hadoop's shared, cached FileSystem instance,
+    // so closing it here may affect other users of the same file system in this JVM - confirm.
+    _fileSystem.close();
+  }
+
+  /** Returns the root ids of all stored traces (one directory per trace). */
+  @Override
+  public List<String> getTraceIds() throws IOException {
+    List<String> traceIds = new ArrayList<String>();
+    for (FileStatus status : listStatusOrEmpty(_storePath)) {
+      traceIds.add(status.getPath().getName());
+    }
+    return traceIds;
+  }
+
+  /** Returns the request ids stored for a trace; an id appears once per stored file. */
+  @Override
+  public List<String> getRequestIds(String traceId) throws IOException {
+    List<String> requestIds = new ArrayList<String>();
+    for (FileStatus status : listStatusOrEmpty(getTracePath(traceId))) {
+      String requestId = parseRequestId(status.getPath().getName());
+      if (requestId != null) {
+        requestIds.add(requestId);
+      }
+    }
+    return requestIds;
+  }
+
+  /**
+   * Reads the JSON stored for one request of a trace.
+   *
+   * @throws IOException if the trace or request is not found, or the read fails.
+   */
+  @Override
+  public String getRequestContentsJson(String traceId, String requestId) throws IOException {
+    Path path = findPath(traceId, requestId);
+    FSDataInputStream inputStream = _fileSystem.open(path);
+    try {
+      // Decode with the same charset that storeJson() encodes with.
+      return IOUtils.toString(inputStream, UTF_8);
+    } finally {
+      inputStream.close();
+    }
+  }
+
+  /** Returns the first file in the trace directory whose parsed request id equals {@code requestId}. */
+  private Path findPath(String traceId, String requestId) throws IOException {
+    Path tracePath = getTracePath(traceId);
+    if (!_fileSystem.exists(tracePath)) {
+      throw new IOException("Trace [" + traceId + "] not found.");
+    }
+    for (FileStatus status : listStatusOrEmpty(tracePath)) {
+      Path path = status.getPath();
+      String name = parseRequestId(path.getName());
+      if (name != null && name.equals(requestId)) {
+        return path;
+      }
+    }
+    throw new IOException("Request [" + requestId + "] not found.");
+  }
+
+  /** Recursively deletes the trace directory and every request stored in it. */
+  @Override
+  public void removeTrace(String traceId) throws IOException {
+    Path storePath = getTracePath(traceId);
+    _fileSystem.delete(storePath, true);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7072122c/blur-core/src/test/java/org/apache/blur/trace/hdfs/HdfsTraceStorageTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/trace/hdfs/HdfsTraceStorageTest.java b/blur-core/src/test/java/org/apache/blur/trace/hdfs/HdfsTraceStorageTest.java
new file mode 100644
index 0000000..b357f5e
--- /dev/null
+++ b/blur-core/src/test/java/org/apache/blur/trace/hdfs/HdfsTraceStorageTest.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.trace.hdfs;
+
+import static org.apache.blur.utils.BlurConstants.BLUR_HDFS_TRACE_PATH;
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.blur.BlurConfiguration;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Exercises {@link HdfsTraceStorage} against the local file system. Trace files are written directly
+ * with {@link HdfsTraceStorage#storeJson(Path, String)} and read back through the query methods.
+ */
+public class HdfsTraceStorageTest {
+
+  private static final File TMPDIR = new File(System.getProperty("blur.tmp.dir", "./target/tmp"));
+  // The directory this test writes to; it is the only thing setUp/tearDown delete.
+  private static final File TEST_DIR = new File(TMPDIR, "HdfsTraceStorageTest").getAbsoluteFile();
+
+  private HdfsTraceStorage _storage;
+  private BlurConfiguration _configuration;
+
+  @Before
+  public void setUp() throws IOException, InterruptedException {
+    // Start from an empty test directory without wiping other tests' data in TMPDIR.
+    rmr(TEST_DIR);
+    TEST_DIR.mkdirs();
+
+    LocalFileSystem localFS = FileSystem.getLocal(new Configuration());
+    Path directory = new Path(TEST_DIR.getPath());
+
+    _configuration = new BlurConfiguration();
+    _configuration.set(BLUR_HDFS_TRACE_PATH, directory.makeQualified(localFS).toString());
+    _storage = new HdfsTraceStorage(_configuration);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    _storage.close();
+    // Remove the directory the test actually wrote to.
+    rmr(TEST_DIR);
+  }
+
+  /** Recursively deletes {@code file}; does nothing if it does not exist. */
+  private void rmr(File file) {
+    if (!file.exists()) {
+      return;
+    }
+    File[] children = file.listFiles(); // null for plain files
+    if (children != null) {
+      for (File child : children) {
+        rmr(child);
+      }
+    }
+    file.delete();
+  }
+
+  @Test
+  public void testStorage() throws IOException {
+    Random random = new Random();
+    createTraceData(random);
+    createTraceData(random);
+    createTraceData(random);
+    List<String> traceIds = _storage.getTraceIds();
+    assertEquals(3, traceIds.size());
+
+    for (String traceId : traceIds) {
+      List<String> requestIds = _storage.getRequestIds(traceId);
+      // One root request plus three additional requests per trace.
+      assertEquals(4, requestIds.size());
+      for (String requestId : requestIds) {
+        String contents = _storage.getRequestContentsJson(traceId, requestId);
+        assertEquals("{" + requestId + "}", contents);
+      }
+    }
+
+    _storage.removeTrace(traceIds.get(0));
+    assertEquals(2, _storage.getTraceIds().size());
+  }
+
+  /** Writes one trace: a root request named after the trace id plus three random requests. */
+  private void createTraceData(Random random) throws IOException {
+    String traceId = Long.toString(Math.abs(random.nextLong()));
+    Path path = new Path(_configuration.get(BLUR_HDFS_TRACE_PATH));
+    Path tracePath = new Path(path, traceId);
+    Path storePath = new Path(tracePath, traceId + "_" + Long.toString(Math.abs(random.nextLong())));
+    _storage.storeJson(storePath, "{" + traceId + "}");
+    writeRequest(random, tracePath);
+    writeRequest(random, tracePath);
+    writeRequest(random, tracePath);
+  }
+
+  /** Writes one request file whose JSON body is {@code {<requestId>}}. */
+  private void writeRequest(Random random, Path tracePath) throws IOException {
+    String requestId = Long.toString(random.nextLong());
+    Path storePath = new Path(tracePath, requestId + "_" + Long.toString(Math.abs(random.nextLong())));
+    _storage.storeJson(storePath, "{" + requestId + "}");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7072122c/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java b/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
index 094815a..8aa6f07 100644
--- a/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
+++ b/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
@@ -39,6 +39,7 @@ public class BlurConstants {
   public static final String BLUR_TABLE_PATH = "blur.table.path";
   public static final String BLUR_ZOOKEEPER_CONNECTION = "blur.zookeeper.connection";
   public static final String BLUR_ZOOKEEPER_TRACE_PATH = "blur.zookeeper.trace.path";
+  public static final String BLUR_HDFS_TRACE_PATH = "blur.hdfs.trace.path"; // trace directory used by HdfsTraceStorage; may not be set together with BLUR_ZOOKEEPER_TRACE_PATH
   public static final String BLUR_ZOOKEEPER_TIMEOUT = "blur.zookeeper.timeout";
   public static final int BLUR_ZOOKEEPER_TIMEOUT_DEFAULT = 30000;
   public static final String BLUR_SHARD_HOSTNAME = "blur.shard.hostname";

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7072122c/blur-util/src/main/resources/blur-default.properties
----------------------------------------------------------------------
diff --git a/blur-util/src/main/resources/blur-default.properties b/blur-util/src/main/resources/blur-default.properties
index 213c931..75b487a 100644
--- a/blur-util/src/main/resources/blur-default.properties
+++ b/blur-util/src/main/resources/blur-default.properties
@@ -16,9 +16,12 @@
 # The zookeeper session timeout
 blur.zookeeper.timeout=90000
 
-# The path in ZooKeeper where the distributed traces will be stored, if blank trace output will be written to the log
+# The path in ZooKeeper where the distributed traces will be stored. If blank, trace output is written to the HDFS store (when blur.hdfs.trace.path is set) or otherwise to the log.
 blur.zookeeper.trace.path=/blur/traces
 
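+# The path in HDFS where the distributed traces will be stored. If blank, trace output is written to the ZooKeeper store (when blur.zookeeper.trace.path is set) or otherwise to the log. Only one of the two paths may be set, so blank blur.zookeeper.trace.path to use this store.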
+blur.hdfs.trace.path=
+
 # The maximum number of results that can be fetched in a single request
 blur.query.max.results.fetch=1000
 


Mime
View raw message