cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Apache Wiki <wikidi...@apache.org>
Subject [Cassandra Wiki] Update of "Roger Mbiama" by Roger Mbiama
Date Thu, 29 Dec 2011 10:30:12 GMT
Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Cassandra Wiki" for change notification.

The "Roger Mbiama" page has been changed by Roger Mbiama:
http://wiki.apache.org/cassandra/Roger%20Mbiama?action=diff&rev1=1&rev2=2

  Describe Roger Mbiama here.
  
- conf/README.txt
+ Roger Mbiama/README.txt
  Required configuration files
  ============================
  
@@ -33, +33 @@

  # See the License for the specific language governing permissions and
  # limitations under the License.
  
- calculate_heap_sizes()
- {
-     case "`uname`" in
-         Linux)
-             system_memory_in_mb=`free -m | awk '/Mem:/ {print $2}'`
-             system_cpu_cores=`egrep -c 'processor([[:space:]]+):.*' /proc/cpuinfo`
-             break
-         ;;
-         FreeBSD)
-             system_memory_in_bytes=`sysctl hw.physmem | awk '{print $2}'`
-             system_memory_in_mb=`expr $system_memory_in_bytes / 1024 / 1024`
-             system_cpu_cores=`sysctl hw.ncpu | awk '{print $2}'`
-             break
-         ;;
-         SunOS)
-             system_memory_in_mb=`prtconf | awk '/Memory size:/ {print $3}'`
-             system_cpu_cores=`psrinfo | wc -l`
-             break
-         ;;
-         *)
-             # assume reasonable defaults for e.g. a modern desktop or
-             # cheap server
-             system_memory_in_mb="2048"
-             system_cpu_cores="2"
-         ;;
-     esac
-     max_heap_size_in_mb=`expr $system_memory_in_mb / 2`
-     MAX_HEAP_SIZE="${max_heap_size_in_mb}M"
- 
-     # Young gen: min(max_sensible_per_modern_cpu_core * num_cores, 1/4 * heap size)
-     max_sensible_yg_per_core_in_mb="100"
-     max_sensible_yg_in_mb=`expr $max_sensible_yg_per_core_in_mb "*" $system_cpu_cores`
- 
-     desired_yg_in_mb=`expr $max_heap_size_in_mb / 4`
- 
-     if [ "$desired_yg_in_mb" -gt "$max_sensible_yg_in_mb" ]
-     then
-         HEAP_NEWSIZE="${max_sensible_yg_in_mb}M"
-     else
-         HEAP_NEWSIZE="${desired_yg_in_mb}M"
-     fi
- }
- 
- # Override these to set the amount of memory to allocate to the JVM at
- # start-up. For production use you almost certainly want to adjust
- # this for your environment. MAX_HEAP_SIZE is the total amount of
- # memory dedicated to the Java heap; HEAP_NEWSIZE refers to the size
- # of the young generation. Both MAX_HEAP_SIZE and HEAP_NEWSIZE should
- # be either set or not (if you set one, set the other).
- #
- # The main trade-off for the young generation is that the larger it
- # is, the longer GC pause times will be. The shorter it is, the more
- # expensive GC will be (usually).
- #
- # The example HEAP_NEWSIZE assumes a modern 8-core+ machine for decent pause
- # times. If in doubt, and if you do not particularly want to tweak, go with
- # 100 MB per physical CPU core.
- 
- #MAX_HEAP_SIZE="4G"
- #HEAP_NEWSIZE="800M"
- 
- if [ "x$MAX_HEAP_SIZE" = "x" ] && [ "x$HEAP_NEWSIZE" = "x" ]; then
-     calculate_heap_sizes
- else
-     if [ "x$MAX_HEAP_SIZE" = "x" ] ||  [ "x$HEAP_NEWSIZE" = "x" ]; then
-         echo "please set or unset MAX_HEAP_SIZE and HEAP_NEWSIZE in pairs (see cassandra-env.sh)"
-         exit 1
-     fi
- fi
- 
- # Specifies the default port over which Cassandra will be available for
- # JMX connections.
- JMX_PORT="7199"
- 
- 
- # Here we create the arguments that will get passed to the jvm when
- # starting cassandra.
- 
- # enable assertions.  disabling this in production will give a modest
- # performance benefit (around 5%).
- JVM_OPTS="$JVM_OPTS -ea"
- 
- # add the jamm javaagent
- check_openjdk=`"${JAVA:-java}" -version 2>&1 | awk '{if (NR == 2) {print $1}}'`
- if [ "$check_openjdk" != "OpenJDK" ]
- then
-     JVM_OPTS="$JVM_OPTS -javaagent:$CASSANDRA_HOME/lib/jamm-0.2.2.jar"
- fi
- 
- # enable thread priorities, primarily so we can give periodic tasks
- # a lower priority to avoid interfering with client workload
- JVM_OPTS="$JVM_OPTS -XX:+UseThreadPriorities"
- # allows lowering thread priority without being root.  see
- # http://tech.stolsvik.com/2010/01/linux-java-thread-priorities-workaround.html
- JVM_OPTS="$JVM_OPTS -XX:ThreadPriorityPolicy=42"
- 
- # min and max heap sizes should be set to the same value to avoid
- # stop-the-world GC pauses during resize, and so that we can lock the
- # heap in memory on startup to prevent any of it from being swapped
- # out.
- JVM_OPTS="$JVM_OPTS -Xms${MAX_HEAP_SIZE}"
- JVM_OPTS="$JVM_OPTS -Xmx${MAX_HEAP_SIZE}"
- JVM_OPTS="$JVM_OPTS -Xmn${HEAP_NEWSIZE}"
- JVM_OPTS="$JVM_OPTS -XX:+HeapDumpOnOutOfMemoryError" 
- 
- if [ "`uname`" = "Linux" ] ; then
-     # reduce the per-thread stack size to minimize the impact of Thrift
-     # thread-per-client.  (Best practice is for client connections to
-     # be pooled anyway.) Only do so on Linux where it is known to be
-     # supported.
-     JVM_OPTS="$JVM_OPTS -Xss128k"
- fi
- 
- # GC tuning options
- JVM_OPTS="$JVM_OPTS -XX:+UseParNewGC" 
- JVM_OPTS="$JVM_OPTS -XX:+UseConcMarkSweepGC" 
- JVM_OPTS="$JVM_OPTS -XX:+CMSParallelRemarkEnabled" 
- JVM_OPTS="$JVM_OPTS -XX:SurvivorRatio=8" 
- JVM_OPTS="$JVM_OPTS -XX:MaxTenuringThreshold=1"
- JVM_OPTS="$JVM_OPTS -XX:CMSInitiatingOccupancyFraction=75"
- JVM_OPTS="$JVM_OPTS -XX:+UseCMSInitiatingOccupancyOnly"
- 
- # GC logging options -- uncomment to enable
- # JVM_OPTS="$JVM_OPTS -XX:+PrintGCDetails"
- # JVM_OPTS="$JVM_OPTS -XX:+PrintGCTimeStamps"
- # JVM_OPTS="$JVM_OPTS -XX:+PrintClassHistogram"
- # JVM_OPTS="$JVM_OPTS -XX:+PrintTenuringDistribution"
- # JVM_OPTS="$JVM_OPTS -XX:+PrintGCApplicationStoppedTime"
- # JVM_OPTS="$JVM_OPTS -Xloggc:/var/log/cassandra/gc-`date +%s`.log"
- 
- # uncomment to have Cassandra JVM listen for remote debuggers/profilers on port 1414
- # JVM_OPTS="$JVM_OPTS -Xdebug -Xnoagent -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=1414"
- 
- # Prefer binding to IPv4 network intefaces (when net.ipv6.bindv6only=1). See 
- # http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6342561 (short version:
- # comment out this entry to enable IPv6 support).
- JVM_OPTS="$JVM_OPTS -Djava.net.preferIPv4Stack=true"
- 
- # jmx: metrics and administration interface
- # 
- # add this if you're having trouble connecting:
- # JVM_OPTS="$JVM_OPTS -Djava.rmi.server.hostname=<public name>"
- # 
- # see 
- # http://blogs.sun.com/jmxetc/entry/troubleshooting_connection_problems_in_jconsole
- # for more on configuring JMX through firewalls, etc. (Short version:
- # get it working with no firewall first.)
- JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT" 
- JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.ssl=false" 
- JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.authenticate=false" 
- 
- 
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
- 114
- 115
- 116
- 117
- 118
- 119
- 120
- 121
- 122
- 123
- 124
- 125
- 126
- 127
- 128
- 129
- 130
- 131
- 132
- 133
- 134
- 135
- 136
- 137
- 138
- 139
- 140
- 141
- 142
- 143
- 144
- 145
- 146
- 147
- 148
- 149
- 150
- 151
- 152
- 153
- 154
- 155
- 156
- 157
- 158
- 159
- 160
- 161
- 162
- 163
- 164
- 165
- 166
- 167
- 168
- 169
- 170
- 171
- 172
- 173
- 174
- 175
- 176
- 177
- 178
- 179
- 180
- 181
- 182
- 183
- 184
- 185
- 186
- 187
- 188
- 189
- 190
- 191
- 192
- 193
- 194
- 195
- 196
- 197
- 198
- 199
- 200
- 201
- 202
- 203
- 204
- 205
- 206
- 207
- 208
- 209
- 210
- 211
- 212
- 213
- 214
- 215
- 216
- 217
- 218
- 219
- 220
- 221
- 222
- 223
- 224
- 225
- 226
- 227
- 228
- 229
- 230
- 231
- 232
- 233
- 234
- 235
- 236
- 237
- 238
- 239
- 240
- 241
- 242
- 243
- 244
- 245
- 246
- 247
- 248
- 249
- 250
- 251
- 252
- 253
- 254
- 255
- 256
- 257
- 258
- 259
- 260
- 261
- 262
- 263
- 264
- 265
- 266
- 267
- 268
- 269
- 270
- 271
- 272
- 273
- 274
- 275
- 276
- 277
- 278
- 279
- 280
- 281
- 282
- 283
- 284
- 285
- 286
- 287
- 288
- 289
- 290
- 291
- 292
- 293
- /*
-  * Licensed to the Apache Software Foundation (ASF) under one
-  * or more contributor license agreements.  See the NOTICE file
-  * distributed with this work for additional information
-  * regarding copyright ownership.  The ASF licenses this file
-  * to you under the Apache License, Version 2.0 (the
-  * "License"); you may not use this file except in compliance
-  * with the License.  You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-  
-  /**
-   * Cassandra has a back door called the Binary Memtable. The purpose of this backdoor is
to
-   * mass import large amounts of data, without using the Thrift interface.
-   *
-   * Inserting data through the binary memtable, allows you to skip the commit log overhead,
and an ack
-   * from Thrift on every insert. The example below utilizes Hadoop to generate all the data
necessary 
-   * to send to Cassandra, and sends it using the Binary Memtable interface. What Hadoop
ends up doing is
-   * creating the actual data that gets put into an SSTable as if you were using Thrift.
With enough Hadoop nodes
-   * inserting the data, the bottleneck at this point should become the network.
-   * 
-   * We recommend adjusting the compaction threshold to 0, while the import is running. After
the import, you need
-   * to run `nodeprobe -host <IP> flush_binary <Keyspace>` on every node, as
this will flush the remaining data still left 
-   * in memory to disk. Then it's recommended to adjust the compaction threshold to it's
original value.
-   * 
-   * The example below is a sample Hadoop job, and it inserts SuperColumns. It can be tweaked
to work with normal Columns.
-   *
-   * You should construct your data you want to import as rows delimited by a new line. You
end up grouping by <Key>
-   * in the mapper, so that the end result generates the data set into a column oriented
subset. Once you get to the
-   * reduce aspect, you can generate the ColumnFamilies you want inserted, and send it to
your nodes.
-   *
-   * For Cassandra 0.6.4, we modified this example to wait for acks from all Cassandra nodes
for each row
-   * before proceeding to the next.  This means to keep Cassandra similarly busy you can
either
-   * 1) add more reducer tasks,
-   * 2) remove the "wait for acks" block of code,
-   * 3) parallelize the writing of rows to Cassandra, e.g. with an Executor.
-   *
-   * THIS CANNOT RUN ON THE SAME IP ADDRESS AS A CASSANDRA INSTANCE.
-   */
-   
- package org.apache.cassandra.bulkloader;
- 
- import java.io.IOException;
- import java.io.UnsupportedEncodingException;
- import java.math.BigInteger;
- import java.net.URI;
- import java.net.URISyntaxException;
- import java.util.ArrayList;
- import java.util.Iterator;
- import java.util.LinkedList;
- import java.util.List;
- 
- import org.apache.cassandra.config.CFMetaData;
- import org.apache.cassandra.config.DatabaseDescriptor;
- import org.apache.cassandra.db.clock.TimestampReconciler;
- import org.apache.cassandra.db.*;
- import org.apache.cassandra.db.filter.QueryPath;
- import org.apache.cassandra.dht.BigIntegerToken;
- import org.apache.cassandra.io.util.DataOutputBuffer;
- import java.net.InetAddress;
- import java.net.UnknownHostException;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.TimeoutException;
- 
- import org.apache.cassandra.net.IAsyncResult;
- import org.apache.cassandra.net.Message;
- import org.apache.cassandra.net.MessagingService;
- import org.apache.cassandra.service.StorageService;
- import org.apache.cassandra.utils.FBUtilities;
- import org.apache.hadoop.filecache.DistributedCache;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapred.*;
- 
- public class CassandraBulkLoader {
-     public static class Map extends MapReduceBase implements Mapper<Text, Text, Text,
Text> {
-         private Text word = new Text();
- 
-         public void map(Text key, Text value, OutputCollector<Text, Text> output,
Reporter reporter) throws IOException {
-             // This is a simple key/value mapper.
-             output.collect(key, value);
-         }
-     }
- 
-     public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text,
Text> {
-         private Path[] localFiles;
-         private ArrayList<String> tokens = new ArrayList<String>();
-         private JobConf jobconf;
- 
-         public void configure(JobConf job) {
-             this.jobconf = job;
-             String cassConfig;
- 
-             // Get the cached files
-             try
-             {
-                 localFiles = DistributedCache.getLocalCacheFiles(job);
-             }
-             catch (IOException e)
-             {
-                 throw new RuntimeException(e);
-             }
-             cassConfig = localFiles[0].getParent().toString();
- 
-             System.setProperty("storage-config",cassConfig);
- 
-             try
-             {
-                 StorageService.instance.initClient();
-             }
-             catch (IOException e)
-             {
-                 throw new RuntimeException(e);
-             }
-             try
-             {
-                 Thread.sleep(10*1000);
-             }
-             catch (InterruptedException e)
-             {
-                 throw new RuntimeException(e);
-             }
-         }
- 
-         public void close()
-         {
-             try
-             {
-                 // release the cache
-                 DistributedCache.releaseCache(new URI("/cassandra/storage-conf.xml#storage-conf.xml"),
this.jobconf);
-             }
-             catch (IOException e)
-             {
-                 throw new RuntimeException(e);
-             }
-             catch (URISyntaxException e)
-             {
-                 throw new RuntimeException(e);
-             }
-             try
-             {
-                 // Sleep just in case the number of keys we send over is small
-                 Thread.sleep(3*1000);
-             }
-             catch (InterruptedException e)
-             {
-                 throw new RuntimeException(e);
-             }
-             StorageService.instance.stopClient();
-         }
- 
-         public void reduce(Text key, Iterator<Text> values, OutputCollector<Text,
Text> output, Reporter reporter) throws IOException
-         {
-             ColumnFamily columnFamily;
-             String keyspace = "Keyspace1";
-             String cfName = "Super1";
-             Message message;
-             List<ColumnFamily> columnFamilies;
-             columnFamilies = new LinkedList<ColumnFamily>();
-             String line;
- 
-             /* Create a column family */
-             columnFamily = ColumnFamily.create(keyspace, cfName);
-             while (values.hasNext()) {
-                 // Split the value (line based on your own delimiter)
-                 line = values.next().toString();
-                 String[] fields = line.split("\1");
-                 String SuperColumnName = fields[1];
-                 String ColumnName = fields[2];
-                 String ColumnValue = fields[3];
-                 int timestamp = 0;
-                 columnFamily.addColumn(new QueryPath(cfName, SuperColumnName.getBytes("UTF-8"),
ColumnName.getBytes("UTF-8")), ColumnValue.getBytes(), new TimestampClock(timestamp));
-             }
- 
-             columnFamilies.add(columnFamily);
- 
-             /* Get serialized message to send to cluster */
-             message = createMessage(keyspace, key.getBytes(), cfName, columnFamilies);
-             List<IAsyncResult> results = new ArrayList<IAsyncResult>();
-             for (InetAddress endpoint: StorageService.instance.getNaturalEndpoints(keyspace,
key.getBytes()))
-             {
-                 /* Send message to end point */
-                 results.add(MessagingService.instance.sendRR(message, endpoint));
-             }
-             /* wait for acks */
-             for (IAsyncResult result : results)
-             {
-                 try
-                 {
-                     result.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
-                 }
-                 catch (TimeoutException e)
-                 {
-                     // you should probably add retry logic here
-                     throw new RuntimeException(e);
-                 }
-             }
-             
-             output.collect(key, new Text(" inserted into Cassandra node(s)"));
-         }
-     }
- 
-     public static void runJob(String[] args)
-     {
-         JobConf conf = new JobConf(CassandraBulkLoader.class);
- 
-         if(args.length >= 4)
-         {
-           conf.setNumReduceTasks(new Integer(args[3]));
-         }
- 
-         try
-         {
-             // We store the cassandra storage-conf.xml on the HDFS cluster
-             DistributedCache.addCacheFile(new URI("/cassandra/storage-conf.xml#storage-conf.xml"),
conf);
-         }
-         catch (URISyntaxException e)
-         {
-             throw new RuntimeException(e);
-         }
-         conf.setInputFormat(KeyValueTextInputFormat.class);
-         conf.setJobName("CassandraBulkLoader_v2");
-         conf.setMapperClass(Map.class);
-         conf.setReducerClass(Reduce.class);
- 
-         conf.setOutputKeyClass(Text.class);
-         conf.setOutputValueClass(Text.class);
- 
-         FileInputFormat.setInputPaths(conf, new Path(args[1]));
-         FileOutputFormat.setOutputPath(conf, new Path(args[2]));
-         try
-         {
-             JobClient.runJob(conf);
-         }
-         catch (IOException e)
-         {
-             throw new RuntimeException(e);
-         }
-     }
- 
-     public static Message createMessage(String Keyspace, byte[] Key, String CFName, List<ColumnFamily>
ColumnFamiles)
-     {
-         ColumnFamily baseColumnFamily;
-         DataOutputBuffer bufOut = new DataOutputBuffer();
-         RowMutation rm;
-         Message message;
-         Column column;
- 
-         /* Get the first column family from list, this is just to get past validation */
-         baseColumnFamily = new ColumnFamily(ColumnFamilyType.Standard,
-                                             ClockType.Timestamp,
-                                             DatabaseDescriptor.getComparator(Keyspace, CFName),
-                                             DatabaseDescriptor.getSubComparator(Keyspace,
CFName),
-                                             TimestampReconciler.instance,
-                                             CFMetaData.getId(Keyspace, CFName));
-         
-         for(ColumnFamily cf : ColumnFamiles) {
-             bufOut.reset();
-             ColumnFamily.serializer().serializeWithIndexes(cf, bufOut);
-             byte[] data = new byte[bufOut.getLength()];
-             System.arraycopy(bufOut.getData(), 0, data, 0, bufOut.getLength());
- 
-             column = new Column(FBUtilities.toByteArray(cf.id()), data, new TimestampClock(0));
-             baseColumnFamily.addColumn(column);
-         }
-         rm = new RowMutation(Keyspace, Key);
-         rm.add(baseColumnFamily);
- 
-         try
-         {
-             /* Make message */
-             message = rm.makeRowMutationMessage(StorageService.Verb.BINARY);
-         }
-         catch (IOException e)
-         {
-             throw new RuntimeException(e);
-         }
- 
-         return message;
-     }
-     public static void main(String[] args) throws Exception
-     {
-         runJob(args);
-     }
- }
- 
- #!/bin/bash
- #
- # /etc/init.d/cassandra
- #
- # Startup script for Cassandra
- # 
- # chkconfig: 2345 20 80
- # description: Starts and stops Cassandra
- 
- . /etc/rc.d/init.d/functions
- 
- export JAVA_HOME=/usr/lib/jvm/java-1.6.0-openjdk-1.6.0.0/
- export CASSANDRA_HOME=/usr/share/cassandra/
- export CASSANDRA_INCLUDE=/usr/share/cassandra/cassandra.in.sh
- export CASSANDRA_CONF=/etc/cassandra/conf
- export CASSANDRA_OWNR=cassandra
- log_file=/var/log/cassandra/cassandra.log
- pid_file=/var/run/cassandra/cassandra.pid
- CASSANDRA_PROG=/usr/sbin/cassandra
- 
- 
- case "$1" in
-     start)
-         # Cassandra startup
-         echo -n "Starting Cassandra: "
-         su $CASSANDRA_OWNR -c "$CASSANDRA_PROG -p $pid_file" > $log_file 2>&1
-         echo "OK"
-         ;;
-     stop)
-         # Cassandra shutdown
-         echo -n "Shutdown Cassandra: "
-         su $CASSANDRA_OWNR -c "kill `cat $pid_file`"
-         echo "OK"
-         ;;
-     reload|restart)
-         $0 stop
-         $0 start
-         ;;
-     status)
-         ;;
-     *)
-         echo "Usage: `basename $0` start|stop|restart|reload"
-         exit 1
- esac
- 
- exit 0
- 

Mime
View raw message