asterixdb-commits mailing list archives

From: amo...@apache.org
Subject: [1/2] asterixdb git commit: Refactor Active Listeners
Date: Tue, 30 Aug 2016 14:36:15 GMT
Repository: asterixdb
Updated Branches:
  refs/heads/master f3c7a8705 -> 43bae55f1


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/43bae55f/hyracks-fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java b/hyracks-fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
index f586af7..7bcb1d6 100644
--- a/hyracks-fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
+++ b/hyracks-fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
@@ -83,7 +83,7 @@ public class PigletMetadataProvider implements IMetadataProvider<String, String>
             List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed,
             List<LogicalVariable> minFilterVars, List<LogicalVariable> maxFilterVars, IOperatorSchema opSchema,
             IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, Object implConfig)
-                    throws AlgebricksException {
+            throws AlgebricksException {
         PigletFileDataSource ds = (PigletFileDataSource) dataSource;
 
         FileSplit[] fileSplits = ds.getFileSplits();
@@ -133,14 +133,9 @@ public class PigletMetadataProvider implements IMetadataProvider<String, String>
     }
 
     @Override
-    public boolean scannerOperatorIsLeaf(IDataSource<String> dataSource) {
-        return true;
-    }
-
-    @Override
     public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink,
             int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc)
-                    throws AlgebricksException {
+            throws AlgebricksException {
         PigletFileDataSink ds = (PigletFileDataSink) sink;
         FileSplit[] fileSplits = ds.getFileSplits();
         String[] locations = new String[fileSplits.length];
@@ -192,7 +187,7 @@ public class PigletMetadataProvider implements IMetadataProvider<String, String>
             IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
             List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
             ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
-                    throws AlgebricksException {
+            throws AlgebricksException {
         // TODO Auto-generated method stub
         return null;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/43bae55f/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 8701851..ddfb331 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -37,7 +37,6 @@ import org.apache.hyracks.algebricks.core.algebra.base.OperatorAnnotations;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IMergeAggregationExpressionFactory;
 import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
 import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
-import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
@@ -269,8 +268,7 @@ public class SetAlgebricksPhysicalOperatorsRule implements IAlgebraicRewriteRule
                     DataSourceScanOperator scan = (DataSourceScanOperator) op;
                     IDataSource dataSource = scan.getDataSource();
                     DataSourceScanPOperator dss = new DataSourceScanPOperator(dataSource);
-                    IMetadataProvider mp = context.getMetadataProvider();
-                    if (mp.scannerOperatorIsLeaf(dataSource)) {
+                    if (dataSource.isScanAccessPathALeaf()) {
                         dss.disableJobGenBelowMe();
                     }
                     op.setPhysicalOperator(dss);
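
A note for readers following the patch: the hunk above is the substance of the refactoring in this file. The "is this scan a leaf?" decision now comes from the data source itself (dataSource.isScanAccessPathALeaf()) rather than from IMetadataProvider.scannerOperatorIsLeaf(...), which is also why that method is deleted from PigletMetadataProvider earlier in the patch. Below is a minimal, self-contained sketch of the pattern under that reading; the interface and class names are illustrative stand-ins, not the real Hyracks types, and only isScanAccessPathALeaf() is taken from the diff.

    // Illustrative sketch only: DataSource, FileDataSource and Demo are
    // hypothetical stand-ins, not the actual Hyracks interfaces.
    interface DataSource {
        // Each data source now decides for itself whether its scan access
        // path is a leaf of the generated job.
        boolean isScanAccessPathALeaf();
    }

    class FileDataSource implements DataSource {
        @Override
        public boolean isScanAccessPathALeaf() {
            // A plain file scan reads directly from storage, so nothing needs
            // to be generated below the scan operator; this mirrors the
            // "return true" removed from PigletMetadataProvider above.
            return true;
        }
    }

    public class Demo {
        public static void main(String[] args) {
            DataSource ds = new FileDataSource();
            if (ds.isScanAccessPathALeaf()) {
                // Corresponds to dss.disableJobGenBelowMe() in the rewrite rule.
                System.out.println("disabling job generation below the scan");
            }
        }
    }

One practical effect of moving the check onto the data source is that adding a new source type no longer requires touching the metadata provider or this rewrite rule.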

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/43bae55f/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index 4cddef1..1272562 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -34,10 +34,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.logging.Level;
 import java.util.logging.Logger;
-import org.apache.hyracks.control.cc.work.TriggerNCWork;
-import org.apache.hyracks.control.common.controllers.IniUtils;
-import org.ini4j.Ini;
-import org.xml.sax.InputSource;
+
 import org.apache.hyracks.api.application.ICCApplicationEntryPoint;
 import org.apache.hyracks.api.client.ClusterControllerInfo;
 import org.apache.hyracks.api.client.HyracksClientInterfaceFunctions;
@@ -86,10 +83,12 @@ import org.apache.hyracks.control.cc.work.ReportResultPartitionFailureWork;
 import org.apache.hyracks.control.cc.work.ReportResultPartitionWriteCompletionWork;
 import org.apache.hyracks.control.cc.work.TaskCompleteWork;
 import org.apache.hyracks.control.cc.work.TaskFailureWork;
+import org.apache.hyracks.control.cc.work.TriggerNCWork;
 import org.apache.hyracks.control.cc.work.UnregisterNodeWork;
 import org.apache.hyracks.control.cc.work.WaitForJobCompletionWork;
 import org.apache.hyracks.control.common.context.ServerContext;
 import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.control.common.controllers.IniUtils;
 import org.apache.hyracks.control.common.deployment.DeploymentRun;
 import org.apache.hyracks.control.common.ipc.CCNCFunctions;
 import org.apache.hyracks.control.common.ipc.CCNCFunctions.Function;
@@ -105,6 +104,7 @@ import org.apache.hyracks.ipc.api.IIPCI;
 import org.apache.hyracks.ipc.exceptions.IPCException;
 import org.apache.hyracks.ipc.impl.IPCSystem;
 import org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
+import org.ini4j.Ini;
 import org.xml.sax.InputSource;
 
 public class ClusterControllerService implements IControllerService {
@@ -351,6 +351,7 @@ public class ClusterControllerService implements IControllerService {
     public Map<String, NodeControllerState> getNodeMap() {
         return nodeRegistry;
     }
+
     public CCConfig getConfig() {
         return ccConfig;
     }
@@ -406,21 +407,24 @@ public class ClusterControllerService implements IControllerService {
                 }
 
                 case GET_JOB_STATUS: {
-                    HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf = (HyracksClientInterfaceFunctions.GetJobStatusFunction) fn;
+                    HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf =
+                            (HyracksClientInterfaceFunctions.GetJobStatusFunction) fn;
                     workQueue.schedule(new GetJobStatusWork(ClusterControllerService.this, gjsf.getJobId(),
                             new IPCResponder<JobStatus>(handle, mid)));
                     return;
                 }
 
                 case GET_JOB_INFO: {
-                    HyracksClientInterfaceFunctions.GetJobInfoFunction gjsf = (HyracksClientInterfaceFunctions.GetJobInfoFunction) fn;
+                    HyracksClientInterfaceFunctions.GetJobInfoFunction gjsf =
+                            (HyracksClientInterfaceFunctions.GetJobInfoFunction) fn;
                     workQueue.schedule(new GetJobInfoWork(ClusterControllerService.this, gjsf.getJobId(),
                             new IPCResponder<JobInfo>(handle, mid)));
                     return;
                 }
 
                 case START_JOB: {
-                    HyracksClientInterfaceFunctions.StartJobFunction sjf = (HyracksClientInterfaceFunctions.StartJobFunction) fn;
+                    HyracksClientInterfaceFunctions.StartJobFunction sjf =
+                            (HyracksClientInterfaceFunctions.StartJobFunction) fn;
                     JobId jobId = createJobId();
                     workQueue.schedule(new JobStartWork(ClusterControllerService.this, sjf.getDeploymentId(),
                             sjf.getACGGFBytes(), sjf.getJobFlags(), jobId, new IPCResponder<JobId>(handle, mid)));
@@ -434,14 +438,16 @@ public class ClusterControllerService implements IControllerService {
                 }
 
                 case GET_DATASET_RESULT_STATUS: {
-                    HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction gdrlf = (HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction) fn;
+                    HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction gdrlf =
+                            (HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction) fn;
                     workQueue.schedule(new GetResultStatusWork(ClusterControllerService.this, gdrlf.getJobId(),
                             gdrlf.getResultSetId(), new IPCResponder<Status>(handle, mid)));
                     return;
                 }
 
                 case GET_DATASET_RESULT_LOCATIONS: {
-                    HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf = (HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction) fn;
+                    HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf =
+                            (HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction) fn;
                     workQueue.schedule(new GetResultPartitionLocationsWork(ClusterControllerService.this,
                             gdrlf.getJobId(), gdrlf.getResultSetId(), gdrlf.getKnownRecords(),
                             new IPCResponder<DatasetDirectoryRecord[]>(handle, mid)));
@@ -449,7 +455,8 @@ public class ClusterControllerService implements IControllerService {
                 }
 
                 case WAIT_FOR_COMPLETION: {
-                    HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf = (HyracksClientInterfaceFunctions.WaitForCompletionFunction) fn;
+                    HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf =
+                            (HyracksClientInterfaceFunctions.WaitForCompletionFunction) fn;
                     workQueue.schedule(new WaitForJobCompletionWork(ClusterControllerService.this, wfcf.getJobId(),
                             new IPCResponder<Object>(handle, mid)));
                     return;
@@ -471,14 +478,16 @@ public class ClusterControllerService implements IControllerService {
                 }
 
                 case CLI_DEPLOY_BINARY: {
-                    HyracksClientInterfaceFunctions.CliDeployBinaryFunction dbf = (HyracksClientInterfaceFunctions.CliDeployBinaryFunction) fn;
+                    HyracksClientInterfaceFunctions.CliDeployBinaryFunction dbf =
+                            (HyracksClientInterfaceFunctions.CliDeployBinaryFunction) fn;
                     workQueue.schedule(new CliDeployBinaryWork(ClusterControllerService.this, dbf.getBinaryURLs(),
                             dbf.getDeploymentId(), new IPCResponder<DeploymentId>(handle, mid)));
                     return;
                 }
 
                 case CLI_UNDEPLOY_BINARY: {
-                    HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction udbf = (HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction) fn;
+                    HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction udbf =
+                            (HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction) fn;
                     workQueue.schedule(new CliUnDeployBinaryWork(ClusterControllerService.this, udbf.getDeploymentId(),
                             new IPCResponder<DeploymentId>(handle, mid)));
                     return;
@@ -556,21 +565,24 @@ public class ClusterControllerService implements IControllerService {
                 }
 
                 case REGISTER_PARTITION_PROVIDER: {
-                    CCNCFunctions.RegisterPartitionProviderFunction rppf = (CCNCFunctions.RegisterPartitionProviderFunction) fn;
+                    CCNCFunctions.RegisterPartitionProviderFunction rppf =
+                            (CCNCFunctions.RegisterPartitionProviderFunction) fn;
                     workQueue.schedule(new RegisterPartitionAvailibilityWork(ClusterControllerService.this,
                             rppf.getPartitionDescriptor()));
                     return;
                 }
 
                 case REGISTER_PARTITION_REQUEST: {
-                    CCNCFunctions.RegisterPartitionRequestFunction rprf = (CCNCFunctions.RegisterPartitionRequestFunction) fn;
+                    CCNCFunctions.RegisterPartitionRequestFunction rprf =
+                            (CCNCFunctions.RegisterPartitionRequestFunction) fn;
                     workQueue.schedule(new RegisterPartitionRequestWork(ClusterControllerService.this,
                             rprf.getPartitionRequest()));
                     return;
                 }
 
                 case REGISTER_RESULT_PARTITION_LOCATION: {
-                    CCNCFunctions.RegisterResultPartitionLocationFunction rrplf = (CCNCFunctions.RegisterResultPartitionLocationFunction) fn;
+                    CCNCFunctions.RegisterResultPartitionLocationFunction rrplf =
+                            (CCNCFunctions.RegisterResultPartitionLocationFunction) fn;
                     workQueue.schedule(new RegisterResultPartitionLocationWork(ClusterControllerService.this,
                             rrplf.getJobId(), rrplf.getResultSetId(), rrplf.getOrderedResult(), rrplf.getEmptyResult(),
                             rrplf.getPartition(), rrplf.getNPartitions(), rrplf.getNetworkAddress()));
@@ -578,21 +590,24 @@ public class ClusterControllerService implements IControllerService {
                 }
 
                 case REPORT_RESULT_PARTITION_WRITE_COMPLETION: {
-                    CCNCFunctions.ReportResultPartitionWriteCompletionFunction rrplf = (CCNCFunctions.ReportResultPartitionWriteCompletionFunction) fn;
+                    CCNCFunctions.ReportResultPartitionWriteCompletionFunction rrplf =
+                            (CCNCFunctions.ReportResultPartitionWriteCompletionFunction) fn;
                     workQueue.schedule(new ReportResultPartitionWriteCompletionWork(ClusterControllerService.this,
                             rrplf.getJobId(), rrplf.getResultSetId(), rrplf.getPartition()));
                     return;
                 }
 
                 case REPORT_RESULT_PARTITION_FAILURE: {
-                    CCNCFunctions.ReportResultPartitionFailureFunction rrplf = (CCNCFunctions.ReportResultPartitionFailureFunction) fn;
+                    CCNCFunctions.ReportResultPartitionFailureFunction rrplf =
+                            (CCNCFunctions.ReportResultPartitionFailureFunction) fn;
                     workQueue.schedule(new ReportResultPartitionFailureWork(ClusterControllerService.this,
                             rrplf.getJobId(), rrplf.getResultSetId(), rrplf.getPartition()));
                     return;
                 }
 
                 case SEND_APPLICATION_MESSAGE: {
-                    CCNCFunctions.SendApplicationMessageFunction rsf = (CCNCFunctions.SendApplicationMessageFunction) fn;
+                    CCNCFunctions.SendApplicationMessageFunction rsf =
+                            (CCNCFunctions.SendApplicationMessageFunction) fn;
                     workQueue.schedule(new ApplicationMessageWork(ClusterControllerService.this, rsf.getMessage(),
                             rsf.getDeploymentId(), rsf.getNodeId()));
                     return;
@@ -609,7 +624,6 @@ public class ClusterControllerService implements IControllerService {
 
                                 @Override
                                 public void setException(Exception e) {
-
                                 }
                             }));
                     return;

