cassandra-commits mailing list archives

From Apache Wiki <wikidi...@apache.org>
Subject [Cassandra Wiki] Update of "conf/README.txt" by DaveBrosius
Date Sat, 31 Dec 2011 19:55:22 GMT
Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Cassandra Wiki" for change notification.

The "conf/README.txt" page has been changed by DaveBrosius:
http://wiki.apache.org/cassandra/conf/README.txt?action=diff&rev1=9&rev2=10

+ conf/README.txt
- ## page was renamed from Roger Mbiama
- Describe Roger Mbiama here.
- 
- Roger Mbiama/README.txt
  Required configuration files
  ============================
  
@@ -34, +31 @@

  # See the License for the specific language governing permissions and
  # limitations under the License.
  
+ calculate_heap_sizes()
+ {
+     case "`uname`" in
+         Linux)
+             system_memory_in_mb=`free -m | awk '/Mem:/ {print $2}'`
+             system_cpu_cores=`egrep -c 'processor([[:space:]]+):.*' /proc/cpuinfo`
+         ;;
+         FreeBSD)
+             system_memory_in_bytes=`sysctl hw.physmem | awk '{print $2}'`
+             system_memory_in_mb=`expr $system_memory_in_bytes / 1024 / 1024`
+             system_cpu_cores=`sysctl hw.ncpu | awk '{print $2}'`
+         ;;
+         SunOS)
+             system_memory_in_mb=`prtconf | awk '/Memory size:/ {print $3}'`
+             system_cpu_cores=`psrinfo | wc -l`
+         ;;
+         *)
+             # assume reasonable defaults for e.g. a modern desktop or
+             # cheap server
+             system_memory_in_mb="2048"
+             system_cpu_cores="2"
+         ;;
+     esac
+     max_heap_size_in_mb=`expr $system_memory_in_mb / 2`
+     MAX_HEAP_SIZE="${max_heap_size_in_mb}M"
+ 
+     # Young gen: min(max_sensible_per_modern_cpu_core * num_cores, 1/4 * heap size)
+     max_sensible_yg_per_core_in_mb="100"
+     max_sensible_yg_in_mb=`expr $max_sensible_yg_per_core_in_mb "*" $system_cpu_cores`
+ 
+     desired_yg_in_mb=`expr $max_heap_size_in_mb / 4`
+ 
+     if [ "$desired_yg_in_mb" -gt "$max_sensible_yg_in_mb" ]
+     then
+         HEAP_NEWSIZE="${max_sensible_yg_in_mb}M"
+     else
+         HEAP_NEWSIZE="${desired_yg_in_mb}M"
+     fi
+ }
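+ 
+ # A worked example of calculate_heap_sizes() above (illustrative numbers,
+ # not defaults): on a Linux box with 8192 MB of RAM and 4 cores it computes
+ #   max_heap_size_in_mb   = 8192 / 2 = 4096  ->  MAX_HEAP_SIZE="4096M"
+ #   max_sensible_yg_in_mb = 100 * 4  = 400
+ #   desired_yg_in_mb      = 4096 / 4 = 1024
+ #   1024 > 400, so HEAP_NEWSIZE="400M"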
+ 
+ # Override these to set the amount of memory to allocate to the JVM at
+ # start-up. For production use you almost certainly want to adjust
+ # this for your environment. MAX_HEAP_SIZE is the total amount of
+ # memory dedicated to the Java heap; HEAP_NEWSIZE refers to the size
+ # of the young generation. Both MAX_HEAP_SIZE and HEAP_NEWSIZE should
+ # be either set or not (if you set one, set the other).
+ #
+ # The main trade-off for the young generation is that the larger it
+ # is, the longer GC pause times will be. The shorter it is, the more
+ # expensive GC will be (usually).
+ #
+ # The example HEAP_NEWSIZE assumes a modern 8-core+ machine for decent pause
+ # times. If in doubt, and if you do not particularly want to tweak, go with
+ # 100 MB per physical CPU core.
+ 
+ #MAX_HEAP_SIZE="4G"
+ #HEAP_NEWSIZE="800M"
+ 
+ if [ "x$MAX_HEAP_SIZE" = "x" ] && [ "x$HEAP_NEWSIZE" = "x" ]; then
+     calculate_heap_sizes
+ else
+     if [ "x$MAX_HEAP_SIZE" = "x" ] || [ "x$HEAP_NEWSIZE" = "x" ]; then
+         echo "please set or unset MAX_HEAP_SIZE and HEAP_NEWSIZE in pairs (see cassandra-env.sh)"
+         exit 1
+     fi
+ fi
+ 
+ # Specifies the default port over which Cassandra will be available for
+ # JMX connections.
+ JMX_PORT="7199"
+ 
+ 
+ # Here we build the arguments passed to the JVM when starting Cassandra.
+ 
+ # enable assertions. disabling this in production will give a modest
+ # performance benefit (around 5%).
+ JVM_OPTS="$JVM_OPTS -ea"
+ 
+ # add the jamm javaagent
+ check_openjdk=`"${JAVA:-java}" -version 2>&1 | awk '{if (NR == 2) {print $1}}'`
+ if [ "$check_openjdk" != "OpenJDK" ]
+ then
+     JVM_OPTS="$JVM_OPTS -javaagent:$CASSANDRA_HOME/lib/jamm-0.2.2.jar"
+ fi
+ 
+ # enable thread priorities, primarily so we can give periodic tasks
+ # a lower priority to avoid interfering with client workload
+ JVM_OPTS="$JVM_OPTS -XX:+UseThreadPriorities"
+ # allows lowering thread priority without being root. see
+ # http://tech.stolsvik.com/2010/01/linux-java-thread-priorities-workaround.html
+ JVM_OPTS="$JVM_OPTS -XX:ThreadPriorityPolicy=42"
+ 
+ # min and max heap sizes should be set to the same value to avoid
+ # stop-the-world GC pauses during resize, and so that we can lock the
+ # heap in memory on startup to prevent any of it from being swapped
+ # out.
+ JVM_OPTS="$JVM_OPTS -Xms${MAX_HEAP_SIZE}"
+ JVM_OPTS="$JVM_OPTS -Xmx${MAX_HEAP_SIZE}"
+ JVM_OPTS="$JVM_OPTS -Xmn${HEAP_NEWSIZE}"
+ JVM_OPTS="$JVM_OPTS -XX:+HeapDumpOnOutOfMemoryError" 
+ 
+ if [ "`uname`" = "Linux" ] ; then
+     # reduce the per-thread stack size to minimize the impact of Thrift
+     # thread-per-client. (Best practice is for client connections to
+     # be pooled anyway.) Only do so on Linux where it is known to be
+     # supported.
+     JVM_OPTS="$JVM_OPTS -Xss128k"
+ fi
+ 
+ # GC tuning options
+ JVM_OPTS="$JVM_OPTS -XX:+UseParNewGC" 
+ JVM_OPTS="$JVM_OPTS -XX:+UseConcMarkSweepGC" 
+ JVM_OPTS="$JVM_OPTS -XX:+CMSParallelRemarkEnabled" 
+ JVM_OPTS="$JVM_OPTS -XX:SurvivorRatio=8" 
+ JVM_OPTS="$JVM_OPTS -XX:MaxTenuringThreshold=1"
+ JVM_OPTS="$JVM_OPTS -XX:CMSInitiatingOccupancyFraction=75"
+ JVM_OPTS="$JVM_OPTS -XX:+UseCMSInitiatingOccupancyOnly"
+ 
+ # GC logging options -- uncomment to enable
+ # JVM_OPTS="$JVM_OPTS -XX:+PrintGCDetails"
+ # JVM_OPTS="$JVM_OPTS -XX:+PrintGCTimeStamps"
+ # JVM_OPTS="$JVM_OPTS -XX:+PrintClassHistogram"
+ # JVM_OPTS="$JVM_OPTS -XX:+PrintTenuringDistribution"
+ # JVM_OPTS="$JVM_OPTS -XX:+PrintGCApplicationStoppedTime"
+ # JVM_OPTS="$JVM_OPTS -Xloggc:/var/log/cassandra/gc-`date +%s`.log"
+ 
+ # uncomment to have Cassandra JVM listen for remote debuggers/profilers on port 1414
+ # JVM_OPTS="$JVM_OPTS -Xdebug -Xnoagent -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=1414"
+ 
+ # Prefer binding to IPv4 network interfaces (when net.ipv6.bindv6only=1). See
+ # http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6342561 (short version:
+ # comment out this entry to enable IPv6 support).
+ JVM_OPTS="$JVM_OPTS -Djava.net.preferIPv4Stack=true"
+ 
+ # jmx: metrics and administration interface
+ # 
+ # add this if you're having trouble connecting:
+ # JVM_OPTS="$JVM_OPTS -Djava.rmi.server.hostname=<public name>"
+ # 
+ # see 
+ # http://blogs.sun.com/jmxetc/entry/troubleshooting_connection_problems_in_jconsole
+ # for more on configuring JMX through firewalls, etc. (Short version:
+ # get it working with no firewall first.)
+ JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT" 
+ JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.ssl=false" 
+ JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.authenticate=false" 
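+ 
+ # For example, once the node is up you can attach to it from a workstation
+ # with jconsole ("host1" is a hypothetical hostname; 7199 is $JMX_PORT):
+ #   jconsole host1:7199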
+ 
+ 
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements. See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership. The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License. You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+  
+  /**
+   * Cassandra has a back door called the Binary Memtable. The purpose of this back door is to
+   * mass-import large amounts of data without using the Thrift interface.
+   *
+   * Inserting data through the Binary Memtable lets you skip the commit log overhead and the
+   * ack from Thrift on every insert. The example below uses Hadoop to generate all the data
+   * necessary to send to Cassandra, and sends it using the Binary Memtable interface. What
+   * Hadoop ends up doing is creating the actual data that gets put into an SSTable, as if you
+   * were using Thrift. With enough Hadoop nodes inserting the data, the bottleneck should
+   * become the network.
+   *
+   * We recommend setting the compaction threshold to 0 while the import is running. After the
+   * import, you need to run `nodeprobe -host <IP> flush_binary <Keyspace>` on every node, as
+   * this flushes the remaining data still held in memory to disk. Then restore the compaction
+   * threshold to its original value.
+   *
+   * The example below is a sample Hadoop job that inserts SuperColumns. It can be tweaked to
+   * work with normal Columns.
+   *
+   * Structure the data you want to import as rows delimited by newlines. You group by <Key>
+   * in the mapper, so that the end result generates the data set as a column-oriented subset.
+   * In the reduce step you can generate the ColumnFamilies you want inserted and send them to
+   * your nodes.
+   *
+   * For Cassandra 0.6.4, we modified this example to wait for acks from all Cassandra nodes
+   * for each row before proceeding to the next. This means that to keep Cassandra similarly
+   * busy you can either
+   * 1) add more reducer tasks,
+   * 2) remove the "wait for acks" block of code, or
+   * 3) parallelize the writing of rows to Cassandra, e.g. with an Executor.
+   *
+   * THIS CANNOT RUN ON THE SAME IP ADDRESS AS A CASSANDRA INSTANCE.
+   */
+   
+ package org.apache.cassandra.bulkloader;
+ 
+ import java.io.IOException;
+ import java.io.UnsupportedEncodingException;
+ import java.math.BigInteger;
+ import java.net.URI;
+ import java.net.URISyntaxException;
+ import java.util.ArrayList;
+ import java.util.Iterator;
+ import java.util.LinkedList;
+ import java.util.List;
+ 
+ import org.apache.cassandra.config.CFMetaData;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.db.clock.TimestampReconciler;
+ import org.apache.cassandra.db.*;
+ import org.apache.cassandra.db.filter.QueryPath;
+ import org.apache.cassandra.dht.BigIntegerToken;
+ import org.apache.cassandra.io.util.DataOutputBuffer;
+ import java.net.InetAddress;
+ import java.net.UnknownHostException;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.TimeoutException;
+ 
+ import org.apache.cassandra.net.IAsyncResult;
+ import org.apache.cassandra.net.Message;
+ import org.apache.cassandra.net.MessagingService;
+ import org.apache.cassandra.service.StorageService;
+ import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.hadoop.filecache.DistributedCache;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.io.Text;
+ import org.apache.hadoop.mapred.*;
+ 
+ public class CassandraBulkLoader {
+     public static class Map extends MapReduceBase implements Mapper<Text, Text, Text, Text> {
+ 
+         public void map(Text key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
+             // This is a simple key/value mapper.
+             output.collect(key, value);
+         }
+     }
+ 
+     public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
+         private Path[] localFiles;
+         private ArrayList<String> tokens = new ArrayList<String>();
+         private JobConf jobconf;
+ 
+         public void configure(JobConf job) {
+             this.jobconf = job;
+             String cassConfig;
+ 
+             // Get the cached files
+             try
+             {
+                 localFiles = DistributedCache.getLocalCacheFiles(job);
+             }
+             catch (IOException e)
+             {
+                 throw new RuntimeException(e);
+             }
+             cassConfig = localFiles[0].getParent().toString();
+ 
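+             // "storage-config" is the system property Cassandra's
+             // DatabaseDescriptor reads to locate storage-conf.xml; here we
+             // point it at the directory of the cached file above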
+             System.setProperty("storage-config", cassConfig);
+ 
+             try
+             {
+                 StorageService.instance.initClient();
+             }
+             catch (IOException e)
+             {
+                 throw new RuntimeException(e);
+             }
+             try
+             {
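+                 // give the client node time to learn the ring via gossip
+                 // before we start writing (the 10s figure is an assumption)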
+                 Thread.sleep(10*1000);
+             }
+             catch (InterruptedException e)
+             {
+                 throw new RuntimeException(e);
+             }
+         }
+ 
+         public void close()
+         {
+             try
+             {
+                 // release the cache
+                 DistributedCache.releaseCache(new URI("/cassandra/storage-conf.xml#storage-conf.xml"), this.jobconf);
+             }
+             catch (IOException e)
+             {
+                 throw new RuntimeException(e);
+             }
+             catch (URISyntaxException e)
+             {
+                 throw new RuntimeException(e);
+             }
+             try
+             {
+                 // Sleep just in case the number of keys we send over is small
+                 Thread.sleep(3*1000);
+             }
+             catch (InterruptedException e)
+             {
+                 throw new RuntimeException(e);
+             }
+             StorageService.instance.stopClient();
+         }
+ 
+         public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
+         {
+             ColumnFamily columnFamily;
+             String keyspace = "Keyspace1";
+             String cfName = "Super1";
+             Message message;
+             List<ColumnFamily> columnFamilies;
+             columnFamilies = new LinkedList<ColumnFamily>();
+             String line;
+ 
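+             // Expected input format (implied by the split below): four
+             // fields per line, separated by the \1 control character:
+             //   <unused>\1<supercolumn name>\1<column name>\1<column value>
+ 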
+             /* Create a column family */
+             columnFamily = ColumnFamily.create(keyspace, cfName);
+             while (values.hasNext()) {
+                 // Split the line on your delimiter (this example uses the \1 control character)
+                 line = values.next().toString();
+                 String[] fields = line.split("\1");
+                 String SuperColumnName = fields[1];
+                 String ColumnName = fields[2];
+                 String ColumnValue = fields[3];
+                 int timestamp = 0;
+                 columnFamily.addColumn(new QueryPath(cfName, SuperColumnName.getBytes("UTF-8"), ColumnName.getBytes("UTF-8")), ColumnValue.getBytes("UTF-8"), new TimestampClock(timestamp));
+             }
+ 
+             columnFamilies.add(columnFamily);
+ 
+             /* Get serialized message to send to cluster */
+             message = createMessage(keyspace, key.getBytes(), cfName, columnFamilies);
+             List<IAsyncResult> results = new ArrayList<IAsyncResult>();
+             for (InetAddress endpoint : StorageService.instance.getNaturalEndpoints(keyspace, key.getBytes()))
+             {
+                 /* Send message to end point */
+                 results.add(MessagingService.instance.sendRR(message, endpoint));
+             }
+             /* wait for acks */
+             for (IAsyncResult result : results)
+             {
+                 try
+                 {
+                     result.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
+                 }
+                 catch (TimeoutException e)
+                 {
+                     // you should probably add retry logic here
+                     throw new RuntimeException(e);
+                 }
+             }
+             
+             output.collect(key, new Text(" inserted into Cassandra node(s)"));
+         }
+     }
+ 
+     public static void runJob(String[] args)
+     {
+         JobConf conf = new JobConf(CassandraBulkLoader.class);
+ 
+         if (args.length >= 4)
+         {
+             conf.setNumReduceTasks(Integer.parseInt(args[3]));
+         }
+ 
+         try
+         {
+             // We store the cassandra storage-conf.xml on the HDFS cluster
+             DistributedCache.addCacheFile(new URI("/cassandra/storage-conf.xml#storage-conf.xml"), conf);
+         }
+         catch (URISyntaxException e)
+         {
+             throw new RuntimeException(e);
+         }
+         conf.setInputFormat(KeyValueTextInputFormat.class);
+         conf.setJobName("CassandraBulkLoader_v2");
+         conf.setMapperClass(Map.class);
+         conf.setReducerClass(Reduce.class);
+ 
+         conf.setOutputKeyClass(Text.class);
+         conf.setOutputValueClass(Text.class);
+ 
+         FileInputFormat.setInputPaths(conf, new Path(args[1]));
+         FileOutputFormat.setOutputPath(conf, new Path(args[2]));
+         try
+         {
+             JobClient.runJob(conf);
+         }
+         catch (IOException e)
+         {
+             throw new RuntimeException(e);
+         }
+     }
+ 
+     public static Message createMessage(String Keyspace, byte[] Key, String CFName, List<ColumnFamily> ColumnFamilies)
+     {
+         ColumnFamily baseColumnFamily;
+         DataOutputBuffer bufOut = new DataOutputBuffer();
+         RowMutation rm;
+         Message message;
+         Column column;
+ 
+         /* Get the first column family from list, this is just to get past validation */
+         baseColumnFamily = new ColumnFamily(ColumnFamilyType.Standard,
+                                             ClockType.Timestamp,
+                                             DatabaseDescriptor.getComparator(Keyspace, CFName),
+                                             DatabaseDescriptor.getSubComparator(Keyspace, CFName),
+                                             TimestampReconciler.instance,
+                                             CFMetaData.getId(Keyspace, CFName));
+         
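+         // Each serialized ColumnFamily is wrapped in a Column whose name is
+         // the cf id; this wrapping is what the BINARY verb handler on the
+         // receiving node appears to expect (assumption based on this code).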
+         for (ColumnFamily cf : ColumnFamilies) {
+             bufOut.reset();
+             ColumnFamily.serializer().serializeWithIndexes(cf, bufOut);
+             byte[] data = new byte[bufOut.getLength()];
+             System.arraycopy(bufOut.getData(), 0, data, 0, bufOut.getLength());
+ 
+             column = new Column(FBUtilities.toByteArray(cf.id()), data, new TimestampClock(0));
+             baseColumnFamily.addColumn(column);
+         }
+         rm = new RowMutation(Keyspace, Key);
+         rm.add(baseColumnFamily);
+ 
+         try
+         {
+             /* Make message */
+             message = rm.makeRowMutationMessage(StorageService.Verb.BINARY);
+         }
+         catch (IOException e)
+         {
+             throw new RuntimeException(e);
+         }
+ 
+         return message;
+     }
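+ 
+     // Usage sketch (hedged; the jar name is a placeholder for however you
+     // package this job). Note that runJob() reads args[1..3], so the first
+     // argument slot is unused:
+     //
+     //   hadoop jar <bulkloader.jar> org.apache.cassandra.bulkloader.CassandraBulkLoader \
+     //       <unused> <hdfs input path> <hdfs output path> [num reduce tasks]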
+     public static void main(String[] args) throws Exception
+     {
+         runJob(args);
+     }
+ }
+ 
+ #!/bin/bash
+ #
+ # /etc/init.d/cassandra
+ #
+ # Startup script for Cassandra
+ # 
+ # chkconfig: 2345 20 80
+ # description: Starts and stops Cassandra
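+ #
+ # To install on a chkconfig-based (RHEL-style) system, copy this script to
+ # /etc/init.d/cassandra and make it executable, then (adjust per distro):
+ #   chkconfig --add cassandra
+ #   chkconfig cassandra on
+ #   service cassandra start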
+ 
+ . /etc/rc.d/init.d/functions
+ 
+ export JAVA_HOME=/usr/lib/jvm/java-1.6.0-openjdk-1.6.0.0/
+ export CASSANDRA_HOME=/usr/share/cassandra/
+ export CASSANDRA_INCLUDE=/usr/share/cassandra/cassandra.in.sh
+ export CASSANDRA_CONF=/etc/cassandra/conf
+ export CASSANDRA_OWNR=cassandra
+ log_file=/var/log/cassandra/cassandra.log
+ pid_file=/var/run/cassandra/cassandra.pid
+ CASSANDRA_PROG=/usr/sbin/cassandra
+ 
+ 
+ case "$1" in
+     start)
+         # Cassandra startup
+         echo -n "Starting Cassandra: "
+         su $CASSANDRA_OWNR -c "$CASSANDRA_PROG -p $pid_file" > $log_file 2>&1
+         echo "OK"
+         ;;
+     stop)
+         # Cassandra shutdown
+         echo -n "Shutdown Cassandra: "
+         su $CASSANDRA_OWNR -c "kill `cat $pid_file`"
+         echo "OK"
+         ;;
+     reload|restart)
+         $0 stop
+         $0 start
+         ;;
+     status)
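+         # status reporting is not implemented in this example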
+         ;;
+     *)
+         echo "Usage: `basename $0` start|stop|restart|reload"
+         exit 1
+ esac
+ 
+ exit 0
+ 
