drill-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From paul-rogers <...@git.apache.org>
Subject [GitHub] drill pull request #921: DRILL-4286 Graceful shutdown of drillbit
Date Tue, 14 Nov 2017 22:41:53 GMT
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/921#discussion_r150985401
  
    --- Diff: exec/java-exec/src/test/java/org/apache/drill/test/TestGracefulShutdown.java
---
    @@ -0,0 +1,248 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.drill.test;
    +import com.google.common.io.Files;
    +import org.apache.commons.io.FileUtils;
    +import org.apache.drill.exec.ExecConstants;
    +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
    +import org.apache.drill.exec.server.Drillbit;
    +import org.junit.Assert;
    +import org.junit.BeforeClass;
    +import org.junit.Test;
    +import java.io.File;
    +import java.io.FileWriter;
    +import java.io.IOException;
    +import java.io.PrintWriter;
    +import java.net.HttpURLConnection;
    +import java.net.URL;
    +import java.util.Collection;
    +import java.util.Properties;
    +import static org.junit.Assert.assertNotEquals;
    +import static org.junit.Assert.fail;
    +
    +public class TestGracefulShutdown {
    +
    +  static String testDirPath;
    +  @BeforeClass
    +  public static void setUpTestData() throws Exception {
    +
    +    final File testDir = getTempDir("graceful_shutdown");
    +    testDirPath = testDir.getAbsolutePath();
    +    for( int i = 0; i < 500; i++) {
    +      setupFile(testDir, i);
    +    }
    +  }
    +
    +
    +  public static final Properties WEBSERVER_CONFIGURATION = new Properties() {
    +    {
    +      put(ExecConstants.HTTP_ENABLE, true);
    +      put(ExecConstants.HTTP_PORT_HUNT, true);
    +    }
    +  };
    +
    +  public ClusterFixtureBuilder enableWebServer(ClusterFixtureBuilder builder) {
    +    Properties props = new Properties();
    +    props.putAll(WEBSERVER_CONFIGURATION);
    +    builder.configBuilder.configProps(props);
    +    return builder;
    +  }
    +
    +
    +  /*
    +  Start multiple drillbits and then shutdown a drillbit. Query the online
    +  endpoints and check if the drillbit still exists.
    +   */
    +  @Test
    +  public void testOnlineEndPoints() throws  Exception {
    +
    +    String[] drillbits = {"db1" ,"db2","db3", "db4", "db5", "db6"};
    +    ClusterFixtureBuilder builder = ClusterFixture.builder().withBits(drillbits).withLocalZk();
    +
    +
    +    try ( ClusterFixture cluster = builder.build();
    +          ClientFixture client = cluster.clientFixture()) {
    +
    +      Drillbit drillbit = cluster.drillbit("db2");
    +      DrillbitEndpoint drillbitEndpoint =  drillbit.getRegistrationHandle().getEndPoint();
    +      int grace_period = drillbit.getContext().getConfig().getInt("drill.exec.grace_period");
    +      new Thread(new Runnable() {
    +        public void run() {
    +          try {
    +            cluster.closeDrillbit("db2");
    +          } catch (Exception e) {
    +            e.printStackTrace();
    +          }
    +        }
    +      }).start();
    +      //wait for graceperiod
    +      Thread.sleep(grace_period);
    +      Collection<DrillbitEndpoint> drillbitEndpoints = cluster.drillbit().getContext()
    +              .getClusterCoordinator()
    +              .getOnlineEndPoints();
    +      Assert.assertFalse(drillbitEndpoints.contains(drillbitEndpoint));
    +    }
    +  }
    +  /*
    +    Test if the drillbit transitions from ONLINE state when a shutdown
    +    request is initiated
    +   */
    +  @Test
    +  public void testStateChange() throws  Exception {
    +
    +    String[] drillbits = {"db1" ,"db2", "db3", "db4", "db5", "db6"};
    +    ClusterFixtureBuilder builder = ClusterFixture.builder().withBits(drillbits).withLocalZk();
    +
    +    try ( ClusterFixture cluster = builder.build();
    +          ClientFixture client = cluster.clientFixture()) {
    +      Drillbit drillbit = cluster.drillbit("db2");
    +      int grace_period = drillbit.getContext().getConfig().getInt("drill.exec.grace_period");
    +      DrillbitEndpoint drillbitEndpoint =  drillbit.getRegistrationHandle().getEndPoint();
    +      new Thread(new Runnable() {
    +        public void run() {
    +          try {
    +            cluster.closeDrillbit("db2");
    +          } catch (Exception e) {
    +            e.printStackTrace();
    +          }
    +        }
    +      }).start();
    +      Thread.sleep(grace_period);
    +      Collection<DrillbitEndpoint> drillbitEndpoints = cluster.drillbit().getContext()
    +              .getClusterCoordinator()
    +              .getAvailableEndpoints();
    +      for (DrillbitEndpoint dbEndpoint : drillbitEndpoints) {
    +        if(drillbitEndpoint.getAddress().equals(dbEndpoint.getAddress()) && drillbitEndpoint.getUserPort()
== dbEndpoint.getUserPort()) {
    +          assertNotEquals(dbEndpoint.getState(),DrillbitEndpoint.State.ONLINE);
    +        }
    +      }
    +    }
    +  }
    +
    +  /*
    +   Test shutdown through RestApi
    +   */
    +  @Test
    +  public void testRestApi() throws Exception {
    +
    +    String[] drillbits = {"db1" ,"db2", "db3", "db4", "db5", "db6"};
    +    ClusterFixtureBuilder builder = ClusterFixture.bareBuilder().withBits(drillbits).withLocalZk();
    +    builder = enableWebServer(builder);
    +    QueryBuilder.QuerySummaryFuture listener;
    +    final String sql = "SELECT * FROM dfs.`"+testDirPath +"` ORDER BY employee_id";
    +    try ( ClusterFixture cluster = builder.build();
    +          final ClientFixture client = cluster.clientFixture()) {
    +      Drillbit drillbit = cluster.drillbit("db1");
    +      int port = drillbit.getContext().getConfig().getInt("drill.exec.http.port");
    +      int grace_period = drillbit.getContext().getConfig().getInt("drill.exec.grace_period");
    +      listener =  client.queryBuilder().sql(sql).futureSummary();
    +      Thread.sleep(10000);
    +      while( port < 8052) {
    +        URL url = new URL("http://localhost:"+port+"/graceful_shutdown");
    +        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    +        conn.setRequestMethod("POST");
    +        if (conn.getResponseCode() != 200) {
    +          throw new RuntimeException("Failed : HTTP error code : "
    +                  + conn.getResponseCode());
    +        }
    +        port++;
    +      }
    +      Thread.sleep(grace_period);
    +      Collection<DrillbitEndpoint> drillbitEndpoints = cluster.drillbit().getContext()
    +              .getClusterCoordinator()
    +              .getOnlineEndPoints();
    +      while(!listener.isDone()) {
    +        Thread.sleep(10);
    +      }
    +      Assert.assertTrue(listener.isDone());
    +      Assert.assertEquals(1,drillbitEndpoints.size());
    +    }
    +  }
    +
    +  /*
    +   Test default shutdown through RestApi
    +   */
    +  @Test
    +  public void testRestApiShutdown() throws Exception {
    +
    +    String[] drillbits = {"db1" ,"db2", "db3", "db4", "db5", "db6"};
    +    ClusterFixtureBuilder builder = ClusterFixture.bareBuilder().withBits(drillbits).withLocalZk();
    +    builder = enableWebServer(builder);
    +    QueryBuilder.QuerySummaryFuture listener;
    +    final String sql = "SELECT * FROM dfs."+testDirPath +" ORDER BY employee_id`";
    +    try ( ClusterFixture cluster = builder.build();
    +          final ClientFixture client = cluster.clientFixture()) {
    +      Drillbit drillbit = cluster.drillbit("db1");
    +      int port = drillbit.getContext().getConfig().getInt("drill.exec.http.port");
    +      int grace_period = drillbit.getContext().getConfig().getInt("drill.exec.grace_period");
    +      listener =  client.queryBuilder().sql(sql).futureSummary();
    +      Thread.sleep(50000);
    +      while( port < 8048) {
    +        URL url = new URL("http://localhost:"+port+"/shutdown");
    +        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    +        conn.setRequestMethod("POST");
    +        if (conn.getResponseCode() != 200) {
    +          throw new RuntimeException("Failed : HTTP error code : "
    +                  + conn.getResponseCode());
    +        }
    +        port++;
    +      }
    +      Thread.sleep(grace_period);
    +      Thread.sleep(5000);
    +      Collection<DrillbitEndpoint> drillbitEndpoints = cluster.drillbit().getContext()
    +              .getClusterCoordinator().getAvailableEndpoints();
    +      while(!listener.isDone()) {
    +        Thread.sleep(10);
    +      }
    +      Assert.assertTrue(listener.isDone());
    +      Assert.assertEquals(5,drillbitEndpoints.size());
    +    }
    +  }
    +
    +  public static File getTempDir(final String dirName) {
    +    final File dir = Files.createTempDir();
    +    Runtime.getRuntime().addShutdownHook(new Thread() {
    +      @Override
    +      public void run() {
    +        FileUtils.deleteQuietly(dir);
    +      }
    +    });
    +    File tempDir = new File(dir, dirName);
    +    tempDir.mkdirs();
    +    return tempDir;
    +  }
    +
    +  private static File getTempFile(int file_num, File testDir) throws Exception {
    +    return File.createTempFile("employee"+file_num, ".json", testDir);
    +  }
    +
    +  private static void setupFile(File testDir, int file_num) throws Exception {
    +    File destFile = getTempFile(file_num, testDir);
    +    try (PrintWriter out = new PrintWriter(new FileWriter(destFile))) {
    --- End diff --
    
    Can this test use one of our many existing data files? Maybe the `customer` table on the
class path?


---

Mime
View raw message