drill-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From paul-rogers <...@git.apache.org>
Subject [GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements
Date Mon, 11 Dec 2017 20:48:02 GMT
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r156193025
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
---
    @@ -17,483 +17,216 @@
      */
     package org.apache.drill.exec.ops;
     
    +import java.io.IOException;
    +import java.util.Collection;
     import java.util.List;
    -import java.util.Map;
    -import java.util.concurrent.Executor;
    +import java.util.concurrent.ExecutorService;
     
    +import com.google.common.annotations.VisibleForTesting;
     import org.apache.calcite.schema.SchemaPlus;
     import org.apache.drill.common.config.DrillConfig;
    -import org.apache.drill.common.exceptions.ExecutionSetupException;
    -import org.apache.drill.common.exceptions.UserException;
    -import org.apache.drill.common.types.TypeProtos.MinorType;
    -import org.apache.drill.exec.ExecConstants;
     import org.apache.drill.exec.compile.CodeCompiler;
    -import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.coord.ClusterCoordinator;
    +import org.apache.drill.exec.exception.ClassTransformationException;
    +import org.apache.drill.exec.expr.ClassGenerator;
    +import org.apache.drill.exec.expr.CodeGenerator;
     import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
    -import org.apache.drill.exec.expr.holders.ValueHolder;
     import org.apache.drill.exec.memory.BufferAllocator;
     import org.apache.drill.exec.physical.base.PhysicalOperator;
    -import org.apache.drill.exec.planner.physical.PlannerSettings;
    -import org.apache.drill.exec.proto.BitControl.PlanFragment;
    -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
    -import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
    -import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
    -import org.apache.drill.exec.proto.helper.QueryIdHelper;
    -import org.apache.drill.exec.rpc.RpcException;
    -import org.apache.drill.exec.rpc.RpcOutcomeListener;
    -import org.apache.drill.exec.rpc.UserClientConnection;
    -import org.apache.drill.exec.rpc.control.ControlTunnel;
    -import org.apache.drill.exec.rpc.control.WorkEventBus;
    -import org.apache.drill.exec.server.DrillbitContext;
    -import org.apache.drill.exec.server.options.FragmentOptionManager;
    -import org.apache.drill.exec.server.options.OptionList;
    -import org.apache.drill.exec.server.options.OptionManager;
    +import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
    +import org.apache.drill.exec.planner.PhysicalPlanReader;
    +import org.apache.drill.exec.proto.CoordinationProtos;
    +import org.apache.drill.exec.proto.ExecProtos;
    +import org.apache.drill.exec.rpc.control.Controller;
     import org.apache.drill.exec.server.options.OptionSet;
    -import org.apache.drill.exec.store.PartitionExplorer;
    -import org.apache.drill.exec.store.SchemaConfig;
     import org.apache.drill.exec.testing.ExecutionControls;
    -import org.apache.drill.exec.util.ImpersonationUtil;
    -import org.apache.drill.exec.work.batch.IncomingBuffers;
    -
    -import com.google.common.annotations.VisibleForTesting;
    -import com.google.common.base.Function;
    -import com.google.common.base.Preconditions;
    -import com.google.common.collect.Lists;
    -import com.google.common.collect.Maps;
     
     import io.netty.buffer.DrillBuf;
    +import org.apache.drill.exec.work.batch.IncomingBuffers;
     
     /**
    - * Contextual objects required for execution of a particular fragment.
    - * This is the implementation; use <tt>FragmentContextInterface</tt>
    - * in code to allow tests to use test-time implementations.
    + * Fragment context interface: separates implementation from definition.
    + * Allows unit testing by mocking or reimplementing services with
    + * test-time versions. The name is awkward, chosen to avoid renaming
    + * the implementation class which is used in many places in legacy code.
    + * New code should use this interface, and the names should eventually
    + * be swapped with {@link FragmentContextImpl} becoming
    + * <tt>FragmentContextImpl</tt> and this interface becoming
    + * {@link FragmentContextImpl}.
      */
     
    -public class FragmentContext extends BaseFragmentContext implements AutoCloseable, UdfUtilities,
FragmentContextInterface {
    -  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContext.class);
    -
    -  private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels = Maps.newHashMap();
    -  private final List<OperatorContextImpl> contexts = Lists.newLinkedList();
    -
    -  private final DrillbitContext context;
    -  private final UserClientConnection connection; // is null if this context is for non-root
fragment
    -  private final QueryContext queryContext; // is null if this context is for non-root
fragment
    -  private final FragmentStats stats;
    -  private final BufferAllocator allocator;
    -  private final PlanFragment fragment;
    -  private final ContextInformation contextInformation;
    -  private IncomingBuffers buffers;
    -  private final OptionManager fragmentOptions;
    -  private final BufferManager bufferManager;
    -  private ExecutorState executorState;
    -  private final ExecutionControls executionControls;
    -
    -  private final SendingAccountor sendingAccountor = new SendingAccountor();
    -  private final Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>()
{
    -    @Override
    -    public void accept(final RpcException e) {
    -      fail(e);
    -    }
    -
    -    @Override
    -    public void interrupt(final InterruptedException e) {
    -      if (shouldContinue()) {
    -        logger.error("Received an unexpected interrupt while waiting for the data send
to complete.", e);
    -        fail(e);
    -      }
    -    }
    -  };
    -
    -  private final RpcOutcomeListener<Ack> statusHandler = new StatusHandler(exceptionConsumer,
sendingAccountor);
    -  private final AccountingUserConnection accountingUserConnection;
    -  /** Stores constants and their holders by type */
    -  private final Map<String, Map<MinorType, ValueHolder>> constantValueHolderCache;
    -
    +public interface FragmentContext extends UdfUtilities, AutoCloseable {
       /**
    -   * Create a FragmentContext instance for non-root fragment.
    -   *
    -   * @param dbContext DrillbitContext.
    -   * @param fragment Fragment implementation.
    -   * @param funcRegistry FunctionImplementationRegistry.
    -   * @throws ExecutionSetupException
    +   * Wait for ack that all outgoing batches have been sent
        */
    -  public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment,
    -      final FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException
{
    -    this(dbContext, fragment, null, null, funcRegistry);
    -  }
    +  void waitForSendComplete();
     
       /**
    -   * Create a FragmentContext instance for root fragment.
    -   *
    -   * @param dbContext DrillbitContext.
    -   * @param fragment Fragment implementation.
    -   * @param queryContext QueryContext.
    -   * @param connection UserClientConnection.
    -   * @param funcRegistry FunctionImplementationRegistry.
    -   * @throws ExecutionSetupException
    +   * Returns the UDF registry.
    +   * @return the UDF registry
        */
    -  public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment,
final QueryContext queryContext,
    -      final UserClientConnection connection, final FunctionImplementationRegistry funcRegistry)
    -    throws ExecutionSetupException {
    -    super(funcRegistry);
    -    this.context = dbContext;
    -    this.queryContext = queryContext;
    -    this.connection = connection;
    -    this.accountingUserConnection = new AccountingUserConnection(connection, sendingAccountor,
statusHandler);
    -    this.fragment = fragment;
    -    contextInformation = new ContextInformation(fragment.getCredentials(), fragment.getContext());
    -
    -    logger.debug("Getting initial memory allocation of {}", fragment.getMemInitial());
    -    logger.debug("Fragment max allocation: {}", fragment.getMemMax());
    -
    -    final OptionList list;
    -    if (!fragment.hasOptionsJson() || fragment.getOptionsJson().isEmpty()) {
    -      list = new OptionList();
    -    } else {
    -      try {
    -        list = dbContext.getLpPersistence().getMapper().readValue(fragment.getOptionsJson(),
OptionList.class);
    -      } catch (final Exception e) {
    -        throw new ExecutionSetupException("Failure while reading plan options.", e);
    -      }
    -    }
    -    fragmentOptions = new FragmentOptionManager(context.getOptionManager(), list);
    -
    -    executionControls = new ExecutionControls(fragmentOptions, dbContext.getEndpoint());
    -
    -    // Add the fragment context to the root allocator.
    -    // The QueryManager will call the root allocator to recalculate all the memory limits
for all the fragments
    -    try {
    -      allocator = context.getAllocator().newChildAllocator(
    -          "frag:" + QueryIdHelper.getFragmentId(fragment.getHandle()),
    -          fragment.getMemInitial(),
    -          fragment.getMemMax());
    -      Preconditions.checkNotNull(allocator, "Unable to acuqire allocator");
    -    } catch (final OutOfMemoryException e) {
    -      throw UserException.memoryError(e)
    -        .addContext("Fragment", getHandle().getMajorFragmentId() + ":" + getHandle().getMinorFragmentId())
    -        .build(logger);
    -    } catch(final Throwable e) {
    -      throw new ExecutionSetupException("Failure while getting memory allocator for fragment.",
e);
    -    }
    -
    -    stats = new FragmentStats(allocator, fragment.getAssignment());
    -    bufferManager = new BufferManagerImpl(this.allocator);
    -    constantValueHolderCache = Maps.newHashMap();
    -  }
    +  FunctionImplementationRegistry getFunctionRegistry();
     
       /**
    -   * TODO: Remove this constructor when removing the SimpleRootExec (DRILL-2097). This
is kept only to avoid modifying
    -   * the long list of test files.
    +   * Returns a read-only version of the session options.
    +   * @return the session options
        */
    -  public FragmentContext(DrillbitContext dbContext, PlanFragment fragment, UserClientConnection
connection,
    -      FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
    -    this(dbContext, fragment, null, connection, funcRegistry);
    -  }
    +  OptionSet getOptionSet();
     
    -  public OptionManager getOptions() {
    -    return fragmentOptions;
    -  }
    +  PhysicalPlanReader getPlanReader();
     
    -  @Override
    -  public OptionSet getOptionSet() {
    -    return fragmentOptions;
    -  }
    +  ClusterCoordinator getClusterCoordinator();
     
    -  public void setBuffers(final IncomingBuffers buffers) {
    -    Preconditions.checkArgument(this.buffers == null, "Can only set buffers once.");
    -    this.buffers = buffers;
    -  }
    +  AccountingDataTunnel getDataTunnel(final CoordinationProtos.DrillbitEndpoint endpoint);
     
    -  public void setExecutorState(final ExecutorState executorState) {
    -    Preconditions.checkArgument(this.executorState == null, "ExecutorState can only be
set once.");
    -    this.executorState = executorState;
    -  }
    +  AccountingUserConnection getUserDataTunnel();
     
    -  public void fail(final Throwable cause) {
    -    executorState.fail(cause);
    -  }
    +  void setBuffers(final IncomingBuffers buffers);
    +
    +  boolean isImpersonationEnabled();
     
       /**
    -   * Tells individual operations whether they should continue. In some cases, an external
event (typically cancellation)
    -   * will mean that the fragment should prematurely exit execution. Long running operations
should check this every so
    -   * often so that Drill is responsive to cancellation operations.
    +   * Generates code for a class given a {@link ClassGenerator},
    +   * and returns a single instance of the generated class. (Note
    +   * that the name is a misnomer, it would be better called
    +   * <tt>getImplementationInstance</tt>.)
        *
    -   * @return false if the action should terminate immediately, true if everything is
okay.
    +   * @param cg the class generator
    +   * @return an instance of the generated class
        */
    -  @Override
    -  public boolean shouldContinue() {
    -    return executorState.shouldContinue();
    -  }
    -
    -  @Override
    -  public DrillbitContext getDrillbitContext() {
    -    return context;
    -  }
    +  <T> T getImplementationClass(final ClassGenerator<T> cg)
    +      throws ClassTransformationException, IOException;
     
       /**
    -   * This method is only used to construt InfoSchemaReader, it is for the reader to get
full schema, so here we
    -   * are going to return a fully initialized schema tree.
    -   * @return root schema's plus
    +   * Generates code for a class given a {@link CodeGenerator},
    +   * and returns a single instance of the generated class. (Note
    +   * that the name is a misnomer, it would be better called
    +   * <tt>getImplementationInstance</tt>.)
    +   *
    +   * @param cg the code generator
    +   * @return an instance of the generated class
        */
    -  public SchemaPlus getFullRootSchema() {
    -    if (queryContext == null) {
    -      fail(new UnsupportedOperationException("Schema tree can only be created in root
fragment. " +
    -          "This is a non-root fragment."));
    -      return null;
    -    }
    -
    -    final boolean isImpersonationEnabled = isImpersonationEnabled();
    -    // If impersonation is enabled, we want to view the schema as query user and suppress
authorization errors. As for
    -    // InfoSchema purpose we want to show tables the user has permissions to list or
query. If  impersonation is
    -    // disabled view the schema as Drillbit process user and throw authorization errors
to client.
    -    SchemaConfig schemaConfig = SchemaConfig
    -        .newBuilder(
    -            isImpersonationEnabled ? queryContext.getQueryUserName() : ImpersonationUtil.getProcessUserName(),
    -            queryContext)
    -        .setIgnoreAuthErrors(isImpersonationEnabled)
    -        .build();
    -
    -    return queryContext.getFullRootSchema(schemaConfig);
    -  }
    +  <T> T getImplementationClass(final CodeGenerator<T> cg)
    +      throws ClassTransformationException, IOException;
     
       /**
    -   * Get this node's identity.
    -   * @return A DrillbitEndpoint object.
    +   * Generates code for a class given a {@link ClassGenerator}, and returns the
    +   * specified number of instances of the generated class. (Note that the name
    +   * is a misnomer, it would be better called
    +   * <tt>getImplementationInstances</tt>.)
    +   *
    +   * @param cg the class generator
    +   * @return list of instances of the generated class
        */
    -  public DrillbitEndpoint getIdentity() {
    -    return context.getEndpoint();
    -  }
    -
    -  public FragmentStats getStats() {
    -    return stats;
    -  }
    -
    -  @Override
    -  public ContextInformation getContextInformation() {
    -    return contextInformation;
    -  }
    -
    -  public DrillbitEndpoint getForemanEndpoint() {
    -    return fragment.getForeman();
    -  }
    +  <T> List<T> getImplementationClass(final ClassGenerator<T> cg, final
int instanceCount)
    +      throws ClassTransformationException, IOException;
     
       /**
    -   * The FragmentHandle for this Fragment
    -   * @return FragmentHandle
    +   * Generates code for a class given a {@link CodeGenerator}, and returns the
    +   * specified number of instances of the generated class. (Note that the name
    +   * is a misnomer, it would be better called
    +   * <tt>getImplementationInstances</tt>.)
    +   *
    +   * @param cg the code generator
    +   * @return list of instances of the generated class
        */
    -  public FragmentHandle getHandle() {
    -    return fragment.getHandle();
    -  }
    -
    -  public String getFragIdString() {
    -    final FragmentHandle handle = getHandle();
    -    final String frag = handle != null ? handle.getMajorFragmentId() + ":" + handle.getMinorFragmentId()
: "0:0";
    -    return frag;
    -  }
    +  <T> List<T> getImplementationClass(final CodeGenerator<T> cg, final
int instanceCount)
    +      throws ClassTransformationException, IOException;
     
       /**
    -   * Get this fragment's allocator.
    -   * @return the allocator
    +   * Return the set of execution controls used to inject faults into running
    +   * code for testing.
    +   *
    +   * @return the execution controls
        */
    -  @Deprecated
    -  public BufferAllocator getAllocator() {
    -    if (allocator == null) {
    -      logger.debug("Fragment: " + getFragIdString() + " Allocator is NULL");
    -    }
    -    return allocator;
    -  }
    +  ExecutionControls getExecutionControls();
     
    -  public BufferAllocator getNewChildAllocator(final String operatorName,
    -      final int operatorId,
    -      final long initialReservation,
    -      final long maximumReservation) throws OutOfMemoryException {
    -    return allocator.newChildAllocator(
    -        "op:" + QueryIdHelper.getFragmentId(fragment.getHandle()) + ":" + operatorId
+ ":" + operatorName,
    -        initialReservation,
    -        maximumReservation
    -        );
    -  }
    +  /**
    +   * Returns the Drill configuration for this run. Note that the config is
    +   * global and immutable.
    +   *
    +   * @return the Drill configuration
    +   */
    +  DrillConfig getConfig();
     
    -  public boolean isOverMemoryLimit() {
    -    return allocator.isOverLimit();
    -  }
    +  FragmentStats getStats();
     
    -  @Override
    -  protected CodeCompiler getCompiler() {
    -    return context.getCompiler();
    -  }
    +  CodeCompiler getCompiler();
     
    -  public AccountingUserConnection getUserDataTunnel() {
    -    Preconditions.checkState(connection != null, "Only Root fragment can get UserDataTunnel");
    -    return accountingUserConnection;
    -  }
    +  Collection<CoordinationProtos.DrillbitEndpoint> getBits();
     
    -  public ControlTunnel getControlTunnel(final DrillbitEndpoint endpoint) {
    -    return context.getController().getTunnel(endpoint);
    -  }
    +  CoordinationProtos.DrillbitEndpoint getForemanEndpoint();
     
    -  public AccountingDataTunnel getDataTunnel(final DrillbitEndpoint endpoint) {
    -    AccountingDataTunnel tunnel = tunnels.get(endpoint);
    -    if (tunnel == null) {
    -      tunnel = new AccountingDataTunnel(context.getDataConnectionsPool().getTunnel(endpoint),
sendingAccountor, statusHandler);
    -      tunnels.put(endpoint, tunnel);
    -    }
    -    return tunnel;
    -  }
    +  CoordinationProtos.DrillbitEndpoint getEndpoint();
     
    -  public IncomingBuffers getBuffers() {
    -    return buffers;
    -  }
    +  Controller getController();
     
    -  public OperatorContext newOperatorContext(PhysicalOperator popConfig, OperatorStats
stats)
    -      throws OutOfMemoryException {
    -    OperatorContextImpl context = new OperatorContextImpl(popConfig, this, stats);
    -    contexts.add(context);
    -    return context;
    -  }
    +  OperatorCreatorRegistry getOperatorCreatorRegistry();
    --- End diff --
    
    Needed only when creating operators, not when executing a fragment, so it is network/cluster
related.
    
    Rather than continue to tag each item, please review the original interface class, where
I did this analysis to decide which methods should be in the runtime interface and which are
network/cluster/server related (and so were left in the implementation class).
    
    Since we have no design doc, perhaps we can at least document these decisions and concepts
in the class header comments. (I suppose I should have done that; rereading the comment, it
clearly lacks this level of detail.)


---

Mime
View raw message