cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sn...@apache.org
Subject [1/3] cassandra git commit: Support counter-columns for native aggregates (sum, avg, max, min)
Date Thu, 24 Dec 2015 13:06:50 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk c9ef25fd8 -> 53e370f62


Support counter-columns for native aggregates (sum,avg,max,min)

patch by Robert Stupp; reviewed by Benjamin Lerer for CASSANDRA-9977


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8287ebcb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8287ebcb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8287ebcb

Branch: refs/heads/trunk
Commit: 8287ebcb6ad46529ca90600dc0c2f98ecab89cf0
Parents: ee36f14
Author: Robert Stupp <snazy@snazy.de>
Authored: Thu Dec 24 14:05:16 2015 +0100
Committer: Robert Stupp <snazy@snazy.de>
Committed: Thu Dec 24 14:05:16 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/cql3/functions/AggregateFcts.java | 141 +++++++++++--------
 .../cassandra/cql3/functions/Functions.java     |   2 +
 .../cql3/validation/entities/UFTest.java        |  26 ++++
 .../validation/operations/AggregationTest.java  |  42 ++++++
 5 files changed, 156 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8287ebcb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fb0b151..c0fd4f6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.5
+ * Support counter-columns for native aggregates (sum,avg,max,min) (CASSANDRA-9977)
  * (cqlsh) show correct column names for empty result sets (CASSANDRA-9813)
  * Add new types to Stress (CASSANDRA-9556)
  * Add property to allow listening on broadcast interface (CASSANDRA-9748)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8287ebcb/src/java/org/apache/cassandra/cql3/functions/AggregateFcts.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/AggregateFcts.java b/src/java/org/apache/cassandra/cql3/functions/AggregateFcts.java
index 41e43c0..77be525 100644
--- a/src/java/org/apache/cassandra/cql3/functions/AggregateFcts.java
+++ b/src/java/org/apache/cassandra/cql3/functions/AggregateFcts.java
@@ -24,6 +24,7 @@ import java.util.List;
 
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.ByteType;
+import org.apache.cassandra.db.marshal.CounterColumnType;
 import org.apache.cassandra.db.marshal.DecimalType;
 import org.apache.cassandra.db.marshal.DoubleType;
 import org.apache.cassandra.db.marshal.FloatType;
@@ -480,31 +481,7 @@ public abstract class AggregateFcts
             {
                 public Aggregate newAggregate()
                 {
-                    return new Aggregate()
-                    {
-                        private long sum;
-
-                        public void reset()
-                        {
-                            sum = 0;
-                        }
-
-                        public ByteBuffer compute(int protocolVersion)
-                        {
-                            return ((LongType) returnType()).decompose(sum);
-                        }
-
-                        public void addInput(int protocolVersion, List<ByteBuffer> values)
-                        {
-                            ByteBuffer value = values.get(0);
-
-                            if (value == null)
-                                return;
-
-                            Number number = ((Number) argTypes().get(0).compose(value));
-                            sum += number.longValue();
-                        }
-                    };
+                    return new LongSumAggregate();
                 }
             };
 
@@ -516,37 +493,7 @@ public abstract class AggregateFcts
             {
                 public Aggregate newAggregate()
                 {
-                    return new Aggregate()
-                    {
-                        private long sum;
-
-                        private int count;
-
-                        public void reset()
-                        {
-                            count = 0;
-                            sum = 0;
-                        }
-
-                        public ByteBuffer compute(int protocolVersion)
-                        {
-                            long avg = count == 0 ? 0 : sum / count;
-
-                            return ((LongType) returnType()).decompose(avg);
-                        }
-
-                        public void addInput(int protocolVersion, List<ByteBuffer> values)
-                        {
-                            ByteBuffer value = values.get(0);
-
-                            if (value == null)
-                                return;
-
-                            count++;
-                            Number number = ((Number) argTypes().get(0).compose(value));
-                            sum += number.longValue();
-                        }
-                    };
+                    return new LongAvgAggregate();
                 }
             };
 
