Return-Path: Delivered-To: apmail-incubator-cassandra-commits-archive@minotaur.apache.org Received: (qmail 34396 invoked from network); 2 Mar 2009 14:25:58 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 2 Mar 2009 14:25:58 -0000 Received: (qmail 36836 invoked by uid 500); 2 Mar 2009 14:25:55 -0000 Delivered-To: apmail-incubator-cassandra-commits-archive@incubator.apache.org Received: (qmail 36807 invoked by uid 500); 2 Mar 2009 14:25:55 -0000 Mailing-List: contact cassandra-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: cassandra-dev@incubator.apache.org Delivered-To: mailing list cassandra-commits@incubator.apache.org Delivered-To: moderator for cassandra-commits@incubator.apache.org Received: (qmail 36404 invoked by uid 99); 2 Mar 2009 07:59:03 -0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r749218 [18/34] - in /incubator/cassandra: branches/ dist/ nightly/ site/ tags/ trunk/ trunk/lib/ trunk/src/ trunk/src/org/ trunk/src/org/apache/ trunk/src/org/apache/cassandra/ trunk/src/org/apache/cassandra/analytics/ trunk/src/org/apache... Date: Mon, 02 Mar 2009 07:57:31 -0000 To: cassandra-commits@incubator.apache.org From: pmalik@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090302075744.8905E2388DBF@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Added: incubator/cassandra/trunk/src/org/apache/cassandra/gms/HeartBeatState.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/gms/HeartBeatState.java?rev=749218&view=auto ============================================================================== --- incubator/cassandra/trunk/src/org/apache/cassandra/gms/HeartBeatState.java (added) +++ incubator/cassandra/trunk/src/org/apache/cassandra/gms/HeartBeatState.java Mon Mar 2 07:57:22 2009 @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.gms; + +import java.util.concurrent.atomic.AtomicInteger; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +import org.apache.cassandra.io.ICompactSerializer; + + +/** + * HeartBeat State associated with any given endpoint. 
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com ) + */ + +class HeartBeatState +{ + private static ICompactSerializer serializer_; + + static + { + serializer_ = new HeartBeatStateSerializer(); + } + + int generation_; + AtomicInteger heartbeat_; + int version_; + + HeartBeatState() + { + } + + HeartBeatState(int generation, int heartbeat) + { + this(generation, heartbeat, 0); + } + + HeartBeatState(int generation, int heartbeat, int version) + { + generation_ = generation; + heartbeat_ = new AtomicInteger(heartbeat); + version_ = version; + } + + public static ICompactSerializer serializer() + { + return serializer_; + } + + int getGeneration() + { + return generation_; + } + + void updateGeneration() + { + ++generation_; + version_ = VersionGenerator.getNextVersion(); + } + + int getHeartBeat() + { + return heartbeat_.get(); + } + + void updateHeartBeat() + { + heartbeat_.incrementAndGet(); + version_ = VersionGenerator.getNextVersion(); + } + + int getHeartBeatVersion() + { + return version_; + } +}; + +class HeartBeatStateSerializer implements ICompactSerializer +{ + public void serialize(HeartBeatState hbState, DataOutputStream dos) throws IOException + { + dos.writeInt(hbState.generation_); + dos.writeInt(hbState.heartbeat_.get()); + dos.writeInt(hbState.version_); + } + + public HeartBeatState deserialize(DataInputStream dis) throws IOException + { + return new HeartBeatState(dis.readInt(), dis.readInt(), dis.readInt()); + } +} Added: incubator/cassandra/trunk/src/org/apache/cassandra/gms/IEndPointStateChangePublisher.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/gms/IEndPointStateChangePublisher.java?rev=749218&view=auto ============================================================================== --- incubator/cassandra/trunk/src/org/apache/cassandra/gms/IEndPointStateChangePublisher.java (added) +++ incubator/cassandra/trunk/src/org/apache/cassandra/gms/IEndPointStateChangePublisher.java Mon Mar 2 07:57:22 2009 @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.gms; + +/** + * This is implemented by the Gossiper module to publish change events to interested parties. + * Interested parties register/unregister interest by invoking the methods of this interface. + * + * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com ) + */ + +public interface IEndPointStateChangePublisher +{ + /** + * Register for interesting state changes. + * @param subcriber module which implements the IEndPointStateChangeSubscriber + */ + public void register(IEndPointStateChangeSubscriber subcriber); + + /** + * Unregister interest for state changes. 
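For reference, a minimal round-trip sketch of the serializer above (not part of this commit). The demo class name is made up, and because HeartBeatState and HeartBeatStateSerializer are package-private it would have to live in org.apache.cassandra.gms.

    package org.apache.cassandra.gms;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    class HeartBeatStateRoundTrip
    {
        public static void main(String[] args) throws IOException
        {
            // Generation 1, heartbeat 42; the two-argument constructor leaves version_ at 0.
            HeartBeatState original = new HeartBeatState(1, 42);
            original.updateHeartBeat();     // heartbeat -> 43, version_ bumped via VersionGenerator

            // Write the three ints out and read them back.
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            HeartBeatStateSerializer serializer = new HeartBeatStateSerializer();
            serializer.serialize(original, new DataOutputStream(bos));

            HeartBeatState copy = serializer.deserialize(
                    new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
            System.out.println(copy.getGeneration() + " " + copy.getHeartBeat() + " " + copy.getHeartBeatVersion());
        }
    }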
+ * @param subcriber module which implements the IEndPointStateChangeSubscriber + */ + public void unregister(IEndPointStateChangeSubscriber subcriber); +} Added: incubator/cassandra/trunk/src/org/apache/cassandra/gms/IEndPointStateChangeSubscriber.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/gms/IEndPointStateChangeSubscriber.java?rev=749218&view=auto ============================================================================== --- incubator/cassandra/trunk/src/org/apache/cassandra/gms/IEndPointStateChangeSubscriber.java (added) +++ incubator/cassandra/trunk/src/org/apache/cassandra/gms/IEndPointStateChangeSubscriber.java Mon Mar 2 07:57:22 2009 @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.gms; + +import org.apache.cassandra.net.EndPoint; + +/** + * This is called by an instance of the IEndPointStateChangePublisher to notify + * interested parties about changes in the the state associated with any endpoint. + * For instance if node A figures there is a changes in state for an endpoint B + * it notifies all interested parties of this change. It is upto to the registered + * instance to decide what he does with this change. Not all modules maybe interested + * in all state changes. + * + * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com ) + */ + +public interface IEndPointStateChangeSubscriber +{ + /** + * Use to inform interested parties about the change in the state + * for specified endpoint + * + * @param endpoint endpoint for which the state change occured. + * @param epState state that actually changed for the above endpoint. + */ + public void onChange(EndPoint endpoint, EndPointState epState); +} Added: incubator/cassandra/trunk/src/org/apache/cassandra/gms/IFailureDetectionEventListener.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/gms/IFailureDetectionEventListener.java?rev=749218&view=auto ============================================================================== --- incubator/cassandra/trunk/src/org/apache/cassandra/gms/IFailureDetectionEventListener.java (added) +++ incubator/cassandra/trunk/src/org/apache/cassandra/gms/IFailureDetectionEventListener.java Mon Mar 2 07:57:22 2009 @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
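As an illustration only (not part of the commit), a subscriber is just a class with an onChange callback; the class name below is hypothetical, and EndPointState comes from another file added in this revision.

    package org.apache.cassandra.gms;

    import org.apache.cassandra.net.EndPoint;

    class LoggingStateChangeSubscriber implements IEndPointStateChangeSubscriber
    {
        public void onChange(EndPoint endpoint, EndPointState epState)
        {
            // React to the new gossip state for this endpoint.
            System.out.println("Gossip state changed for " + endpoint);
        }
    }

Registration then goes through whatever object implements IEndPointStateChangePublisher (the Gossiper), via publisher.register(new LoggingStateChangeSubscriber()) and, later, the matching unregister call.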
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.gms; + +import org.apache.cassandra.net.EndPoint; + +/** + * Implemented by the Gossiper to either convict/suspect an endpoint + * based on the PHI calculated by the Failure Detector on the inter-arrival + * times of the heart beats. + * + * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com ) + */ + +public interface IFailureDetectionEventListener +{ + /** + * Convict the specified endpoint. + * @param ep endpoint to be convicted + */ + public void convict(EndPoint ep); + + /** + * Suspect the specified endpoint. + * @param ep endpoint to be suspected. + */ + public void suspect(EndPoint ep); +} Added: incubator/cassandra/trunk/src/org/apache/cassandra/gms/IFailureDetector.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/gms/IFailureDetector.java?rev=749218&view=auto ============================================================================== --- incubator/cassandra/trunk/src/org/apache/cassandra/gms/IFailureDetector.java (added) +++ incubator/cassandra/trunk/src/org/apache/cassandra/gms/IFailureDetector.java Mon Mar 2 07:57:22 2009 @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.gms; + +import org.apache.cassandra.net.EndPoint; + +/** + * An interface that provides an application with the ability + * to query liveness information of a node in the cluster. It + * also exposes methods which help an application register callbacks + * for notifications of liveness information of nodes. + * + * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com ) + */ + +public interface IFailureDetector +{ + /** + * Failure Detector's knowledge of whether a node is up or + * down. + * + * @param ep endpoint in question. + * @return true if UP and false if DOWN. + */ + public boolean isAlive(EndPoint ep); + + /** + * This method is invoked by any entity wanting to interrogate the status of an endpoint. + * In our case it would be the Gossiper. The Failure Detector will then calculate Phi and + * deem an endpoint as suspicious or alive as explained in the Hayashibara paper. + * + * param ep endpoint for which we interpret the inter arrival times. 
+ */ + public void intepret(EndPoint ep); + + /** + * This method is invoked by the receiver of the heartbeat. In our case it would be + * the Gossiper. Gossiper inform the Failure Detector on receipt of a heartbeat. The + * FailureDetector will then sample the arrival time as explained in the paper. + * + * param ep endpoint being reported. + */ + public void report(EndPoint ep); + + /** + * Register interest for Failure Detector events. + * @param listener implementation of an application provided IFailureDetectionEventListener + */ + public void registerFailureDetectionEventListener(IFailureDetectionEventListener listener); + + /** + * Un-register interest for Failure Detector events. + * @param listener implementation of an application provided IFailureDetectionEventListener + */ + public void unregisterFailureDetectionEventListener(IFailureDetectionEventListener listener); +} Added: incubator/cassandra/trunk/src/org/apache/cassandra/gms/IFailureNotification.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/gms/IFailureNotification.java?rev=749218&view=auto ============================================================================== --- incubator/cassandra/trunk/src/org/apache/cassandra/gms/IFailureNotification.java (added) +++ incubator/cassandra/trunk/src/org/apache/cassandra/gms/IFailureNotification.java Mon Mar 2 07:57:22 2009 @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.gms; + +import org.apache.cassandra.net.EndPoint; + +/** + * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com ) + */ + +public interface IFailureNotification +{ + public void suspect(EndPoint ep); + public void revive(EndPoint ep); +} Added: incubator/cassandra/trunk/src/org/apache/cassandra/gms/JoinMessage.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/gms/JoinMessage.java?rev=749218&view=auto ============================================================================== --- incubator/cassandra/trunk/src/org/apache/cassandra/gms/JoinMessage.java (added) +++ incubator/cassandra/trunk/src/org/apache/cassandra/gms/JoinMessage.java Mon Mar 2 07:57:22 2009 @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
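To make the calling pattern concrete, here is a rough sketch (not part of this commit) of how a gossip round might drive an IFailureDetector implementation and receive convict/suspect callbacks. The wrapper class is hypothetical, and the intepret spelling follows the interface as committed.

    package org.apache.cassandra.gms;

    import org.apache.cassandra.net.EndPoint;

    class FailureDetectorUsage implements IFailureDetectionEventListener
    {
        private final IFailureDetector fd;

        FailureDetectorUsage(IFailureDetector fd)
        {
            this.fd = fd;
            fd.registerFailureDetectionEventListener(this);   // receive convict/suspect callbacks
        }

        void onHeartBeatReceived(EndPoint ep)
        {
            fd.report(ep);                  // sample the heartbeat arrival time
        }

        void periodicCheck(EndPoint ep)
        {
            fd.intepret(ep);                // recompute PHI from the inter-arrival history
            if (!fd.isAlive(ep))
                System.out.println(ep + " is considered DOWN");
        }

        public void convict(EndPoint ep) { System.out.println("convicted " + ep); }

        public void suspect(EndPoint ep) { System.out.println("suspected " + ep); }
    }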
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.gms; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.List; + +import org.apache.cassandra.io.ICompactSerializer; + + +/** + * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com ) + */ + +class JoinMessage +{ + private static ICompactSerializer serializer_; + static + { + serializer_ = new JoinMessageSerializer(); + } + + static ICompactSerializer serializer() + { + return serializer_; + } + + String clusterId_; + + JoinMessage(String clusterId) + { + clusterId_ = clusterId; + } +} + +class JoinMessageSerializer implements ICompactSerializer +{ + public void serialize(JoinMessage joinMessage, DataOutputStream dos) throws IOException + { + dos.writeUTF(joinMessage.clusterId_); + } + + public JoinMessage deserialize(DataInputStream dis) throws IOException + { + String clusterId = dis.readUTF(); + return new JoinMessage(clusterId); + } +} Added: incubator/cassandra/trunk/src/org/apache/cassandra/gms/PureRandom.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/gms/PureRandom.java?rev=749218&view=auto ============================================================================== --- incubator/cassandra/trunk/src/org/apache/cassandra/gms/PureRandom.java (added) +++ incubator/cassandra/trunk/src/org/apache/cassandra/gms/PureRandom.java Mon Mar 2 07:57:22 2009 @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.gms; + +import java.util.Random; + +import org.apache.cassandra.utils.BitSet; + + + +/** + * Implementation of a PureRandomNumber generator. Use this class cautiously. Not + * for general purpose use. Currently this is used by the Gossiper to choose a random + * endpoint to Gossip to. 
+ * + * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com ) + */ + +class PureRandom extends Random +{ + private BitSet bs_ = new BitSet(); + private int lastUb_; + + PureRandom() + { + super(); + } + + public int nextInt(int ub) + { + if (ub <= 0) + throw new IllegalArgumentException("ub must be positive"); + + if ( lastUb_ != ub ) + { + bs_.clear(); + lastUb_ = ub; + } + else if(bs_.cardinality() == ub) + { + bs_.clear(); + } + + int value = super.nextInt(ub); + while ( bs_.get(value) ) + { + value = super.nextInt(ub); + } + bs_.set(value); + return value; + } + + public static void main(String[] args) throws Throwable + { + Random pr = new PureRandom(); + int ubs[] = new int[] { 2, 3, 1, 10, 5, 0}; + + for (int ub : ubs) + { + System.out.println("UB: " + String.valueOf(ub)); + for (int j = 0; j < 10; j++) + { + int junk = pr.nextInt(ub); + // Do something with junk so JVM doesn't optimize away + System.out.println(junk); + } + } + } +} Added: incubator/cassandra/trunk/src/org/apache/cassandra/gms/VersionGenerator.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/gms/VersionGenerator.java?rev=749218&view=auto ============================================================================== --- incubator/cassandra/trunk/src/org/apache/cassandra/gms/VersionGenerator.java (added) +++ incubator/cassandra/trunk/src/org/apache/cassandra/gms/VersionGenerator.java Mon Mar 2 07:57:22 2009 @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.gms; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * A unique version number generator for any state that is generated by the + * local node. + * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com ) + */ + +public class VersionGenerator +{ + private static AtomicInteger version_ = new AtomicInteger(0); + + public static int getNextVersion() + { + return version_.incrementAndGet(); + } +} Added: incubator/cassandra/trunk/src/org/apache/cassandra/io/AIORandomAccessFile.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/AIORandomAccessFile.java?rev=749218&view=auto ============================================================================== --- incubator/cassandra/trunk/src/org/apache/cassandra/io/AIORandomAccessFile.java (added) +++ incubator/cassandra/trunk/src/org/apache/cassandra/io/AIORandomAccessFile.java Mon Mar 2 07:57:22 2009 @@ -0,0 +1,789 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
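Two small observations on the classes above: PureRandom hands out every value in [0, ub) exactly once before its tracking BitSet is cleared and repeats become possible, and the test array in its main() includes an upper bound of 0 that the ub <= 0 guard will reject at runtime. A tiny demonstration (not part of the commit; the class name is made up and must sit in the same package because PureRandom is package-private):

    package org.apache.cassandra.gms;

    import java.util.Random;

    class PureRandomDemo
    {
        public static void main(String[] args)
        {
            Random pr = new PureRandom();
            int ub = 5;
            // The first five draws are a permutation of 0..4; the next five are another one.
            for (int i = 0; i < 2 * ub; i++)
                System.out.print(pr.nextInt(ub) + " ");
            System.out.println();

            // VersionGenerator is just a process-wide monotonically increasing counter.
            System.out.println(VersionGenerator.getNextVersion());   // 1
            System.out.println(VersionGenerator.getNextVersion());   // 2
        }
    }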
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io; + +import java.io.*; +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousFileChannel; +import java.nio.channels.CompletionHandler; +import java.nio.file.OpenOption; +import java.nio.file.StandardOpenOption; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.cassandra.concurrent.ContinuationContext; +import org.apache.cassandra.concurrent.ContinuationsExecutor; +import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor; +import org.apache.cassandra.concurrent.IContinuable; +import org.apache.cassandra.concurrent.ThreadFactoryImpl; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.continuations.Suspendable; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.GuidGenerator; +import org.apache.cassandra.utils.LogUtil; +import org.apache.commons.javaflow.Continuation; +import org.apache.log4j.Logger; + +/** + * A AIORandomAccessFile is like a + * RandomAccessFile, but it uses a private buffer so that most + * operations do not require a disk access. + *

+ * + * Note: The operations on this class are unmonitored. Also, the correct + * functioning of the RandomAccessFile methods that are not + * overridden here relies on the implementation of those methods in the + * superclass. + * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com ) + */ + +public final class AIORandomAccessFile extends RandomAccessFile +{ + private final static Logger logger_ = Logger.getLogger(AIORandomAccessFile.class); + private final static ThreadLocal tls_ = new InheritableThreadLocal(); + static final int LogBuffSz_ = 16; // 64K buffer + public static final int BuffSz_ = (1 << LogBuffSz_); + static final long BuffMask_ = ~(((long) BuffSz_) - 1L); + /* Used to lock the creation of the disk thread pool instance */ + private static Lock createLock_ = new ReentrantLock(); + private static ExecutorService diskIOPool_; + + /** + * Submits a read request to the Kernel and is used + * only when running in Continuations mode. The kernel + * on read completion will resume the continuation passed + * in to complete the read request. + * + * @author alakshman + * + */ + class AIOReader implements IContinuable + { + /* the continuation that needs to be resumed on read completion */ + private ContinuationContext continuationCtx_; + + AIOReader(ContinuationContext continuationCtx) + { + continuationCtx_ = continuationCtx; + } + + public void run(Continuation c) + { + /* submit the read request */ + continuationCtx_.setContinuation(c); + ByteBuffer buffer = ByteBuffer.wrap( buffer_ ); + fileChannel_.read(buffer, diskPos_, continuationCtx_, new ReadCompletionHandler()); + } + } + + /** + * Read completion handler for AIO framework. The context + * that is passed in, is a Continuation that needs to be + * resumed on read completion. + * + * @author alakshman + * + * @param number of bytes read. + */ + class ReadCompletionHandler implements CompletionHandler + { + public void cancelled(ContinuationContext attachment) + { + } + + public void completed(Integer result, ContinuationContext attachment) + { + logger_.debug("Bytes read " + result); + if ( attachment != null ) + { + Continuation c = attachment.getContinuation(); + attachment.result(result); + if ( c != null ) + { + c = Continuation.continueWith(c, attachment); + ContinuationsExecutor.doPostProcessing(c); + } + } + } + + public void failed(Throwable th, ContinuationContext attachment) + { + } + } + + /* + * This implementation is based on the buffer implementation in Modula-3's + * "Rd", "Wr", "RdClass", and "WrClass" interfaces. + */ + private boolean dirty_; // true iff unflushed bytes exist + private boolean closed_; // true iff the file is closed + private long curr_; // current position in file + private long lo_, hi_; // bounds on characters in "buff" + private byte[] buffer_ = new byte[0]; // local buffer + private long maxHi_; // this.lo + this.buff.length + private boolean hitEOF_; // buffer contains last file block? + private long diskPos_; // disk position + private AsynchronousFileChannel fileChannel_; // asynchronous file channel used for AIO. + private boolean bContinuations_; // indicates if used in continuations mode. 
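For readers unfamiliar with the asynchronous channel API this class is built on, a self-contained read sketch follows. It is not part of the commit and uses the java.nio.channels API as it later shipped in Java 7, which differs slightly from the pre-release NIO.2 draft imported here (for example, the draft CompletionHandler also has a cancelled() callback); the file path is a placeholder.

    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousFileChannel;
    import java.nio.channels.CompletionHandler;
    import java.nio.file.Paths;
    import java.nio.file.StandardOpenOption;

    class AsyncReadSketch
    {
        public static void main(String[] args) throws Exception
        {
            AsynchronousFileChannel ch = AsynchronousFileChannel.open(
                    Paths.get("/tmp/example.dat"), StandardOpenOption.READ);
            ByteBuffer buf = ByteBuffer.allocate(64 * 1024);

            // Submit a read at file position 0; the handler fires on completion,
            // the same shape as ReadCompletionHandler above.
            ch.read(buf, 0L, null, new CompletionHandler<Integer, Void>()
            {
                public void completed(Integer bytesRead, Void attachment)
                {
                    System.out.println("Bytes read " + bytesRead);
                }

                public void failed(Throwable t, Void attachment)
                {
                    t.printStackTrace();
                }
            });

            Thread.sleep(1000);   // crude wait for the demo; real code would coordinate completion properly
            ch.close();
        }
    }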
+ + /* + * To describe the above fields, we introduce the following abstractions for + * the file "f": + * + * len(f) the length of the file curr(f) the current position in the file + * c(f) the abstract contents of the file disk(f) the contents of f's + * backing disk file closed(f) true iff the file is closed + * + * "curr(f)" is an index in the closed interval [0, len(f)]. "c(f)" is a + * character sequence of length "len(f)". "c(f)" and "disk(f)" may differ if + * "c(f)" contains unflushed writes not reflected in "disk(f)". The flush + * operation has the effect of making "disk(f)" identical to "c(f)". + * + * A file is said to be *valid* if the following conditions hold: + * + * V1. The "closed" and "curr" fields are correct: + * + * f.closed == closed(f) f.curr == curr(f) + * + * V2. The current position is either contained in the buffer, or just past + * the buffer: + * + * f.lo <= f.curr <= f.hi + * + * V3. Any (possibly) unflushed characters are stored in "f.buff": + * + * (forall i in [f.lo, f.curr): c(f)[i] == f.buff[i - f.lo]) + * + * V4. For all characters not covered by V3, c(f) and disk(f) agree: + * + * (forall i in [f.lo, len(f)): i not in [f.lo, f.curr) => c(f)[i] == + * disk(f)[i]) + * + * V5. "f.dirty" is true iff the buffer contains bytes that should be + * flushed to the file; by V3 and V4, only part of the buffer can be dirty. + * + * f.dirty == (exists i in [f.lo, f.curr): c(f)[i] != f.buff[i - f.lo]) + * + * V6. this.maxHi == this.lo + this.buff.length + * + * Note that "f.buff" can be "null" in a valid file, since the range of + * characters in V3 is empty when "f.lo == f.curr". + * + * A file is said to be *ready* if the buffer contains the current position, + * i.e., when: + * + * R1. !f.closed && f.buff != null && f.lo <= f.curr && f.curr < f.hi + * + * When a file is ready, reading or writing a single byte can be performed + * by reading or writing the in-memory buffer without performing a disk + * operation. + */ + + /** + * Open a AIORandomAccessFile for r/w operations. + * @param file file to be opened. + */ + public AIORandomAccessFile(File file) throws IOException + { + super(file, "rw"); + this.init(file, 0, false); + } + + /** + * Open a AIORandomAccessFile for r/w operations. + * @param file file to be opened. + * @param bContinuations specify if continuations + * support is required. + */ + public AIORandomAccessFile(File file, boolean bContinuations) throws IOException + { + super(file, "rw"); + this.init(file, 0, bContinuations); + } + + /** + * Open a AIORandomAccessFile for r/w operations. + * @param file file to be opened + * @param size amount of data to be buffer as part + * of r/w operations + * @throws IOException + */ + public AIORandomAccessFile(File file, int size) throws IOException + { + super(file, "rw"); + init(file, size, false); + } + + /** + * Open a AIORandomAccessFile for r/w operations. + * @param file file to be opened + * @param size amount of data to be buffer as part + * of r/w operations + * @param bContinuations specify if continuations + * support is required. + * @throws IOException + */ + public AIORandomAccessFile(File file, int size, boolean bContinuations) throws IOException + { + super(file, "rw"); + init(file, size, bContinuations); + } + + /** + * Open a AIORandomAccessFile for r/w operations. + * @param name of file to be opened. 
+ */ + public AIORandomAccessFile(String name) throws IOException + { + super(name, "rw"); + this.init(new File(name), 0, false); + } + + /** + * Open a AIORandomAccessFile for r/w operations. + * @param name of file to be opened. + * @param bContinuations specify if continuations + * support is required. + */ + public AIORandomAccessFile(String name, boolean bContinuations) throws IOException + { + super(name, "rw"); + this.init(new File(name), 0, bContinuations); + } + + /** + * Open a AIORandomAccessFile for r/w operations. + * @param name of file to be opened. + * @param size buffering size to be used. + */ + public AIORandomAccessFile(String name, int size) throws IOException + { + super(name, "rw"); + this.init(new File(name), size, false); + } + + /** + * Open a AIORandomAccessFile for r/w operations. + * @param name of file to be opened. + * @param name of file to be opened. + * @param bContinuations specify if continuations + * support is required. + */ + public AIORandomAccessFile(String name, int size, boolean bContinuations) throws IOException + { + super(name, "rw"); + this.init(new File(name), size, bContinuations); + } + + private void init(File file, int size, boolean bVal) throws IOException + { + bContinuations_ = bVal; + OpenOption[] openOptions = new OpenOption[]{StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ}; + this.dirty_ = this.closed_ = false; + this.lo_ = this.curr_ = this.hi_ = 0; + this.buffer_ = (size > BuffSz_) ? new byte[size] : new byte[BuffSz_]; + this.maxHi_ = (long) BuffSz_; + this.hitEOF_ = false; + this.diskPos_ = 0L; + /* set up the asynchronous file channel */ + if ( diskIOPool_ == null ) + { + createLock_.lock(); + try + { + if ( diskIOPool_ == null ) + { + int maxThreads = DatabaseDescriptor.getThreadsPerPool(); + diskIOPool_ = new ContinuationsExecutor( maxThreads, + maxThreads, + Integer.MAX_VALUE, + TimeUnit.SECONDS, + new LinkedBlockingQueue(), + new ThreadFactoryImpl("DISK-IO-POOL") + ); + } + } + finally + { + createLock_.unlock(); + } + } + Set set = new HashSet( Arrays.asList(openOptions) ); + fileChannel_ = AsynchronousFileChannel.open(file.toPath(), set, diskIOPool_); + } + + public void close() throws IOException + { + this.onClose(); + this.closed_ = true; + fileChannel_.close(); + } + + /** + * Flush any bytes in the file's buffer that have not yet been written to + * disk. If the file was created read-only, this method is a no-op. + */ + public void flush() throws IOException + { + this.flushBuffer(); + } + + /** + * Flush any dirty bytes in the buffer to disk. + */ + private void flushBuffer() throws IOException + { + if (this.dirty_) + { + int len = (int) (this.curr_ - this.lo_); + doWrite(this.lo_, false); + this.diskPos_ = this.curr_; + this.dirty_ = false; + } + } + + /** + * Invoked when close() is invoked and causes the flush + * of the last few bytes to block when the write is submitted. + * @throws IOException + */ + private void onClose() throws IOException + { + if (this.dirty_) + { + int len = (int) (this.curr_ - this.lo_); + doWrite(this.lo_, true); + this.diskPos_ = this.curr_; + this.dirty_ = false; + } + } + + /** + * This method submits an I/O for write where the write happens at + * position within the file. + * + * @param position to seek to within the file + * @param onClose indicates if this method was invoked on a close(). 
+ */ + private void doWrite(long position, boolean onClose) + { + ByteBuffer buffer = ByteBuffer.wrap(buffer_); + int length = (int) (this.curr_ - this.lo_); + buffer.limit(length); + Future futurePtr = fileChannel_.write(buffer, position, null, new WriteCompletionHandler()); + if ( onClose ) + { + try + { + /* this will block but will execute only on a close() */ + futurePtr.get(); + } + catch (ExecutionException ex) + { + logger_.warn(LogUtil.throwableToString(ex)); + } + catch (InterruptedException ex) + { + logger_.warn(LogUtil.throwableToString(ex)); + } + } + buffer_ = new byte[buffer_.length]; + } + + /** + * Read at most "this.buff.length" bytes into "this.buff", returning the + * number of bytes read. If the return result is less than + * "this.buff.length", then EOF was read. + */ + private int fillBuffer() throws IOException + { + int cnt = 0; + ByteBuffer buffer = ByteBuffer.allocate(buffer_.length); + Future futurePtr = fileChannel_.read(buffer, this.diskPos_, null, new ReadCompletionHandler()); + + try + { + /* + * This should block + */ + cnt = futurePtr.get(); + } + catch (ExecutionException ex) + { + logger_.warn(LogUtil.throwableToString(ex)); + } + catch (InterruptedException ex) + { + logger_.warn(LogUtil.throwableToString(ex)); + } + + if ( (cnt < 0) && ( this.hitEOF_ = (cnt < this.buffer_.length) ) ) + { + // make sure buffer that wasn't read is initialized with -1 + if ( cnt < 0 ) + cnt = 0; + Arrays.fill(buffer_, cnt, this.buffer_.length, (byte) 0xff); + } + else + { + buffer_ = buffer.array(); + } + this.diskPos_ += cnt; + return cnt; + } + + /** + * Read as much data as indicated by the size of the buffer. + * This method is only invoked in continuation mode. + */ + private int fillBuffer2() + { + ContinuationContext continuationCtx = (ContinuationContext)Continuation.getContext(); + IContinuable reader = new AIOReader( continuationCtx ); + ContinuationsExecutor.putInTls(reader); + /* suspend the continuation */ + Continuation.suspend(); + + continuationCtx = (ContinuationContext)Continuation.getContext(); + int cnt = (Integer)continuationCtx.result(); + + if ( (cnt < 0) && ( this.hitEOF_ = (cnt < this.buffer_.length) ) ) + { + // make sure buffer that wasn't read is initialized with -1 + if ( cnt < 0 ) + cnt = 0; + Arrays.fill(buffer_, cnt, this.buffer_.length, (byte) 0xff); + } + this.diskPos_ += cnt; + return cnt; + } + + /** + * This method positions this.curr at position + * pos. If pos does not fall in the current + * buffer, it flushes the current buffer and loads the correct one. + *
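doWrite() and fillBuffer() above hold on to the Future returned by the channel call and block on it only when they must (on close(), or when a synchronous buffer fill is needed). A stripped-down illustration of that pattern, not part of the commit, again written against the final Java 7 java.nio.channels API and using a placeholder path:

    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousFileChannel;
    import java.nio.file.Paths;
    import java.nio.file.StandardOpenOption;
    import java.util.concurrent.Future;

    class FutureWriteSketch
    {
        public static void main(String[] args) throws Exception
        {
            AsynchronousFileChannel ch = AsynchronousFileChannel.open(
                    Paths.get("/tmp/example.dat"),
                    StandardOpenOption.CREATE, StandardOpenOption.WRITE);

            // Submit the write; Future.get() blocks only when the caller needs the result,
            // which is how doWrite() behaves when invoked from close().
            ByteBuffer buf = ByteBuffer.wrap("flush me".getBytes());
            Future<Integer> pending = ch.write(buf, 0L);
            System.out.println("Bytes written " + pending.get());
            ch.close();
        }
    }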
+ * + * On exit from this routine this.curr == this.hi iff + * pos is at or past the end-of-file, which can only happen + * if the file was opened in read-only mode. + */ + public void seek(long pos) throws IOException + { + if (pos >= this.hi_ || pos < this.lo_) + { + // seeking outside of current buffer -- flush and read + this.flushBuffer(); + this.lo_ = pos & BuffMask_; // start at BuffSz boundary + this.maxHi_ = this.lo_ + (long) this.buffer_.length; + if (this.diskPos_ != this.lo_) + { + this.diskPos_ = this.lo_; + } + + int n = 0; + /* Perform the read operations in continuation style */ + if ( bContinuations_ ) + { + n = fillBuffer2(); + } + else + { + n = fillBuffer(); + } + this.hi_ = this.lo_ + (long) n; + } + else + { + // seeking inside current buffer -- no read required + if (pos < this.curr_) + { + // if seeking backwards, we must flush to maintain V4 + this.flushBuffer(); + } + } + this.curr_ = pos; + } + + + public long getFilePointer() + { + return this.curr_; + } + + public long length() throws IOException + { + return Math.max(this.curr_, super.length()); + } + + public int read() throws IOException + { + if (this.curr_ >= this.hi_) + { + // test for EOF + // if (this.hi < this.maxHi) return -1; + if (this.hitEOF_) + return -1; + + // slow path -- read another buffer + this.seek(this.curr_); + if (this.curr_ == this.hi_) + return -1; + } + byte res = this.buffer_[(int) (this.curr_ - this.lo_)]; + this.curr_++; + return ((int) res) & 0xFF; // convert byte -> int + } + + public int read(byte[] b) throws IOException + { + return this.read(b, 0, b.length); + } + + public int read(byte[] b, int off, int len) throws IOException + { + if (this.curr_ >= this.hi_) + { + // test for EOF + // if (this.hi < this.maxHi) return -1; + if (this.hitEOF_) + return -1; + + // slow path -- read another buffer + this.seek(this.curr_); + if (this.curr_ == this.hi_) + return -1; + } + len = Math.min(len, (int) (this.hi_ - this.curr_)); + int buffOff = (int) (this.curr_ - this.lo_); + System.arraycopy(this.buffer_, buffOff, b, off, len); + this.curr_ += len; + return len; + } + + public void write(int b) throws IOException + { + if (this.curr_ >= this.hi_) + { + if (this.hitEOF_ && this.hi_ < this.maxHi_) + { + // at EOF -- bump "hi" + this.hi_++; + } + else + { + // slow path -- write current buffer; read next one + this.seek(this.curr_); + if (this.curr_ == this.hi_) + { + // appending to EOF -- bump "hi" + this.hi_++; + } + } + } + this.buffer_[(int) (this.curr_ - this.lo_)] = (byte) b; + this.curr_++; + this.dirty_ = true; + } + + public void write(byte[] b) throws IOException + { + this.write(b, 0, b.length); + } + + public void write(byte[] b, int off, int len) throws IOException + { + while (len > 0) + { + int n = this.writeAtMost(b, off, len); + off += n; + len -= n; + } + this.dirty_ = true; + } + + /* + * Write at most "len" bytes to "b" starting at position "off", and return + * the number of bytes written. 
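The buffer-window arithmetic used by seek() is easier to see with concrete numbers; the following throwaway snippet (not part of the commit) mirrors the constants defined at the top of the class.

    class BufferWindowMath
    {
        public static void main(String[] args)
        {
            final int  LogBuffSz = 16;                        // same values as LogBuffSz_/BuffSz_/BuffMask_ above
            final int  BuffSz    = 1 << LogBuffSz;            // 65536
            final long BuffMask  = ~(((long) BuffSz) - 1L);   // clears the low 16 bits

            long pos   = 100000L;
            long lo    = pos & BuffMask;                      // 65536: start of the 64K-aligned window
            long maxHi = lo + BuffSz;                         // 131072: end of the window
            System.out.println(lo + " <= " + pos + " < " + maxHi);   // the window that will hold curr_
        }
    }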
+ */ + private int writeAtMost(byte[] b, int off, int len) throws IOException + { + if (this.curr_ >= this.hi_) + { + if (this.hitEOF_ && this.hi_ < this.maxHi_) + { + // at EOF -- bump "hi" + this.hi_ = this.maxHi_; + } + else + { + // slow path -- write current buffer; read next one + this.seek(this.curr_); + if (this.curr_ == this.hi_) + { + // appending to EOF -- bump "hi" + this.hi_ = this.maxHi_; + } + } + } + len = Math.min(len, (int) (this.hi_ - this.curr_)); + int buffOff = (int) (this.curr_ - this.lo_); + System.arraycopy(b, off, this.buffer_, buffOff, len); + this.curr_ += len; + return len; + } + + public static void main(String[] args) throws Throwable + { + /* + int i = 0; + try + { + RandomAccessFile aRaf2 = new AIORandomAccessFile( new File("/var/cassandra/test.dat"), 64*1024); + aRaf2.seek(0L); + while ( i < 10000 ) + { + aRaf2.writeInt(32); + aRaf2.writeUTF("Avinash Lakshman"); + ++i; + } + aRaf2.close(); + } + catch( IOException ex ) + { + ex.printStackTrace(); + } + */ + /* + int j = 0; + try + { + RandomAccessFile aRaf2 = new AIORandomAccessFile( new File("/var/cassandra/test.dat") ); + while ( j < 10 ) + { + System.out.println( aRaf2.readInt() ); + System.out.println( aRaf2.readUTF() ); + ++j; + } + aRaf2.close(); + } + catch( IOException ex ) + { + ex.printStackTrace(); + } + */ + + ExecutorService es = new ContinuationsExecutor(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue() ); + es.execute(new ReadImpl()); + } +} + +class ReadImpl implements Runnable +{ + public void run() + { + int i = 0; + try + { + System.out.println("About to start the whole thing ..."); + AIORandomAccessFile aRaf2 = new AIORandomAccessFile( new File("/var/cassandra/test.dat"), true ); + System.out.println("About to seek ..."); + + //aRaf2.seek(0L); + System.out.println( aRaf2.readInt() ); + System.out.println( aRaf2.readUTF() ); + + System.out.println("About to seek a second time ..."); + aRaf2.seek(66000L); + System.out.println( aRaf2.readInt() ); + System.out.println( aRaf2.readUTF() ); + + aRaf2.close(); + } + catch( IOException ex ) + { + ex.printStackTrace(); + } + } +} + +class WriteImpl implements Runnable +{ + public void run() + { + int i = 0; + try + { + AIORandomAccessFile aRaf2 = new AIORandomAccessFile( new File("/var/cassandra/test.dat")); + while ( i < 10000 ) + { + aRaf2.writeInt(32); + aRaf2.writeUTF("Avinash Lakshman thinks John McCain is an idiot"); + ++i; + } + aRaf2.close(); + } + catch( IOException ex ) + { + ex.printStackTrace(); + } + } +} + +/** + * Write completion handler for AIO framework. The context + * that is passed in, is a Continuation that needs to be + * resumed on write completion. For now the continuation is + * not used at all. + * + * @author alakshman + * + * @param number of bytes written. 
+ */ +class WriteCompletionHandler implements CompletionHandler +{ + private final static Logger logger_ = Logger.getLogger(WriteCompletionHandler.class); + + public void cancelled(Continuation attachment) + { + } + + public void completed(V result, Continuation attachment) + { + logger_.debug("Bytes written " + result); + while ( attachment != null ) + { + attachment = Continuation.continueWith(attachment); + } + } + + public void failed(Throwable th, Continuation attachment) + { + } +} + + Added: incubator/cassandra/trunk/src/org/apache/cassandra/io/BufferedRandomAccessFile.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/BufferedRandomAccessFile.java?rev=749218&view=auto ============================================================================== --- incubator/cassandra/trunk/src/org/apache/cassandra/io/BufferedRandomAccessFile.java (added) +++ incubator/cassandra/trunk/src/org/apache/cassandra/io/BufferedRandomAccessFile.java Mon Mar 2 07:57:22 2009 @@ -0,0 +1,375 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io; + +import java.io.*; +import java.util.Arrays; + +import org.apache.log4j.Logger; + +/** + * A BufferedRandomAccessFile is like a + * RandomAccessFile, but it uses a private buffer so that most + * operations do not require a disk access. + *
+ * + * Note: The operations on this class are unmonitored. Also, the correct + * functioning of the RandomAccessFile methods that are not + * overridden here relies on the implementation of those methods in the + * superclass. + * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com ) + */ + +public final class BufferedRandomAccessFile extends RandomAccessFile +{ + private static final Logger logger_ = Logger.getLogger(BufferedRandomAccessFile.class); + static final int LogBuffSz_ = 16; // 64K buffer + public static final int BuffSz_ = (1 << LogBuffSz_); + static final long BuffMask_ = ~(((long) BuffSz_) - 1L); + + /* + * This implementation is based on the buffer implementation in Modula-3's + * "Rd", "Wr", "RdClass", and "WrClass" interfaces. + */ + private boolean dirty_; // true iff unflushed bytes exist + private boolean closed_; // true iff the file is closed + private long curr_; // current position in file + private long lo_, hi_; // bounds on characters in "buff" + private byte[] buff_; // local buffer + private long maxHi_; // this.lo + this.buff.length + private boolean hitEOF_; // buffer contains last file block? + private long diskPos_; // disk position + + /* + * To describe the above fields, we introduce the following abstractions for + * the file "f": + * + * len(f) the length of the file curr(f) the current position in the file + * c(f) the abstract contents of the file disk(f) the contents of f's + * backing disk file closed(f) true iff the file is closed + * + * "curr(f)" is an index in the closed interval [0, len(f)]. "c(f)" is a + * character sequence of length "len(f)". "c(f)" and "disk(f)" may differ if + * "c(f)" contains unflushed writes not reflected in "disk(f)". The flush + * operation has the effect of making "disk(f)" identical to "c(f)". + * + * A file is said to be *valid* if the following conditions hold: + * + * V1. The "closed" and "curr" fields are correct: + * + * f.closed == closed(f) f.curr == curr(f) + * + * V2. The current position is either contained in the buffer, or just past + * the buffer: + * + * f.lo <= f.curr <= f.hi + * + * V3. Any (possibly) unflushed characters are stored in "f.buff": + * + * (forall i in [f.lo, f.curr): c(f)[i] == f.buff[i - f.lo]) + * + * V4. For all characters not covered by V3, c(f) and disk(f) agree: + * + * (forall i in [f.lo, len(f)): i not in [f.lo, f.curr) => c(f)[i] == + * disk(f)[i]) + * + * V5. "f.dirty" is true iff the buffer contains bytes that should be + * flushed to the file; by V3 and V4, only part of the buffer can be dirty. + * + * f.dirty == (exists i in [f.lo, f.curr): c(f)[i] != f.buff[i - f.lo]) + * + * V6. this.maxHi == this.lo + this.buff.length + * + * Note that "f.buff" can be "null" in a valid file, since the range of + * characters in V3 is empty when "f.lo == f.curr". + * + * A file is said to be *ready* if the buffer contains the current position, + * i.e., when: + * + * R1. !f.closed && f.buff != null && f.lo <= f.curr && f.curr < f.hi + * + * When a file is ready, reading or writing a single byte can be performed + * by reading or writing the in-memory buffer without performing a disk + * operation. + */ + + /** + * Open a new BufferedRandomAccessFile on file + * in mode mode, which should be "r" for reading only, or + * "rw" for reading and writing. 
+ */ + public BufferedRandomAccessFile(File file, String mode) throws IOException + { + super(file, mode); + this.init(0); + } + + public BufferedRandomAccessFile(File file, String mode, int size) throws IOException + { + super(file, mode); + this.init(size); + } + + /** + * Open a new BufferedRandomAccessFile on the file named + * name in mode mode, which should be "r" for + * reading only, or "rw" for reading and writing. + */ + public BufferedRandomAccessFile(String name, String mode) throws IOException + { + super(name, mode); + this.init(0); + } + + public BufferedRandomAccessFile(String name, String mode, int size) throws FileNotFoundException + { + super(name, mode); + this.init(size); + } + + private void init(int size) + { + this.dirty_ = this.closed_ = false; + this.lo_ = this.curr_ = this.hi_ = 0; + this.buff_ = (size > BuffSz_) ? new byte[size] : new byte[BuffSz_]; + this.maxHi_ = (long) BuffSz_; + this.hitEOF_ = false; + this.diskPos_ = 0L; + } + + public void close() throws IOException + { + this.flush(); + this.closed_ = true; + super.close(); + } + + /** + * Flush any bytes in the file's buffer that have not yet been written to + * disk. If the file was created read-only, this method is a no-op. + */ + public void flush() throws IOException + { + this.flushBuffer(); + } + + /* Flush any dirty bytes in the buffer to disk. */ + private void flushBuffer() throws IOException + { + if (this.dirty_) + { + if (this.diskPos_ != this.lo_) + super.seek(this.lo_); + int len = (int) (this.curr_ - this.lo_); + super.write(this.buff_, 0, len); + this.diskPos_ = this.curr_; + this.dirty_ = false; + } + } + + /* + * Read at most "this.buff.length" bytes into "this.buff", returning the + * number of bytes read. If the return result is less than + * "this.buff.length", then EOF was read. + */ + private int fillBuffer() throws IOException + { + int cnt = 0; + int rem = this.buff_.length; + while (rem > 0) + { + int n = super.read(this.buff_, cnt, rem); + if (n < 0) + break; + cnt += n; + rem -= n; + } + if ( (cnt < 0) && (this.hitEOF_ = (cnt < this.buff_.length)) ) + { + // make sure buffer that wasn't read is initialized with -1 + Arrays.fill(this.buff_, cnt, this.buff_.length, (byte) 0xff); + } + this.diskPos_ += cnt; + return cnt; + } + + /* + * This method positions this.curr at position pos. + * If pos does not fall in the current buffer, it flushes the + * current buffer and loads the correct one.
+ * + * On exit from this routine this.curr == this.hi iff pos + * is at or past the end-of-file, which can only happen if the file was + * opened in read-only mode. + */ + public void seek(long pos) throws IOException + { + if (pos >= this.hi_ || pos < this.lo_) + { + // seeking outside of current buffer -- flush and read + this.flushBuffer(); + this.lo_ = pos & BuffMask_; // start at BuffSz boundary + this.maxHi_ = this.lo_ + (long) this.buff_.length; + if (this.diskPos_ != this.lo_) + { + super.seek(this.lo_); + this.diskPos_ = this.lo_; + } + int n = this.fillBuffer(); + this.hi_ = this.lo_ + (long) n; + } + else + { + // seeking inside current buffer -- no read required + if (pos < this.curr_) + { + // if seeking backwards, we must flush to maintain V4 + this.flushBuffer(); + } + } + this.curr_ = pos; + } + + public long getFilePointer() + { + return this.curr_; + } + + public long length() throws IOException + { + return Math.max(this.curr_, super.length()); + } + + public int read() throws IOException + { + if (this.curr_ >= this.hi_) + { + // test for EOF + // if (this.hi < this.maxHi) return -1; + if (this.hitEOF_) + return -1; + + // slow path -- read another buffer + this.seek(this.curr_); + if (this.curr_ == this.hi_) + return -1; + } + byte res = this.buff_[(int) (this.curr_ - this.lo_)]; + this.curr_++; + return ((int) res) & 0xFF; // convert byte -> int + } + + public int read(byte[] b) throws IOException + { + return this.read(b, 0, b.length); + } + + public int read(byte[] b, int off, int len) throws IOException + { + if (this.curr_ >= this.hi_) + { + // test for EOF + // if (this.hi < this.maxHi) return -1; + if (this.hitEOF_) + return -1; + + // slow path -- read another buffer + this.seek(this.curr_); + if (this.curr_ == this.hi_) + return -1; + } + len = Math.min(len, (int) (this.hi_ - this.curr_)); + int buffOff = (int) (this.curr_ - this.lo_); + System.arraycopy(this.buff_, buffOff, b, off, len); + this.curr_ += len; + return len; + } + + public void write(int b) throws IOException + { + if (this.curr_ >= this.hi_) + { + if (this.hitEOF_ && this.hi_ < this.maxHi_) + { + // at EOF -- bump "hi" + this.hi_++; + } + else + { + // slow path -- write current buffer; read next one + this.seek(this.curr_); + if (this.curr_ == this.hi_) + { + // appending to EOF -- bump "hi" + this.hi_++; + } + } + } + this.buff_[(int) (this.curr_ - this.lo_)] = (byte) b; + this.curr_++; + this.dirty_ = true; + } + + public void write(byte[] b) throws IOException + { + this.write(b, 0, b.length); + } + + public void write(byte[] b, int off, int len) throws IOException + { + while (len > 0) + { + int n = this.writeAtMost(b, off, len); + off += n; + len -= n; + } + this.dirty_ = true; + } + + /* + * Write at most "len" bytes to "b" starting at position "off", and return + * the number of bytes written. 
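A short usage sketch for BufferedRandomAccessFile (illustration only, not part of the commit; the demo class and temp-file prefix are made up). The inherited DataOutput/DataInput helpers all funnel through the overridden read/write methods, so they are served from the 64K buffer.

    import java.io.File;
    import java.io.IOException;

    import org.apache.cassandra.io.BufferedRandomAccessFile;

    class BufferedRafExample
    {
        public static void main(String[] args) throws IOException
        {
            File f = File.createTempFile("braf-demo", ".db");
            BufferedRandomAccessFile raf = new BufferedRandomAccessFile(f, "rw");

            raf.writeInt(32);                 // goes through the overridden write(int)
            raf.writeUTF("hello");            // goes through write(byte[], int, int)
            raf.flush();                      // push any dirty buffered bytes to disk

            raf.seek(0L);                     // reposition; reads come out of the buffer
            System.out.println(raf.readInt());
            System.out.println(raf.readUTF());

            raf.close();
            f.delete();
        }
    }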
+ */ + private int writeAtMost(byte[] b, int off, int len) throws IOException + { + if (this.curr_ >= this.hi_) + { + if (this.hitEOF_ && this.hi_ < this.maxHi_) + { + // at EOF -- bump "hi" + this.hi_ = this.maxHi_; + } + else + { + // slow path -- write current buffer; read next one + this.seek(this.curr_); + if (this.curr_ == this.hi_) + { + // appending to EOF -- bump "hi" + this.hi_ = this.maxHi_; + } + } + } + len = Math.min(len, (int) (this.hi_ - this.curr_)); + int buffOff = (int) (this.curr_ - this.lo_); + System.arraycopy(b, off, this.buff_, buffOff, len); + this.curr_ += len; + return len; + } +} Added: incubator/cassandra/trunk/src/org/apache/cassandra/io/ChecksumManager.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/ChecksumManager.java?rev=749218&view=auto ============================================================================== --- incubator/cassandra/trunk/src/org/apache/cassandra/io/ChecksumManager.java (added) +++ incubator/cassandra/trunk/src/org/apache/cassandra/io/ChecksumManager.java Mon Mar 2 07:57:22 2009 @@ -0,0 +1,423 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io; + +import java.io.BufferedInputStream; +import java.io.DataInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.lang.reflect.Method; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.zip.Adler32; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.utils.FileUtils; +import org.apache.cassandra.utils.LogUtil; +import org.apache.log4j.Logger; +import bak.pcj.map.AbstractLongKeyLongMap; +import bak.pcj.map.LongKeyLongChainedHashMap; + +/** + * This class manages the persistence of checksums and keeps + * them in memory. It maintains a mapping of data files on + * disk to their corresponding checksum files. It is also + * loads the checksums in memory on start up. 
+ * + * @author alakshman + * + */ +class ChecksumManager +{ + private static Logger logger_ = Logger.getLogger(ChecksumManager.class); + /* Keeps a mapping of checksum manager instances to data file */ + private static Map chksumMgrs_ = new HashMap(); + private static Lock lock_ = new ReentrantLock(); + private static final String checksumPrefix_ = "Checksum-"; + private static final int bufferSize_ = 8*1024*1024; + private static final long chunkMask_ = 0x00000000FFFFFFFFL; + private static final long fileIdMask_ = 0x7FFFFFFF00000000L; + /* Map where checksums are cached. */ + private static AbstractLongKeyLongMap chksums_ = new LongKeyLongChainedHashMap(); + + public static ChecksumManager instance(String dataFile) throws IOException + { + ChecksumManager chksumMgr = chksumMgrs_.get(dataFile); + if ( chksumMgr == null ) + { + lock_.lock(); + try + { + if ( chksumMgr == null ) + { + chksumMgr = new ChecksumManager(dataFile); + chksumMgrs_.put(dataFile, chksumMgr); + } + } + finally + { + lock_.unlock(); + } + } + return chksumMgr; + } + + /* TODO: Debug only */ + public static ChecksumManager instance(String dataFile, String chkSumFile) throws IOException + { + ChecksumManager chksumMgr = chksumMgrs_.get(dataFile); + if ( chksumMgr == null ) + { + lock_.lock(); + try + { + if ( chksumMgr == null ) + { + chksumMgr = new ChecksumManager(dataFile, chkSumFile); + chksumMgrs_.put(dataFile, chksumMgr); + } + } + finally + { + lock_.unlock(); + } + } + return chksumMgr; + } + + + /** + * On start read all the check sum files on disk and + * pull them into memory. + * @throws IOException + */ + public static void onStart() throws IOException + { + String[] directories = DatabaseDescriptor.getAllDataFileLocations(); + List allFiles = new ArrayList(); + for ( String directory : directories ) + { + File file = new File(directory); + File[] files = file.listFiles(); + for ( File f : files ) + { + if ( f.getName().contains(ChecksumManager.checksumPrefix_) ) + { + allFiles.add(f); + } + } + } + + for ( File file : allFiles ) + { + int fId = SequenceFile.getFileId(file.getName()); + ChecksumReader chksumRdr = new ChecksumReader(file.getAbsolutePath(), 0L, file.length()); + + int chunk = 0; + while ( !chksumRdr.isEOF() ) + { + long value = chksumRdr.readLong(); + long key = ChecksumManager.key(fId, ++chunk); + chksums_.put(key, value); + } + } + } + + /** + * On delete of this dataFile remove the checksums associated with + * this file from memory, remove the check sum manager instance. + * + * @param dataFile data file that is being deleted. 
+ * @throws IOException + */ + public static void onFileDelete(String dataFile) throws IOException + { + File f = new File(dataFile); + long size = f.length(); + int fileId = SequenceFile.getFileId(f.getName()); + int chunks = (int)(size >> 16L); + + for ( int i = 0; i < chunks; ++i ) + { + long key = ChecksumManager.key(fileId, i); + chksums_.remove(key); + } + + /* remove the check sum manager instance */ + chksumMgrs_.remove(dataFile); + String chksumFile = f.getParent() + System.getProperty("file.separator") + checksumPrefix_ + fileId + ".db"; + FileUtils.delete(chksumFile); + } + + private static long key(int fileId, int chunkId) + { + long key = 0; + key |= fileId; + key <<= 32; + key |= chunkId; + return key; + } + + private RandomAccessFile raf_; + private Adler32 adler_ = new Adler32(); + + ChecksumManager(String dataFile) throws IOException + { + File file = new File(dataFile); + String directory = file.getParent(); + String f = file.getName(); + short fId = SequenceFile.getFileId(f); + String chkSumFile = directory + System.getProperty("file.separator") + checksumPrefix_ + fId + ".db"; + raf_ = new RandomAccessFile(chkSumFile, "rw"); + } + + /* TODO: Remove later. */ + ChecksumManager(String dataFile, String chkSumFile) throws IOException + { + File file = new File(dataFile); + String directory = file.getParent(); + String f = file.getName(); + short fId = SequenceFile.getFileId(f); + raf_ = new RandomAccessFile(chkSumFile, "rw"); + + file = new File(chkSumFile); + ChecksumReader chksumRdr = new ChecksumReader(file.getAbsolutePath(), 0L, file.length()); + + int chunk = 0; + while ( !chksumRdr.isEOF() ) + { + long value = chksumRdr.readLong(); + long key = ChecksumManager.key(fId, ++chunk); + chksums_.put(key, value); + } + } + + /** + * Log the checksum for the the specified file and chunk + * within the file. + * @param fileId id associated with the file + * @param chunkId chunk within the file. + * @param buffer for which the checksum needs to be calculated. + * @throws IOException + */ + void logChecksum(int fileId, int chunkId, byte[] buffer) + { + logChecksum(fileId, chunkId, buffer, 0, buffer.length); + } + + /** + * Log the checksum for the the specified file and chunk + * within the file. + * @param fileId id associated with the file + * @param chunkId chunk within the file. + * @param buffer for which the checksum needs to be calculated. + * @param startoffset offset to start within the buffer + * @param length size of the checksum buffer. + * @throws IOException + */ + void logChecksum(int fileId, int chunkId, byte[] buffer, int startOffset, int length) + { + try + { + adler_.update(buffer, startOffset, length); + long chksum = adler_.getValue(); + adler_.reset(); + /* log checksums to disk */ + raf_.writeLong(chksum); + /* add the chksum to memory */ + long key = ChecksumManager.key(fileId, chunkId); + chksums_.put(key, chksum); + } + catch ( IOException ex ) + { + logger_.warn( LogUtil.throwableToString(ex) ); + } + } + + /** + * Validate checksums for the data in the buffer. + * @file name of the file from which data is being + * read. + * @chunkId chunkId + * @param buffer with data for which checksum needs to be + * verified. 
+ * @throws IOException + */ + void validateChecksum(String file, int chunkId, byte[] buffer) throws IOException + { + validateChecksum(file, chunkId, buffer, 0, buffer.length); + } + + /** + * Validate checksums for the data in the buffer for the region + * that is encapsulated in the section object + * @file name of the file from which data is being + * read. + * @chunkId chunkId + * @param buffer with data for which checksum needs to be + * verified. + * @param startOffset within the buffer + * @param length of the data whose checksum needs to be verified. + * @throws IOException + */ + void validateChecksum(String file, int chunkId, byte[] buffer, int startOffset, int length) throws IOException + { + int fId = SequenceFile.getFileId(file); + long key = ChecksumManager.key(fId, chunkId); + adler_.update(buffer, startOffset, length); + long currentChksum = adler_.getValue(); + adler_.reset(); + long oldChksum = chksums_.get(key); + if ( currentChksum != oldChksum ) + { + throw new IOException("Checksums do not match in file " + file + " for chunk " + chunkId + "."); + } + } + + + /** + * Get the checksum for the specified file's chunk + * @param fileId id associated with the file. + * @param chunkId chunk within the file. + * @return associated checksum for the chunk + */ + long getChecksum(int fileId, int chunkId) + { + long key = ChecksumManager.key(fileId, chunkId); + return chksums_.get(key); + } + + public static void main(String[] args) throws Throwable + { + ChecksumReader rdr = new ChecksumReader("C:\\Engagements\\Cassandra\\Checksum-1.db"); + while ( !rdr.isEOF() ) + { + System.out.println(rdr.readLong()); + } + rdr.close(); + } +} + +/** + * ChecksumReader is used to memory map the checksum files and + * load the data into memory. + * + * @author alakshman + * + */ +class ChecksumReader +{ + private static Logger logger_ = Logger.getLogger(ChecksumReader.class); + private String filename_; + private MappedByteBuffer buffer_; + + ChecksumReader(String filename) throws IOException + { + filename_ = filename; + File f = new File(filename); + map(0, f.length()); + } + + ChecksumReader(String filename, long start, long end) throws IOException + { + filename_ = filename; + map(start, end); + } + + public void map() throws IOException + { + RandomAccessFile file = new RandomAccessFile(filename_, "rw"); + try + { + buffer_ = file.getChannel().map(FileChannel.MapMode.READ_ONLY, 0, file.length() ); + buffer_.load(); + } + finally + { + file.close(); + } + } + + public void map(long start, long end) throws IOException + { + if ( start < 0 || end < 0 || end < start ) + throw new IllegalArgumentException("Invalid values for start and end."); + + RandomAccessFile file = new RandomAccessFile(filename_, "rw"); + try + { + if ( end == 0 ) + end = file.length(); + buffer_ = file.getChannel().map(FileChannel.MapMode.READ_ONLY, start, end); + buffer_.load(); + } + finally + { + file.close(); + } + } + + void unmap(final Object buffer) + { + AccessController.doPrivileged( new PrivilegedAction() + { + public MappedByteBuffer run() + { + try + { + Method getCleanerMethod = buffer.getClass().getMethod("cleaner", new Class[0]); + getCleanerMethod.setAccessible(true); + sun.misc.Cleaner cleaner = (sun.misc.Cleaner)getCleanerMethod.invoke(buffer,new Object[0]); + cleaner.clean(); + } + catch(Throwable e) + { + logger_.debug( LogUtil.throwableToString(e) ); + } + return null; + } + }); + } + + public long readLong() throws IOException + { + return buffer_.getLong(); + } + + public boolean isEOF() + { + 
+ return ( buffer_.remaining() == 0 );
+ }
+
+
+ public void close() throws IOException
+ {
+ unmap(buffer_);
+ }
+}
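The checksum bookkeeping above rests on two ideas: a single 64-bit key that packs the data file id into the high 32 bits and the chunk id into the low 32 bits (ChecksumManager.key), and an Adler32 sum that is computed per chunk when data is written (logChecksum) and recomputed and compared when it is read back (validateChecksum). The standalone sketch below, which is not part of this commit, illustrates that round trip; the class name ChecksumSketch, its method names, and the plain HashMap standing in for the pcj LongKeyLongChainedHashMap used by ChecksumManager are illustrative assumptions only.

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.zip.Adler32;

// Sketch only -- not committed code. Mirrors the key packing and the
// Adler32 log/validate cycle of ChecksumManager in a self-contained form.
public class ChecksumSketch
{
    private final Map<Long, Long> checksums = new HashMap<>();
    private final Adler32 adler = new Adler32();

    // Same packing as ChecksumManager.key(fileId, chunkId):
    // file id in the high 32 bits, chunk id in the low 32 bits.
    static long key(int fileId, int chunkId)
    {
        long key = 0;
        key |= fileId;
        key <<= 32;
        key |= chunkId;
        return key;
    }

    // Analogous to logChecksum(): compute and remember the checksum for one chunk.
    void log(int fileId, int chunkId, byte[] buffer, int offset, int length)
    {
        adler.update(buffer, offset, length);
        checksums.put(key(fileId, chunkId), adler.getValue());
        adler.reset();
    }

    // Analogous to validateChecksum(): recompute, compare, and fail loudly on mismatch.
    void validate(int fileId, int chunkId, byte[] buffer, int offset, int length) throws IOException
    {
        adler.update(buffer, offset, length);
        long current = adler.getValue();
        adler.reset();
        Long expected = checksums.get(key(fileId, chunkId));
        if (expected == null || expected != current)
            throw new IOException("Checksum mismatch for file " + fileId + ", chunk " + chunkId);
    }

    public static void main(String[] args) throws IOException
    {
        ChecksumSketch sketch = new ChecksumSketch();
        byte[] chunk = "some chunk of data".getBytes();
        sketch.log(1, 0, chunk, 0, chunk.length);
        sketch.validate(1, 0, chunk, 0, chunk.length);   // passes
        chunk[0] ^= 0xFF;                                // corrupt one byte
        try
        {
            sketch.validate(1, 0, chunk, 0, chunk.length);
        }
        catch (IOException expected)
        {
            System.out.println(expected.getMessage());   // mismatch detected
        }
    }
}

Note that onFileDelete above derives the chunk count as size >> 16, which implies 64 KB chunks per checksum entry; the sketch leaves chunking out and only exercises the hash-and-compare path.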