calcite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jh...@apache.org
Subject [1/2] calcite git commit: [CALCITE-968] Stream-to-relation and stream-to-stream joins (Milinda Pathirage)
Date Tue, 08 Dec 2015 00:38:40 GMT
Repository: calcite
Updated Branches:
  refs/heads/master 9c86556ff -> 937fc461a


[CALCITE-968] Stream-to-relation and stream-to-stream joins (Milinda Pathirage)

Rule to transform Delta(Scan(constant-table)) to Empty;
fix NullPointerException in PruneEmptyRules.


Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/e9d50602
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/e9d50602
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/e9d50602

Branch: refs/heads/master
Commit: e9d506021252e1da6c09cebad3f747cd0e627d90
Parents: 9c86556
Author: Milinda Pathirage <milinda.pathirage@gmail.com>
Authored: Fri Nov 20 19:03:21 2015 -0500
Committer: Julian Hyde <jhyde@apache.org>
Committed: Mon Dec 7 14:22:02 2015 -0800

----------------------------------------------------------------------
 .../calcite/rel/rules/PruneEmptyRules.java      |   3 +-
 .../apache/calcite/rel/stream/StreamRules.java  |  75 ++++++++++++-
 .../apache/calcite/runtime/CalciteResource.java |   3 +
 .../calcite/sql/validate/SqlValidatorImpl.java  |  45 ++++++--
 .../calcite/runtime/CalciteResource.properties  |   1 +
 .../apache/calcite/sql/test/SqlAdvisorTest.java |   4 +-
 .../apache/calcite/test/MockCatalogReader.java  |  16 +++
 .../apache/calcite/test/SqlValidatorTest.java   |  15 +++
 .../org/apache/calcite/test/StreamTest.java     | 108 +++++++++++++++++++
 9 files changed, 260 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/calcite/blob/e9d50602/core/src/main/java/org/apache/calcite/rel/rules/PruneEmptyRules.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/rules/PruneEmptyRules.java b/core/src/main/java/org/apache/calcite/rel/rules/PruneEmptyRules.java