@@ -707,6 +654,30 @@ public abstract class AggregateFcts
             };
 
     /**
+     * The SUM function for counter column values.
+     */
+    public static final AggregateFunction sumFunctionForCounter =
+            new NativeAggregateFunction("sum", CounterColumnType.instance, CounterColumnType.instance)
+            {
+                public Aggregate newAggregate()
+                {
+                    return new LongSumAggregate();
+                }
+            };
+
+    /**
+     * AVG function for counter column values.
+     */
+    public static final AggregateFunction avgFunctionForCounter =
+            new NativeAggregateFunction("avg", CounterColumnType.instance, CounterColumnType.instance)
+            {
+                public Aggregate newAggregate()
+                {
+                    return new LongAvgAggregate();
+                }
+            };
+
+    /**
      * Creates a MAX function for the specified type.
      *
      * @param inputType the function input and output type
@@ -827,4 +798,62 @@ public abstract class AggregateFcts
             }
         };
     }
+
+    private static class LongSumAggregate implements AggregateFunction.Aggregate
+    {
+        private long sum;
+
+        public void reset()
+        {
+            sum = 0;
+        }
+
+        public ByteBuffer compute(int protocolVersion)
+        {
+            return LongType.instance.decompose(sum);
+        }
+
+        public void addInput(int protocolVersion, List<ByteBuffer> values)
+        {
+            ByteBuffer value = values.get(0);
+
+            if (value == null)
+                return;
+
+            Number number = LongType.instance.compose(value);
+            sum += number.longValue();
+        }
+    }
+
+    private static class LongAvgAggregate implements AggregateFunction.Aggregate
+    {
+        private long sum;
+
+        private int count;
+
+        public void reset()
+        {
+            count = 0;
+            sum = 0;
+        }
+
+        public ByteBuffer compute(int protocolVersion)
+        {
+            long avg = count == 0 ? 0 : sum / count;
+
+            return LongType.instance.decompose(avg);
+        }
+
+        public void addInput(int protocolVersion, List<ByteBuffer> values)
+        {
+            ByteBuffer value = values.get(0);
+
+            if (value == null)
+                return;
+
+            count++;
+            Number number = LongType.instance.compose(value);
+            sum += number.longValue();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8287ebcb/src/java/org/apache/cassandra/cql3/functions/Functions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/Functions.java b/src/java/org/apache/cassandra/cql3/functions/Functions.java
index e31fc9f..0f1af19 100644
--- a/src/java/org/apache/cassandra/cql3/functions/Functions.java
+++ b/src/java/org/apache/cassandra/cql3/functions/Functions.java
@@ -91,6 +91,7 @@ public abstract class Functions
         declare(AggregateFcts.sumFunctionForDouble);
         declare(AggregateFcts.sumFunctionForDecimal);
         declare(AggregateFcts.sumFunctionForVarint);
+        declare(AggregateFcts.sumFunctionForCounter);
         declare(AggregateFcts.avgFunctionForByte);
         declare(AggregateFcts.avgFunctionForShort);
         declare(AggregateFcts.avgFunctionForInt32);
@@ -99,6 +100,7 @@ public abstract class Functions
         declare(AggregateFcts.avgFunctionForDouble);
         declare(AggregateFcts.avgFunctionForVarint);
         declare(AggregateFcts.avgFunctionForDecimal);
+        declare(AggregateFcts.avgFunctionForCounter);
 
         MigrationManager.instance.register(new FunctionsMigrationListener());
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8287ebcb/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java
index 0d11a82..bcfe871 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java
@@ -716,6 +716,32 @@ public class UFTest extends CQLTester
     }
 
     @Test
+    public void testJavaFunctionCounter() throws Throwable
+    {
+        createTable("CREATE TABLE %s (key int primary key, val counter)");
+
+        String fName = createFunction(KEYSPACE, "counter",
+                                      "CREATE OR REPLACE FUNCTION %s(val counter) " +
+                                      "CALLED ON NULL INPUT " +
+                                      "RETURNS bigint " +
+                                      "LANGUAGE JAVA " +
+                                      "AS 'return val + 1;';");
+
+        execute("UPDATE %s SET val = val + 1 WHERE key = 1");
+        assertRows(execute("SELECT key, val, " + fName + "(val) FROM %s"),
+                   row(1, 1L, 2L));
+        execute("UPDATE %s SET val = val + 1 WHERE key = 1");
+        assertRows(execute("SELECT key, val, " + fName + "(val) FROM %s"),
+                   row(1, 2L, 3L));
+        execute("UPDATE %s SET val = val + 2 WHERE key = 1");
+        assertRows(execute("SELECT key, val, " + fName + "(val) FROM %s"),
+                   row(1, 4L, 5L));
+        execute("UPDATE %s SET val = val - 2 WHERE key = 1");
+        assertRows(execute("SELECT key, val, " + fName + "(val) FROM %s"),
+                   row(1, 2L, 3L));
+    }
+
+    @Test
     public void testFunctionInTargetKeyspace() throws Throwable
     {
         createTable("CREATE TABLE %s (key int primary key, val double)");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8287ebcb/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java
index e661b4f..0e0313c 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java
@@ -37,6 +37,7 @@ import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.exceptions.FunctionExecutionException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.transport.Event;
 import org.apache.cassandra.transport.messages.ResultMessage;
 
@@ -172,6 +173,47 @@ public class AggregationTest extends CQLTester
     }
 
     @Test
+    public void testAggregateOnCounters() throws Throwable
+    {
+        createTable("CREATE TABLE %s (a int, b counter, primary key (a))");
+
+        // Test with empty table
+        assertColumnNames(execute("SELECT count(b), max(b) as max, b FROM %s"),
+                          "system.count(b)", "max", "b");
+        assertRows(execute("SELECT count(b), max(b) as max, b FROM %s"),
+                   row(0L, null, null));
+
+        execute("UPDATE %s SET b = b + 1 WHERE a = 1");
+        execute("UPDATE %s SET b = b + 1 WHERE a = 1");
+
+        assertRows(execute("SELECT count(b), max(b) as max, min(b) as min, avg(b) as avg, sum(b) as sum FROM %s"),
+                   row(1L, 2L, 2L, 2L, 2L));
+        flush();
+        assertRows(execute("SELECT count(b), max(b) as max, min(b) as min, avg(b) as avg, sum(b) as sum FROM %s"),
+                   row(1L, 2L, 2L, 2L, 2L));
+
+        execute("UPDATE %s SET b = b + 2 WHERE a = 1");
+
+        assertRows(execute("SELECT count(b), max(b) as max, min(b) as min, avg(b) as avg, sum(b) as sum FROM %s"),
+                   row(1L, 4L, 4L, 4L, 4L));
+
+        execute("UPDATE %s SET b = b - 2 WHERE a = 1");
+
+        assertRows(execute("SELECT count(b), max(b) as max, min(b) as min, avg(b) as avg, sum(b) as sum FROM %s"),
+                   row(1L, 2L, 2L, 2L, 2L));
+        flush();
+        assertRows(execute("SELECT count(b), max(b) as max, min(b) as min, avg(b) as avg, sum(b) as sum FROM %s"),
+                   row(1L, 2L, 2L, 2L, 2L));
+
+        execute("UPDATE %s SET b = b + 1 WHERE a = 2");
+        execute("UPDATE %s SET b = b + 1 WHERE a = 2");
+        execute("UPDATE %s SET b = b + 2 WHERE a = 2");
+
+        assertRows(execute("SELECT count(b), max(b) as max, min(b) as min, avg(b) as avg, sum(b) as sum FROM %s"),
+                   row(2L, 4L, 2L, 3L, 6L));
+    }
+
+    @Test
     public void testAggregateWithUdtFields() throws Throwable
     {
         String myType = createType("CREATE TYPE %s (x int)");


Mime
View raw message