kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jkr...@apache.org
Subject [01/13] Rename client package from kafka.* to org.apache.kafka.*
Date Fri, 07 Feb 2014 00:26:32 GMT
Updated Branches:
  refs/heads/trunk 3220af1fe -> fa6339c19


http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java b/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java
new file mode 100644
index 0000000..64d172d
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java
@@ -0,0 +1,144 @@
+package org.apache.kafka.test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.kafka.common.utils.CopyOnWriteMap;
+import org.apache.kafka.common.utils.SystemTime;
+
+
+public class Microbenchmarks {
+
+    /**
+     * Runs a series of rough timing loops and prints, for each one, the average cost per
+     * iteration in nanoseconds. Each loop also prints the value it accumulated.
+     *
+     * @param args the first element is the number of iterations for each timing loop; it must
+     *        parse as a positive integer
+     * @throws IllegalArgumentException if the iteration count is not positive
+     * @throws Exception if waiting for one of the benchmark threads is interrupted
+     */
+    public static void main(String[] args) throws Exception {
+        if (args.length < 1) {
+            System.err.println("USAGE: java " + Microbenchmarks.class.getName() + " num_iterations");
+            System.exit(1);
+        }
+        final int iters = Integer.parseInt(args[0]);
+        if (iters <= 0)
+            throw new IllegalArgumentException("num_iterations must be positive: " + iters);
+
+        // Every section reads the clock BEFORE printing its accumulated value, so that console
+        // output is not counted as part of the measured work.
+        double x = 0.0;
+        long start = System.nanoTime();
+        for (int i = 0; i < iters; i++)
+            x += Math.sqrt(x);
+        long elapsed = System.nanoTime() - start;
+        System.out.println(x);
+        System.out.println("sqrt: " + elapsed / (double) iters);
+
+        // test clocks
+        // one untimed pass over both clock loops before the timed passes
+        systemMillis(iters);
+        systemNanos(iters);
+        long total = 0;
+        start = System.nanoTime();
+        total += systemMillis(iters);
+        elapsed = System.nanoTime() - start;
+        System.out.println("System.currentTimeMillis(): " + elapsed / (double) iters);
+        start = System.nanoTime();
+        total += systemNanos(iters);
+        elapsed = System.nanoTime() - start;
+        System.out.println("System.nanoTime(): " + elapsed / (double) iters);
+        System.out.println(total);
+
+        // test random
+        int n = 0;
+        Random random = new Random();
+        start = System.nanoTime();
+        for (int i = 0; i < iters; i++) {
+            n += random.nextInt();
+        }
+        elapsed = System.nanoTime() - start;
+        System.out.println(n);
+        System.out.println("random: " + elapsed / (double) iters);
+
+        // test binary search over a sorted array of 1024 random floats
+        float[] floats = new float[1024];
+        for (int i = 0; i < floats.length; i++)
+            floats[i] = random.nextFloat();
+        Arrays.sort(floats);
+
+        int loc = 0;
+        start = System.nanoTime();
+        for (int i = 0; i < iters; i++)
+            loc += Arrays.binarySearch(floats, floats[i % floats.length]);
+        elapsed = System.nanoTime() - start;
+        System.out.println(loc);
+        System.out.println("binary search: " + elapsed / (double) iters);
+
+        // test synchronized: t1 times a loop that takes the lock on every iteration while t2
+        // keeps taking the same lock between sleeps until t1 signals that it is done
+        final SystemTime time = new SystemTime();
+        final AtomicBoolean done = new AtomicBoolean(false);
+        final Object lock = new Object();
+        Thread t1 = new Thread() {
+            @Override
+            public void run() {
+                time.sleep(1);
+                int counter = 0;
+                // use the same clock for both ends of the measurement
+                long lockStart = System.nanoTime();
+                for (int i = 0; i < iters; i++) {
+                    synchronized (lock) {
+                        counter++;
+                    }
+                }
+                long lockElapsed = System.nanoTime() - lockStart;
+                System.out.println("synchronized: " + lockElapsed / (double) iters);
+                System.out.println(counter);
+                done.set(true);
+            }
+        };
+
+        Thread t2 = new Thread() {
+            @Override
+            public void run() {
+                int counter = 0;
+                while (!done.get()) {
+                    time.sleep(1);
+                    synchronized (lock) {
+                        counter += 1;
+                    }
+                }
+                System.out.println("Counter: " + counter);
+            }
+        };
+
+        t1.start();
+        t2.start();
+        t1.join();
+        t2.join();
+
+        // test map reads: the same 100 entries in three different map implementations
+        Map<String, Integer> values = new HashMap<String, Integer>();
+        for (int i = 0; i < 100; i++)
+            values.put(Integer.toString(i), i);
+        System.out.println("HashMap:");
+        benchMap(2, 1000000, values);
+        System.out.println("ConcurrentHashMap:");
+        benchMap(2, 1000000, new ConcurrentHashMap<String, Integer>(values));
+        System.out.println("CopyOnWriteMap:");
+        benchMap(2, 1000000, new CopyOnWriteMap<String, Integer>(values));
+    }
+
+    /**
+     * Times repeated {@code get} calls on the given map from several threads at once. Each thread
+     * performs {@code iters} lookups, cycling through all of the map's keys, and prints its own
+     * average time per lookup in nanoseconds.
+     *
+     * @param numThreads the number of threads to start
+     * @param iters the number of lookups each thread performs
+     * @param map the map to read from; must contain at least one key
+     * @throws IllegalArgumentException if the map has no keys to look up
+     * @throws Exception if waiting for a thread to finish is interrupted
+     */
+    private static void benchMap(int numThreads, final int iters, final Map<String, Integer> map)
+            throws Exception {
+        final List<String> keys = new ArrayList<String>(map.keySet());
+        if (keys.isEmpty())
+            throw new IllegalArgumentException("Cannot benchmark lookups on an empty map");
+        final int numKeys = keys.size();
+        final List<Thread> threads = new ArrayList<Thread>();
+        for (int i = 0; i < numThreads; i++) {
+            threads.add(new Thread() {
+                @Override
+                public void run() {
+                    long start = System.nanoTime();
+                    // wrap around the key count so every key is visited, not just the first
+                    // numThreads of them
+                    for (int j = 0; j < iters; j++)
+                        map.get(keys.get(j % numKeys));
+                    long elapsed = System.nanoTime() - start;
+                    System.out.println("Map access time: " + elapsed / (double) iters);
+                }
+            });
+        }
+        // start all threads before joining any, so the lookups overlap
+        for (Thread thread : threads)
+            thread.start();
+        for (Thread thread : threads)
+            thread.join();
+    }
+
+    /**
+     * Reads {@link System#currentTimeMillis()} {@code iters} times in a row.
+     *
+     * @param iters how many times to read the clock; nothing is read when it is zero or negative
+     * @return the sum of every value read, so the calls have an observable result
+     */
+    private static long systemMillis(int iters) {
+        long sum = 0;
+        int remaining = iters;
+        while (remaining > 0) {
+            sum += System.currentTimeMillis();
+            remaining--;
+        }
+        return sum;
+    }
+
+    /**
+     * Reads {@link System#nanoTime()} {@code iters} times in a row.
+     *
+     * @param iters how many times to read the clock; nothing is read when it is zero or negative
+     * @return the sum of every value read, so the calls have an observable result
+     */
+    private static long systemNanos(int iters) {
+        long total = 0;
+        for (int i = 0; i < iters; i++)
+            total += System.nanoTime();
+        return total;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/test/java/org/apache/kafka/test/MockSelector.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/MockSelector.java b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
new file mode 100644
index 0000000..0c69c5f
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
@@ -0,0 +1,88 @@
+package org.apache.kafka.test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.kafka.common.network.NetworkReceive;
+import org.apache.kafka.common.network.NetworkSend;
+import org.apache.kafka.common.network.Selectable;
+import org.apache.kafka.common.utils.Time;
+
+
+/**
+ * A fake selector to use for testing
+ */
+public class MockSelector implements Selectable {
+
+    private final Time time;
+    private final List<NetworkSend> completedSends = new ArrayList<NetworkSend>();
+    private final List<NetworkReceive> completedReceives = new ArrayList<NetworkReceive>();
+    private final List<Integer> disconnected = new ArrayList<Integer>();
+    private final List<Integer> connected = new ArrayList<Integer>();
+
+    public MockSelector(Time time) {
+        this.time = time;
+    }
+
+    @Override
+    public void connect(int id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize)
throws IOException {
+        this.connected.add(id);
+    }
+
+    @Override
+    public void disconnect(int id) {
+        this.disconnected.add(id);
+    }
+
+    @Override
+    public void wakeup() {
+    }
+
+    @Override
+    public void close() {
+    }
+
+    public void clear() {
+        this.completedSends.clear();
+        this.completedReceives.clear();
+        this.disconnected.clear();
+        this.connected.clear();
+    }
+
+    @Override
+    public void poll(long timeout, List<NetworkSend> sends) throws IOException {
+        this.completedSends.addAll(sends);
+        time.sleep(timeout);
+    }
+
+    @Override
+    public List<NetworkSend> completedSends() {
+        return completedSends;
+    }
+
+    public void completeSend(NetworkSend send) {
+        this.completedSends.add(send);
+    }
+
+    @Override
+    public List<NetworkReceive> completedReceives() {
+        return completedReceives;
+    }
+
+    public void completeReceive(NetworkReceive receive) {
+        this.completedReceives.add(receive);
+    }
+
+    @Override
+    public List<Integer> disconnected() {
+        return disconnected;
+    }
+
+    @Override
+    public List<Integer> connected() {
+        return connected;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/test/java/org/apache/kafka/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
new file mode 100644
index 0000000..541bc59
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -0,0 +1,96 @@
+package org.apache.kafka.test;
+
+import static java.util.Arrays.asList;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+
+
+/**
+ * Helper functions for writing unit tests
+ */
+public class TestUtils {
+
+    /* The directory named by the java.io.tmpdir system property */
+    public static final File IO_TMP_DIR = new File(System.getProperty("java.io.tmpdir"));
+
+    public static final String LETTERS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
+    public static final String DIGITS = "0123456789";
+    public static final String LETTERS_AND_DIGITS = LETTERS + DIGITS;
+
+    /* A consistent random number generator to make tests repeatable */
+    public static final Random seededRandom = new Random(192348092834L);
+    /* A generator with no fixed seed */
+    public static final Random random = new Random();
+
+    /**
+     * Create a cluster with a single node and the given number of partitions for one topic
+     *
+     * @param topic The topic name
+     * @param partitions The number of partitions to create for the topic
+     * @return The cluster
+     */
+    public static Cluster singletonCluster(String topic, int partitions) {
+        return clusterWith(1, topic, partitions);
+    }
+
+    /**
+     * Create a cluster with the given number of nodes and the given number of partitions for one
+     * topic. Node i is created as {@code new Node(i, "localhost", 1969)}. Partition i names node
+     * {@code i % nodes} as its third {@link PartitionInfo} argument and receives the full node
+     * array for both remaining arguments.
+     *
+     * @param nodes The number of nodes; must be at least 1 if any partitions are requested
+     * @param topic The topic name
+     * @param partitions The number of partitions to create for the topic
+     * @return The cluster
+     */
+    public static Cluster clusterWith(int nodes, String topic, int partitions) {
+        Node[] ns = new Node[nodes];
+        // give each node its own id rather than creating identical copies of node 0
+        for (int i = 0; i < nodes; i++)
+            ns[i] = new Node(i, "localhost", 1969);
+        List<PartitionInfo> parts = new ArrayList<PartitionInfo>();
+        for (int i = 0; i < partitions; i++)
+            parts.add(new PartitionInfo(topic, i, ns[i % ns.length], ns, ns));
+        return new Cluster(asList(ns), parts);
+    }
+
+    /**
+     * Choose a number of random available ports. All sockets are held open until every port has
+     * been picked, so the returned ports are distinct; they are released again before this
+     * method returns.
+     *
+     * @param count The number of ports to choose
+     * @return The chosen port numbers
+     * @throws RuntimeException if a socket cannot be opened or closed
+     */
+    public static int[] choosePorts(int count) {
+        ServerSocket[] sockets = new ServerSocket[count];
+        try {
+            int[] ports = new int[count];
+            for (int i = 0; i < count; i++) {
+                sockets[i] = new ServerSocket(0);
+                ports[i] = sockets[i].getLocalPort();
+            }
+            for (int i = 0; i < count; i++)
+                sockets[i].close();
+            return ports;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            // Only does work on a failure path: release any socket that is still open so a
+            // failed open or close does not leak the others.
+            for (ServerSocket socket : sockets) {
+                if (socket != null && !socket.isClosed()) {
+                    try {
+                        socket.close();
+                    } catch (IOException ignored) {
+                        // best-effort cleanup; the original failure is already propagating
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Choose an available port
+     *
+     * @return The chosen port number
+     */
+    public static int choosePort() {
+        return choosePorts(1)[0];
+    }
+
+    /**
+     * Generate an array of random bytes, drawn from the seeded generator
+     *
+     * @param size The size of the array
+     * @return The random bytes
+     */
+    public static byte[] randomBytes(int size) {
+        byte[] bytes = new byte[size];
+        seededRandom.nextBytes(bytes);
+        return bytes;
+    }
+
+    /**
+     * Generate a random string of letters and digits of the given length, drawn from the seeded
+     * generator
+     *
+     * @param len The length of the string
+     * @return The random string
+     */
+    public static String randomString(int len) {
+        final int alphabetSize = LETTERS_AND_DIGITS.length();
+        StringBuilder b = new StringBuilder();
+        for (int i = 0; i < len; i++)
+            b.append(LETTERS_AND_DIGITS.charAt(seededRandom.nextInt(alphabetSize)));
+        return b.toString();
+    }
+
+}


Mime
View raw message