index 2f4cada..3fccdfa 100644
--- a/core/src/main/java/org/apache/calcite/rel/rules/PruneEmptyRules.java
+++ b/core/src/main/java/org/apache/calcite/rel/rules/PruneEmptyRules.java
@@ -72,7 +72,8 @@ public abstract class PruneEmptyRules {
         public void onMatch(RelOptRuleCall call) {
           LogicalUnion union = call.rel(0);
           final List<RelNode> childRels = call.getChildRels(union);
-          final List<RelNode> newChildRels = new ArrayList<RelNode>();
+          assert childRels != null;
+          final List<RelNode> newChildRels = new ArrayList<>();
           for (RelNode childRel : childRels) {
             if (!isEmpty(childRel)) {
               newChildRels.add(childRel);

http://git-wip-us.apache.org/repos/asf/calcite/blob/e9d50602/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java b/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java
index 28b9972..4e64dc5 100644
--- a/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java
+++ b/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java
@@ -24,16 +24,19 @@ import org.apache.calcite.prepare.RelOptTableImpl;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.core.Sort;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.core.Union;
 import org.apache.calcite.rel.logical.LogicalAggregate;
 import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.calcite.rel.logical.LogicalProject;
 import org.apache.calcite.rel.logical.LogicalSort;
 import org.apache.calcite.rel.logical.LogicalTableScan;
 import org.apache.calcite.rel.logical.LogicalUnion;
+import org.apache.calcite.rel.logical.LogicalValues;
 import org.apache.calcite.schema.StreamableTable;
 import org.apache.calcite.schema.Table;
 import org.apache.calcite.util.Util;
@@ -56,7 +59,9 @@ public class StreamRules {
           new DeltaAggregateTransposeRule(),
           new DeltaSortTransposeRule(),
           new DeltaUnionTransposeRule(),
-          new DeltaTableScanRule());
+          new DeltaJoinTransposeRule(),
+          new DeltaTableScanRule(),
+          new DeltaTableScanToEmptyRule());
 
   /** Planner rule that pushes a {@link Delta} through a {@link Project}. */
   public static class DeltaProjectTransposeRule extends RelOptRule {
@@ -193,6 +198,74 @@ public class StreamRules {
       }
     }
   }
+
+  /**
+   * Planner rule that converts a {@link Delta} over a {@link TableScan} of a table
+   * that does not unwrap to {@link StreamableTable} into an empty {@link LogicalValues}.
+   */
+  public static class DeltaTableScanToEmptyRule extends RelOptRule {
+    private DeltaTableScanToEmptyRule() {
+      super(
+          operand(Delta.class,
+              operand(TableScan.class, none())));
+    }
+
+    @Override public void onMatch(RelOptRuleCall call) {
+      final Delta delta = call.rel(0);
+      final TableScan scan = call.rel(1);
+      final RelOptCluster cluster = delta.getCluster();
+      final RelOptTable relOptTable = scan.getTable();
+      final StreamableTable streamableTable =
+          relOptTable.unwrap(StreamableTable.class);
+      if (streamableTable == null) {
+        call.transformTo(LogicalValues.createEmpty(cluster, delta.getRowType()));
+      }
+    }
+  }
+
+
+  /**
+   * Planner rule that pushes a {@link Delta} through a {@link Join}.
+   *
+   * <p>The product rule [1] is applied to implement the transpose:
+   * {@code stream(x join y) = x join stream(y) union all stream(x) join y}.
+   *
+   * <p>[1] https://en.wikipedia.org/wiki/Product_rule
+   */
+  public static class DeltaJoinTransposeRule extends RelOptRule {
+
+    public DeltaJoinTransposeRule() {
+      super(
+          operand(Delta.class,
+              operand(Join.class, any())));
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+      final Delta delta = call.rel(0);
+      final Join join = call.rel(1);
+      final RelOptCluster cluster = delta.getCluster();
+      RelNode left = join.getLeft();
+      RelNode right = join.getRight();
+
+      final LogicalDelta rightWithDelta = LogicalDelta.create(right);
+      final LogicalJoin joinL = LogicalJoin.create(left, rightWithDelta, join.getCondition(),
+          join.getJoinType(), join.getVariablesStopped(), join.isSemiJoinDone(),
+          ImmutableList.copyOf(join.getSystemFieldList()));
+
+      final LogicalDelta leftWithDelta = LogicalDelta.create(left);
+      final LogicalJoin joinR = LogicalJoin.create(leftWithDelta, right, join.getCondition(),
+          join.getJoinType(), join.getVariablesStopped(), join.isSemiJoinDone(),
+          ImmutableList.copyOf(join.getSystemFieldList()));
+
+      List<RelNode> inputsToUnion = Lists.newArrayList();
+      inputsToUnion.add(joinL);
+      inputsToUnion.add(joinR);
+
+      final LogicalUnion newNode = LogicalUnion.create(inputsToUnion, true);
+      call.transformTo(newNode);
+    }
+  }
 }
 
 // End StreamRules.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/e9d50602/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java b/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
index aa701d9..c06f3c3 100644
--- a/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
+++ b/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
@@ -603,6 +603,9 @@ public interface CalciteResource {
 
   @BaseMessage("Table ''{0}'' not found")
   ExInst<CalciteException> tableNotFound(String tableName);
+
+  @BaseMessage("Cannot stream results of a query with no streaming inputs: ''{0}''. At least one input should be convertable to a stream.")
+  ExInst<SqlValidatorException> cannotStreamResultsForNonStreamingInputs(String inputs);
 }
 
 // End CalciteResource.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/e9d50602/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java b/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
index 0390c07..d430575 100644
--- a/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
+++ b/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
@@ -82,6 +82,7 @@ import org.apache.calcite.util.trace.CalciteTrace;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
@@ -3016,17 +3017,47 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
   public boolean validateModality(SqlSelect select, SqlModality modality,
       boolean fail) {
     final SelectScope scope = getRawSelectScope(select);
-    for (Pair<String, SqlValidatorNamespace> namespace : scope.children) {
-      if (!namespace.right.supportsModality(modality)) {
-        switch (modality) {
-        case STREAM:
+
+    switch (modality) {
+    case STREAM:
+      if (scope.children.size() == 1) {
+        for (Pair<String, SqlValidatorNamespace> namespace : scope.children) {
+          if (!namespace.right.supportsModality(modality)) {
+            if (fail) {
+              throw newValidationError(namespace.right.getNode(),
+                  Static.RESOURCE.cannotConvertToStream(namespace.left));
+            } else {
+              return false;
+            }
+          }
+        }
+      } else {
+        boolean atLeastOneSupportsModality = false;
+        for (Pair<String, SqlValidatorNamespace> namespace : scope.children) {
+          if (namespace.right.supportsModality(modality)) {
+            atLeastOneSupportsModality = true;
+          }
+        }
+
+        if (!atLeastOneSupportsModality) {
           if (fail) {
-            throw newValidationError(namespace.right.getNode(),
-                Static.RESOURCE.cannotConvertToStream(namespace.left));
+            List<String> inputList = new ArrayList<String>();
+            for (Pair<String, SqlValidatorNamespace> namespace : scope.children) {
+              inputList.add(namespace.left);
+            }
+            String inputs = Joiner.on(", ").join(inputList);
+
+            throw newValidationError(select,
+                Static.RESOURCE.cannotStreamResultsForNonStreamingInputs(inputs));
           } else {
             return false;
           }
-        default:
+        }
+      }
+      break;
+    default:
+      for (Pair<String, SqlValidatorNamespace> namespace : scope.children) {
+        if (!namespace.right.supportsModality(modality)) {
           if (fail) {
             throw newValidationError(namespace.right.getNode(),
                 Static.RESOURCE.cannotConvertToRelation(namespace.left));

http://git-wip-us.apache.org/repos/asf/calcite/blob/e9d50602/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
----------------------------------------------------------------------
diff --git a/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties b/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
index 9787ba9..2eb4947 100644
--- a/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
+++ b/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
@@ -197,4 +197,5 @@ ModifiableViewMustBeBasedOnSingleTable=Modifiable view must be based on a single
 MoreThanOneMappedColumn=View is not modifiable. More than one expression maps to column ''{0}'' of base table ''{1}''
 NoValueSuppliedForViewColumn=View is not modifiable. No value is supplied for NOT NULL column ''{0}'' of base table ''{1}''
 TableNotFound=Table ''{0}'' not found
+CannotStreamResultsForNonStreamingInputs=Cannot stream results of a query with no streaming inputs: ''{0}''. At least one input should be convertable to a stream.
 # End CalciteResource.properties

http://git-wip-us.apache.org/repos/asf/calcite/blob/e9d50602/core/src/test/java/org/apache/calcite/sql/test/SqlAdvisorTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/sql/test/SqlAdvisorTest.java b/core/src/test/java/org/apache/calcite/sql/test/SqlAdvisorTest.java
index 1a71ec4..50019e1 100644
--- a/core/src/test/java/org/apache/calcite/sql/test/SqlAdvisorTest.java
+++ b/core/src/test/java/org/apache/calcite/sql/test/SqlAdvisorTest.java
@@ -72,7 +72,9 @@ public class SqlAdvisorTest extends SqlValidatorTestCase {
           "TABLE(CATALOG.SALES.BONUS)",
           "TABLE(CATALOG.SALES.ORDERS)",
           "TABLE(CATALOG.SALES.SALGRADE)",
-          "TABLE(CATALOG.SALES.SHIPMENTS)");
+          "TABLE(CATALOG.SALES.SHIPMENTS)",
+          "TABLE(CATALOG.SALES.PRODUCTS)",
+          "TABLE(CATALOG.SALES.SUPPLIERS)");
 
   private static final List<String> SCHEMAS =
       Arrays.asList(

http://git-wip-us.apache.org/repos/asf/calcite/blob/e9d50602/core/src/test/java/org/apache/calcite/test/MockCatalogReader.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/MockCatalogReader.java b/core/src/test/java/org/apache/calcite/test/MockCatalogReader.java
index 71b3115..b76b908 100644
--- a/core/src/test/java/org/apache/calcite/test/MockCatalogReader.java
+++ b/core/src/test/java/org/apache/calcite/test/MockCatalogReader.java
@@ -250,6 +250,22 @@ public class MockCatalogReader implements Prepare.CatalogReader {
     shipmentsStream.addColumn("ORDERID", intType);
     registerTable(shipmentsStream);
 
+    // Register "PRODUCTS" table.
+    MockTable productsTable = MockTable.create(this, salesSchema, "PRODUCTS",
+        false);
+    productsTable.addColumn("PRODUCTID", intType);
+    productsTable.addColumn("NAME", varchar20Type);
+    productsTable.addColumn("SUPPLIERID", intType);
+    registerTable(productsTable);
+
+    // Register "SUPPLIERS" table.
+    MockTable suppliersTable = MockTable.create(this, salesSchema, "SUPPLIERS",
+        false);
+    suppliersTable.addColumn("SUPPLIERID", intType);
+    suppliersTable.addColumn("NAME", varchar20Type);
+    suppliersTable.addColumn("CITY", intType);
+    registerTable(suppliersTable);
+
     // Register "EMP_20" view.
     // Same columns as "EMP",
     // but "DEPTNO" not visible and set to 20 by default

http://git-wip-us.apache.org/repos/asf/calcite/blob/e9d50602/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java b/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
index 026f41e..7ace229 100644
--- a/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
+++ b/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
@@ -132,6 +132,12 @@ public class SqlValidatorTest extends SqlValidatorTestCase {
     return "Cannot convert stream '" + table + "' to relation";
   }
 
+  private static String cannotStreamResultsForNonStreamingInputs(String inputs) {
+    return "Cannot stream results of a query with no streaming inputs: '"
+        + inputs
+        + "'. At least one input should be convertable to a stream.";
+  }
+
   @Test public void testMultipleSameAsPass() {
     check("select 1 as again,2 as \"again\", 3 as AGAiN from (values (true))");
   }
@@ -7415,6 +7421,15 @@ public class SqlValidatorTest extends SqlValidatorTestCase {
         + "order by floor(rowtime to hour), rowtime desc").ok();
   }
 
+  @Test public void testStreamJoin() {
+    sql("select stream \n"
+        + "orders.rowtime as rowtime, orders.orderId as orderId, products.supplierId as supplierId \n"
+        + "from orders join products on orders.productId = products.productId").ok();
+    sql("^select stream *\n"
+        + "from products join suppliers on products.supplierId = suppliers.supplierId^")
+        .fails(cannotStreamResultsForNonStreamingInputs("PRODUCTS, SUPPLIERS"));
+  }
+
   @Test public void testNew() {
     // (To debug individual statements, paste them into this method.)
     //            1         2         3         4         5         6

http://git-wip-us.apache.org/repos/asf/calcite/blob/e9d50602/core/src/test/java/org/apache/calcite/test/StreamTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/StreamTest.java b/core/src/test/java/org/apache/calcite/test/StreamTest.java
index 269a2e5..90d67dd 100644
--- a/core/src/test/java/org/apache/calcite/test/StreamTest.java
+++ b/core/src/test/java/org/apache/calcite/test/StreamTest.java
@@ -59,6 +59,7 @@ import static org.junit.Assert.assertThat;
 public class StreamTest {
   public static final String STREAM_SCHEMA_NAME = "STREAMS";
   public static final String INFINITE_STREAM_SCHEMA_NAME = "INFINITE_STREAMS";
+  public static final String STREAMJOINS_SCHEMA_NAME = "STREAMJOINS";
 
   private static String schemaFor(String name, Class<? extends TableFactory> clazz) {
     return "     {\n"
@@ -74,6 +75,27 @@ public class StreamTest {
       + "     }";
   }
 
+  private static final String STREAM_JOINS_MODEL = "{\n"
+      + "  version: '1.0',\n"
+      + "  defaultSchema: 'STREAMJOINS',\n"
+      + "   schemas: [\n"
+      + "     {\n"
+      + "       name: 'STREAMJOINS',\n"
+      + "       tables: [ {\n"
+      + "         type: 'custom',\n"
+      + "         name: 'ORDERS',\n"
+      + "         stream: {\n"
+      + "           stream: true\n"
+      + "         },\n"
+      + "         factory: '" + OrdersStreamTableFactory.class.getName() + "'\n"
+      + "       }, \n"
+      + "       {\n"
+      + "         type: 'custom',\n"
+      + "         name: 'PRODUCTS',\n"
+      + "         factory: '" + ProductsTableFactory.class.getName() + "'\n"
+      + "       }]\n"
+      + "     }]}";
+
   public static final String STREAM_MODEL = "{\n"
       + "  version: '1.0',\n"
       + "  defaultSchema: 'foodmart',\n"
@@ -212,6 +234,32 @@ public class StreamTest {
         .returnsCount(100);
   }
 
+  @Test public void testStreamToRelaitonJoin() {
+    CalciteAssert.model(STREAM_JOINS_MODEL)
+        .withDefaultSchema(STREAMJOINS_SCHEMA_NAME)
+        .query("select stream "
+            + "orders.rowtime as rowtime, orders.id as orderId, products.supplier as supplierId "
+            + "from orders join products on orders.product = products.id")
+        .convertContains(
+            "LogicalDelta\n"
+                + "  LogicalProject(ROWTIME=[$0], ORDERID=[$1], SUPPLIERID=[$5])\n"
+                + "    LogicalProject(ROWTIME=[$0], ID=[$1], PRODUCT=[$2], UNITS=[$3], ID0=[$5], SUPPLIER=[$6])\n"
+                + "      LogicalJoin(condition=[=($4, $5)], joinType=[inner])\n"
+                + "        LogicalProject(ROWTIME=[$0], ID=[$1], PRODUCT=[$2], UNITS=[$3], PRODUCT4=[CAST($2):VARCHAR(32) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\" NOT NULL])\n"
+                + "          LogicalTableScan(table=[[STREAMJOINS, ORDERS]])\n"
+                + "        LogicalTableScan(table=[[STREAMJOINS, PRODUCTS]])\n")
+        .explainContains(""
+            + "EnumerableJoin(condition=[=($4, $5)], joinType=[inner])\n"
+            + "    EnumerableCalc(expr#0..3=[{inputs}], expr#4=[CAST($t2):VARCHAR(32) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\" NOT NULL], proj#0..4=[{exprs}])\n"
+            + "      EnumerableInterpreter\n"
+            + "        BindableTableScan(table=[[]])\n"
+            + "    EnumerableInterpreter\n"
+            + "      BindableTableScan(table=[[STREAMJOINS, PRODUCTS]])")
+        .returns(startsWith("ROWTIME=2015-02-15 10:15:00; ORDERID=1; SUPPLIERID=1",
+            "ROWTIME=2015-02-15 10:24:15; ORDERID=2; SUPPLIERID=0",
+            "ROWTIME=2015-02-15 10:24:45; ORDERID=3; SUPPLIERID=1"));
+  }
+
   private Function<ResultSet, Void> startsWith(String... rows) {
     final ImmutableList<String> rowList = ImmutableList.copyOf(rows);
     return new Function<ResultSet, Void>() {
@@ -362,6 +410,66 @@ public class StreamTest {
       return this;
     }
   }
+
+  /**
+   * Factory for the mock PRODUCTS relation (three fixed rows) used by the stream-join test.
+   */
+  public static class ProductsTableFactory implements TableFactory<Table> {
+
+    public ProductsTableFactory(){}
+
+    @Override
+    public Table create(SchemaPlus schema, String name, Map<String, Object> operand,
+                        RelDataType rowType) {
+      final ImmutableList<Object[]> rows = ImmutableList.of(
+        new Object[]{"paint", 1},
+        new Object[]{"paper", 0},
+        new Object[]{"brush", 1}
+      );
+      return new ProductsTable(rows);
+    }
+  }
+
+  /**
+   * Table representing the PRODUCTS relation: columns ID VARCHAR(32) and SUPPLIER INTEGER.
+   */
+  public static class ProductsTable implements ScannableTable {
+
+    private final ImmutableList<Object[]> rows;
+
+    public ProductsTable(ImmutableList<Object[]> rows) {
+      this.rows = rows;
+    }
+
+    private final RelProtoDataType protoRowType = new RelProtoDataType() {
+      public RelDataType apply(RelDataTypeFactory a0) {
+        return a0.builder()
+            .add("ID", SqlTypeName.VARCHAR, 32)
+            .add("SUPPLIER", SqlTypeName.INTEGER)
+            .build();
+      }
+    };
+
+    @Override
+    public Enumerable<Object[]> scan(DataContext root) {
+      return Linq4j.asEnumerable(rows);
+    }
+
+    @Override
+    public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+      return protoRowType.apply(typeFactory);
+    }
+
+    @Override
+    public Statistic getStatistic() {
+      return Statistics.of(200d, ImmutableList.<ImmutableBitSet>of());
+    }
+
+    @Override
+    public Schema.TableType getJdbcTableType() {
+      return Schema.TableType.TABLE;
+    }
+  }
 }
 
 // End StreamTest.java


Mime
View raw message