incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [3/6] git commit: Making some updates to the merge scheduler.
Date Tue, 17 Mar 2015 13:14:33 GMT
Making some updates to the merge scheduler.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/7444b2c8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/7444b2c8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/7444b2c8

Branch: refs/heads/master
Commit: 7444b2c871dd3d58e5cae8ab16305300413a9169
Parents: 5ea2217
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Tue Mar 17 09:11:52 2015 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Tue Mar 17 09:11:52 2015 -0400

----------------------------------------------------------------------
 .../manager/writer/SharedMergeScheduler.java    |  11 +-
 .../blur/store/hdfs/HdfsStreamIndexInput.java   | 168 -------------------
 .../blur/store/hdfs/IndexInputMergeUtil.java    |  36 ----
 3 files changed, 8 insertions(+), 207 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7444b2c8/blur-core/src/main/java/org/apache/blur/manager/writer/SharedMergeScheduler.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/SharedMergeScheduler.java b/blur-core/src/main/java/org/apache/blur/manager/writer/SharedMergeScheduler.java
index 34d602e..bc97dbc 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/SharedMergeScheduler.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/SharedMergeScheduler.java
@@ -92,7 +92,7 @@ public class SharedMergeScheduler implements Closeable {
       _writer.merge(_merge);
       long e = System.nanoTime();
       double time = (e - s) / 1000000000.0;
-      double rate = (_size / 1000 / 1000) / time;
+      double rate = (_size / 1000.0 / 1000.0) / time;
       LOG.info("Merge took [{0} s] to complete at rate of [{1} MB/s], input bytes [{2}], segments merged {3}", time,
           rate, _size, _merge.segments);
       _throughputBytes.mark(_size);
@@ -168,8 +168,8 @@ public class SharedMergeScheduler implements Closeable {
             MergeWork mergeWork = queue.take();
             try {
               mergeWork.merge();
-            } catch (IOException e) {
-              LOG.error("Unknown error while trying to perform merge on [{0}]", e, mergeWork);
+            } catch (Throwable t) {
+              LOG.error("Unknown error while trying to perform merge on [{0}]", t, mergeWork);
             }
           } catch (InterruptedException e) {
             if (_running.get()) {
@@ -196,6 +196,11 @@ public class SharedMergeScheduler implements Closeable {
       public void close() throws IOException {
         remove(_id);
       }
+
+      @Override
+      public MergeScheduler clone() {
+        return getMergeScheduler();
+      }
     };
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7444b2c8/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsStreamIndexInput.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsStreamIndexInput.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsStreamIndexInput.java
deleted file mode 100644
index df83c32..0000000
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsStreamIndexInput.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.store.hdfs;
-
-import java.io.IOException;
-
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.Path;
-import org.apache.lucene.store.IndexInput;
-
-import com.yammer.metrics.core.Histogram;
-import com.yammer.metrics.core.Meter;
-
-public class HdfsStreamIndexInput extends IndexInput {
-
-  private final long _length;
-  private final FSDataInputStream _inputStream;
-  private final MetricsGroup _metricsGroup;
-  private final Histogram _readStreamAccess;
-  private final Meter _readStreamThroughput;
-  private final Meter _readStreamSeek;
-
-  private long _postion;
-
-  public HdfsStreamIndexInput(FSDataInputStream inputStream, long length, MetricsGroup metricsGroup, Path path)
-      throws IOException {
-    super("HdfsStreamIndexInput(" + path.toString() + ")");
-    _inputStream = inputStream;
-    _length = length;
-    _metricsGroup = metricsGroup;
-    _postion = _inputStream.getPos();
-    _readStreamSeek = _metricsGroup.readStreamSeek;
-    _readStreamAccess = _metricsGroup.readStreamAccess;
-    _readStreamThroughput = _metricsGroup.readStreamThroughput;
-  }
-
-  private void checkPosition() throws IOException {
-    long pos = _inputStream.getPos();
-    if (pos != _postion) {
-      _inputStream.seek(_postion);
-      _readStreamSeek.mark();
-    }
-  }
-
-  @Override
-  public IndexInput clone() {
-    if (IndexInputMergeUtil.isMergeThread()) {
-      return super.clone();
-    }
-    throw new RuntimeException("who is doing this?");
-  }
-
-  @Override
-  public void close() throws IOException {
-
-  }
-
-  @Override
-  public long getFilePointer() {
-    return _postion;
-  }
-
-  @Override
-  public void seek(long pos) throws IOException {
-    _postion = pos;
-  }
-
-  @Override
-  public long length() {
-    return _length;
-  }
-
-  @Override
-  public byte readByte() throws IOException {
-    long start = System.nanoTime();
-    synchronized (_inputStream) {
-      checkPosition();
-      try {
-        return _inputStream.readByte();
-      } finally {
-        _postion++;
-        addMetric(start, 1);
-      }
-    }
-  }
-
-  private void addMetric(long start, int length) {
-    long end = System.nanoTime();
-    _readStreamAccess.update((end - start) / 1000);
-    _readStreamThroughput.mark(length);
-  }
-
-  @Override
-  public void readBytes(byte[] b, int offset, int len) throws IOException {
-    long start = System.nanoTime();
-    synchronized (_inputStream) {
-      checkPosition();
-      try {
-        _inputStream.read(b, offset, len);
-      } finally {
-        _postion += len;
-        addMetric(start, len);
-      }
-    }
-  }
-
-  @Override
-  public void readBytes(byte[] b, int offset, int len, boolean useBuffer) throws IOException {
-    readBytes(b, offset, len);
-  }
-
-  @Override
-  public short readShort() throws IOException {
-    long start = System.nanoTime();
-    synchronized (_inputStream) {
-      checkPosition();
-      try {
-        return _inputStream.readShort();
-      } finally {
-        _postion += 2;
-        addMetric(start, 2);
-      }
-    }
-  }
-
-  @Override
-  public int readInt() throws IOException {
-    long start = System.nanoTime();
-    synchronized (_inputStream) {
-      checkPosition();
-      try {
-        return _inputStream.readInt();
-      } finally {
-        _postion += 4;
-        addMetric(start, 4);
-      }
-    }
-  }
-
-  @Override
-  public long readLong() throws IOException {
-    long start = System.nanoTime();
-    synchronized (_inputStream) {
-      checkPosition();
-      try {
-        return _inputStream.readLong();
-      } finally {
-        _postion += 8;
-        addMetric(start, 8);
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7444b2c8/blur-store/src/main/java/org/apache/blur/store/hdfs/IndexInputMergeUtil.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/IndexInputMergeUtil.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/IndexInputMergeUtil.java
deleted file mode 100644
index dce111b..0000000
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs/IndexInputMergeUtil.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.store.hdfs;
-
-import org.apache.blur.utils.BlurConstants;
-
-public class IndexInputMergeUtil {
-
-  private static boolean streamSwitch = true;
-
-  public static boolean isMergeThread() {
-    if (!streamSwitch) {
-      return false;
-    }
-    String name = Thread.currentThread().getName();
-    if (name.startsWith(BlurConstants.SHARED_MERGE_SCHEDULER_PREFIX)) {
-      return true;
-    }
-    return false;
-  }
-
-}


Mime
View raw message