Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 90E3C2004A0 for ; Wed, 16 Aug 2017 18:55:44 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 8EE20160C88; Wed, 16 Aug 2017 16:55:44 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id B8AA3168E7B for ; Wed, 16 Aug 2017 18:55:42 +0200 (CEST) Received: (qmail 30725 invoked by uid 500); 16 Aug 2017 16:55:41 -0000 Mailing-List: contact commits-help@gobblin.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@gobblin.incubator.apache.org Delivered-To: mailing list commits@gobblin.incubator.apache.org Received: (qmail 30712 invoked by uid 99); 16 Aug 2017 16:55:40 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 16 Aug 2017 16:55:40 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D2CEBE9456; Wed, 16 Aug 2017 16:55:40 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: hutran@apache.org To: commits@gobblin.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: =?utf-8?q?incubator-gobblin_git_commit=3A_=5BGOBBLIN-204=5D_Add_a_?= =?utf-8?q?service_that_fetches_GaaS_flow_configs_from_a_git=E2=80=A6?= Date: Wed, 16 Aug 2017 16:55:40 +0000 (UTC) archived-at: Wed, 16 Aug 2017 16:55:44 -0000 Repository: incubator-gobblin Updated Branches: refs/heads/master 56cecb28a -> da382dbf1 [GOBBLIN-204] Add a service that fetches GaaS flow configs from a git… Closes #2055 from htran1/git_config Project: 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/da382dbf Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/da382dbf Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/da382dbf Branch: refs/heads/master Commit: da382dbf10bf66ab458463d593facab775fa121e Parents: 56cecb2 Author: Hung Tran Authored: Wed Aug 16 09:54:55 2017 -0700 Committer: Hung Tran Committed: Wed Aug 16 09:54:55 2017 -0700 ---------------------------------------------------------------------- .../configuration/ConfigurationKeys.java | 14 + .../gobblin/runtime/spec_store/FSSpecStore.java | 4 +- gobblin-service/build.gradle | 1 + .../gobblin/service/ServiceConfigKeys.java | 1 + .../service/modules/core/GitConfigMonitor.java | 434 +++++++++++++++++++ .../modules/core/GobblinServiceManager.java | 29 ++ .../modules/core/GitConfigMonitorTest.java | 334 ++++++++++++++ .../modules/core/GobblinServiceManagerTest.java | 76 +++- gradle/scripts/dependencyDefinitions.gradle | 1 + 9 files changed, 876 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/da382dbf/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java index acf3b52..3336426 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java @@ -831,4 +831,18 @@ public class ConfigurationKeys { * Configuration related to ConfigStore based copy/retention */ public static final String CONFIG_BASED_PREFIX = "gobblin.configBased"; + + /** + * 
Configuration related to the git flow config monitoring service + */ + public static final String GIT_CONFIG_MONITOR_PREFIX = "gitConfigMonitor."; + public static final String GIT_CONFIG_MONITOR_REPO_URI = GIT_CONFIG_MONITOR_PREFIX + "repositoryUri"; + public static final String GIT_CONFIG_MONITOR_REPO_DIR = GIT_CONFIG_MONITOR_PREFIX + "repositoryDirectory"; + public static final String DEFAULT_GIT_CONFIG_MONITOR_REPO_DIR = "git-flow-config"; + public static final String GIT_CONFIG_MONITOR_CONFIG_DIR = GIT_CONFIG_MONITOR_PREFIX + "configDirectory"; + public static final String DEFAULT_GIT_CONFIG_MONITOR_CONFIG_DIR = "gobblin-config"; + public static final String GIT_CONFIG_MONITOR_POLLING_INTERVAL = GIT_CONFIG_MONITOR_PREFIX + "pollingInterval"; + public static final String GIT_CONFIG_MONITOR_BRANCH_NAME = GIT_CONFIG_MONITOR_PREFIX + "branchName"; + public static final String DEFAULT_GIT_CONFIG_MONITOR_BRANCH_NAME = "master"; + public static final int DEFAULT_GIT_CONFIG_MONITOR_POLLING_INTERVAL = 60; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/da382dbf/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java index f852aea..608d390 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java @@ -17,11 +17,11 @@ package org.apache.gobblin.runtime.spec_store; +import com.google.common.io.ByteStreams; import java.io.IOException; import java.net.URI; import java.util.Collection; -import org.apache.commons.compress.utils.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import 
org.apache.hadoop.fs.FSDataInputStream; @@ -258,7 +258,7 @@ public class FSSpecStore implements SpecStore { Spec spec = null; try (FSDataInputStream fis = fs.open(path);) { - spec = this.specSerDe.deserialize(IOUtils.toByteArray(fis)); + spec = this.specSerDe.deserialize(ByteStreams.toByteArray(fis)); } return spec; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/da382dbf/gobblin-service/build.gradle ---------------------------------------------------------------------- diff --git a/gobblin-service/build.gradle b/gobblin-service/build.gradle index 33690eb..642d818 100644 --- a/gobblin-service/build.gradle +++ b/gobblin-service/build.gradle @@ -56,6 +56,7 @@ dependencies { compile externalDependency.jacksonCore compile externalDependency.jacksonMapper compile externalDependency.javaxInject + compile externalDependency.jgit compile externalDependency.jodaTime compile externalDependency.kafka08 compile externalDependency.log4j http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/da382dbf/gobblin-service/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java b/gobblin-service/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java index 76ed90a..a6f0199 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java @@ -34,6 +34,7 @@ public class ServiceConfigKeys { public static final String GOBBLIN_SERVICE_ORCHESTRATOR_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "orchestrator.enabled"; public static final String GOBBLIN_SERVICE_RESTLI_SERVER_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "restliServer.enabled"; public static final String GOBBLIN_SERVICE_TOPOLOGY_SPEC_FACTORY_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "topologySpecFactory.enabled"; + public static final 
String GOBBLIN_SERVICE_GIT_CONFIG_MONITOR_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "gitConfigMonitor.enabled"; // Helix / ServiceScheduler Keys public static final String HELIX_CLUSTER_NAME_KEY = GOBBLIN_SERVICE_PREFIX + "helix.cluster.name"; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/da382dbf/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java new file mode 100644 index 0000000..82f3e0d --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gobblin.service.modules.core; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.eclipse.jgit.api.Git; +import org.eclipse.jgit.api.ResetCommand; +import org.eclipse.jgit.api.errors.GitAPIException; +import org.eclipse.jgit.diff.DiffEntry; +import org.eclipse.jgit.errors.RepositoryNotFoundException; +import org.eclipse.jgit.lib.ObjectId; +import org.eclipse.jgit.lib.ObjectReader; +import org.eclipse.jgit.revwalk.RevCommit; +import org.eclipse.jgit.treewalk.CanonicalTreeParser; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Charsets; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.io.Files; +import com.google.common.util.concurrent.AbstractIdleService; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; + +import org.apache.gobblin.config.ConfigBuilder; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.runtime.api.FlowSpec; +import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.ExecutorsUtils; +import org.apache.gobblin.util.PullFileLoader; + +import lombok.extern.slf4j.Slf4j; + + +/** + * Service that monitors for jobs from a git repository. + * The git repository must have an initial commit that has no config files since that is used as a base for getting + * the change list. 
+ * The config needs to be organized with the following structure: + * <configDir>/<flowGroup>/<flowName>.(pull|job|json|conf) + * The <flowGroup> and <flowName> are used to generate the URI used to store the config in the {@link FlowCatalog} + */ +@Slf4j +public class GitConfigMonitor extends AbstractIdleService { + private static final String SPEC_DESCRIPTION = "Git-based flow config"; + private static final String SPEC_VERSION = "1"; + private static final int TERMINATION_TIMEOUT = 30; + private static final int CONFIG_FILE_DEPTH = 3; + private static final String REMOTE_NAME = "origin"; + + private final ScheduledExecutorService scheduledExecutor; + private final GitRepository gitRepo; + private final int pollingInterval; + private final String repositoryDir; + private final String configDir; + private final Path configDirPath; + private final FlowCatalog flowCatalog; + private final PullFileLoader pullFileLoader; + private final Config emptyConfig = ConfigFactory.empty(); + private volatile boolean isActive = false; + + /** + * Create a {@link GitConfigMonitor} that monitors a git repository for changes and manages config in a + * {@link FlowCatalog} + * @param config configuration + * @param flowCatalog the flow catalog + */ + GitConfigMonitor(Config config, FlowCatalog flowCatalog) { + this.flowCatalog = flowCatalog; + + this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor( + ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("FetchGitConfExecutor"))); + + Preconditions.checkArgument(config.hasPath(ConfigurationKeys.GIT_CONFIG_MONITOR_REPO_URI), + ConfigurationKeys.GIT_CONFIG_MONITOR_REPO_URI + " needs to be specified."); + + String repositoryUri = config.getString(ConfigurationKeys.GIT_CONFIG_MONITOR_REPO_URI); + + this.repositoryDir = ConfigUtils.getString(config, ConfigurationKeys.GIT_CONFIG_MONITOR_REPO_DIR, + ConfigurationKeys.DEFAULT_GIT_CONFIG_MONITOR_REPO_DIR); + + this.configDir = ConfigUtils.getString(config, ConfigurationKeys.GIT_CONFIG_MONITOR_CONFIG_DIR, + 
ConfigurationKeys.DEFAULT_GIT_CONFIG_MONITOR_CONFIG_DIR); + + this.pollingInterval = ConfigUtils.getInt(config, ConfigurationKeys.GIT_CONFIG_MONITOR_POLLING_INTERVAL, + ConfigurationKeys.DEFAULT_GIT_CONFIG_MONITOR_POLLING_INTERVAL); + + String branchName = ConfigUtils.getString(config, ConfigurationKeys.GIT_CONFIG_MONITOR_BRANCH_NAME, + ConfigurationKeys.DEFAULT_GIT_CONFIG_MONITOR_BRANCH_NAME); + + this.configDirPath = new Path(this.repositoryDir, this.configDir); + + try { + this.pullFileLoader = new PullFileLoader(this.configDirPath, + FileSystem.get(URI.create(ConfigurationKeys.LOCAL_FS_URI), new Configuration()), + PullFileLoader.DEFAULT_JAVA_PROPS_PULL_FILE_EXTENSIONS, PullFileLoader.DEFAULT_HOCON_PULL_FILE_EXTENSIONS); + } catch (IOException e) { + throw new RuntimeException("Could not create pull file loader", e); + } + + try { + this.gitRepo = new GitRepository(repositoryUri, this.repositoryDir, branchName); + } catch (GitAPIException | IOException e) { + throw new RuntimeException("Could not open git repository", e); + } + } + + /** Start the service. */ + @Override + protected void startUp() throws Exception { + log.info("Starting the " + GitConfigMonitor.class.getSimpleName()); + log.info("Polling git with inteval {} ", this.pollingInterval); + + // Schedule the job config fetch task + this.scheduledExecutor.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + processGitConfigChanges(); + } catch (GitAPIException | IOException e) { + log.error("Failed to process git config changes", e); + // next run will try again since errors could be intermittent + } + } + }, 0, this.pollingInterval, TimeUnit.SECONDS); + } + + /** Stop the service. 
*/ + @Override + protected void shutDown() throws Exception { + this.scheduledExecutor.shutdown(); + this.scheduledExecutor.awaitTermination(TERMINATION_TIMEOUT, TimeUnit.SECONDS); + } + + public synchronized void setActive(boolean isActive) { + if (this.isActive == isActive) { + // No-op if already in correct state + return; + } + + this.isActive = isActive; + } + + /** + * Fetch the list of changes since the last refresh of the repository and apply the changes to the {@link FlowCatalog} + * @throws GitAPIException + * @throws IOException + */ + @VisibleForTesting + void processGitConfigChanges() throws GitAPIException, IOException { + // if not active or if the flow catalog is not up yet then can't process config changes + if (!isActive || !this.flowCatalog.isRunning()) { + log.info("GitConfigMonitor: skip poll since the JobCatalog is not yet running."); + return; + } + + List changes = this.gitRepo.getChanges(); + + for (DiffEntry change : changes) { + switch (change.getChangeType()) { + case ADD: + case MODIFY: + addSpec(change); + break; + case DELETE: + removeSpec(change); + break; + case RENAME: + removeSpec(change); + addSpec(change); + break; + default: + throw new RuntimeException("Unsupported change type " + change.getChangeType()); + } + } + + // Done processing changes, so checkpoint + this.gitRepo.moveCheckpointAndHashesForward(); + } + + /** + * Add a {@link FlowSpec} for an added, updated, or modified flow config + * @param change + */ + private void addSpec(DiffEntry change) { + if (checkConfigFilePath(change.getNewPath())) { + Path configFilePath = new Path(this.repositoryDir, change.getNewPath()); + + try { + Config flowConfig = loadConfigFileWithFlowNameOverrides(configFilePath); + + this.flowCatalog.put(FlowSpec.builder() + .withConfig(flowConfig) + .withVersion(SPEC_VERSION) + .withDescription(SPEC_DESCRIPTION) + .build()); + } catch (IOException e) { + log.warn("Could not load config file: " + configFilePath); + } + } + } + + /** + * remove a 
{@link FlowSpec} for a deleted or renamed flow config + * @param change + */ + private void removeSpec(DiffEntry change) { + if (checkConfigFilePath(change.getOldPath())) { + Path configFilePath = new Path(this.repositoryDir, change.getOldPath()); + String flowName = Files.getNameWithoutExtension(configFilePath.getName()); + String flowGroup = configFilePath.getParent().getName(); + + // build a dummy config to get the proper URI for delete + Config dummyConfig = ConfigBuilder.create() + .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, flowGroup) + .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, flowName) + .build(); + + FlowSpec spec = FlowSpec.builder() + .withConfig(dummyConfig) + .withVersion(SPEC_VERSION) + .withDescription(SPEC_DESCRIPTION) + .build(); + + this.flowCatalog.remove(spec.getUri()); + } + } + + + /** + * check whether the file has the proper naming and hierarchy + * @param configFilePath the relative path from the repo root + * @return false if the file does not conform + */ + private boolean checkConfigFilePath(String configFilePath) { + // The config needs to be stored at configDir/flowGroup/flowName.(pull|job|json|conf) + Path configFile = new Path(configFilePath); + String fileExtension = Files.getFileExtension(configFile.getName()); + + // accept both the java properties extensions and the HOCON extensions that the PullFileLoader is created with + if (configFile.depth() != CONFIG_FILE_DEPTH || + !configFile.getParent().getParent().getName().equals(configDir) || + !(PullFileLoader.DEFAULT_JAVA_PROPS_PULL_FILE_EXTENSIONS.contains(fileExtension) || + PullFileLoader.DEFAULT_HOCON_PULL_FILE_EXTENSIONS.contains(fileExtension))) { + log.warn("Changed file does not conform to directory structure and file name format, skipping: " + + configFilePath); + + return false; + } + + return true; + } + + /** + * Load the config file and override the flow name and flow path properties with the names from the file path + * @param configFilePath path of the config file relative to the repository root + * @return the configuration object + * @throws IOException + */ + 
private Config loadConfigFileWithFlowNameOverrides(Path configFilePath) throws IOException { + Config flowConfig = this.pullFileLoader.loadPullFile(configFilePath, emptyConfig, false); + String flowName = Files.getNameWithoutExtension(configFilePath.getName()); + String flowGroup = configFilePath.getParent().getName(); + + return flowConfig.withValue(ConfigurationKeys.FLOW_NAME_KEY, ConfigValueFactory.fromAnyRef(flowName)) + .withValue(ConfigurationKeys.FLOW_GROUP_KEY, ConfigValueFactory.fromAnyRef(flowGroup)); + } + + /** + * Class for managing a git repository + */ + private static class GitRepository { + private final static String CHECKPOINT_FILE = "checkpoint.txt"; + private final static String CHECKPOINT_FILE_TMP = "checkpoint.tmp"; + private final String repoUri; + private final String repoDir; + private final String branchName; + private Git git; + private String lastProcessedGitHash; + private String latestGitHash; + + /** + * Create an object to manage the git repository stored locally at repoDir with a repository URI of repoDir + * @param repoUri URI of repository + * @param repoDir Directory to hold the local copy of the repository + * @param branchName Branch name + * @throws GitAPIException + * @throws IOException + */ + private GitRepository(String repoUri, String repoDir, String branchName) throws GitAPIException, IOException { + this.repoUri = repoUri; + this.repoDir = repoDir; + this.branchName = branchName; + + initRepository(); + } + + /** + * Open the repository if it exists locally, otherwise clone it + * @throws GitAPIException + * @throws IOException + */ + private void initRepository() throws GitAPIException, IOException { + File repoDirFile = new File(this.repoDir); + + try { + this.git = Git.open(repoDirFile); + + String uri = this.git.getRepository().getConfig().getString("remote", REMOTE_NAME, "url"); + + if (!uri.equals(this.repoUri)) { + throw new RuntimeException("Repo at " + this.repoDir + " has uri " + uri + " instead of " + 
this.repoUri); + } + } catch (RepositoryNotFoundException e) { + // if the repository was not found then clone a new one + this.git = Git.cloneRepository() + .setDirectory(repoDirFile) + .setURI(this.repoUri) + .setBranch(this.branchName) + .call(); + } + + try { + this.lastProcessedGitHash = readCheckpoint(); + } catch (FileNotFoundException e) { + // if no checkpoint is available then start with the first commit + Iterable logs = git.log().call(); + RevCommit lastLog = null; + + for (RevCommit log : logs) { + lastLog = log; + } + + if (lastLog != null) { + this.lastProcessedGitHash = lastLog.getName(); + } + } + + this.latestGitHash = this.lastProcessedGitHash; + } + + /** + * Read the last processed commit githash from the checkpoint file + * @return + * @throws IOException + */ + private String readCheckpoint() throws IOException { + File checkpointFile = new File(this.repoDir, CHECKPOINT_FILE); + return Files.toString(checkpointFile, Charsets.UTF_8); + } + + /** + * Write the last processed commit githash to the checkpoint file + * @param gitHash + * @throws IOException + */ + private void writeCheckpoint(String gitHash) throws IOException { + // write to a temporary name then rename to make the operation atomic when the file system allows a file to be + // replaced + File tmpCheckpointFile = new File(this.repoDir, CHECKPOINT_FILE_TMP); + File checkpointFile = new File(this.repoDir, CHECKPOINT_FILE); + + Files.write(gitHash, tmpCheckpointFile, Charsets.UTF_8); + + Files.move(tmpCheckpointFile, checkpointFile); + } + + private void moveCheckpointAndHashesForward() throws IOException { + this.lastProcessedGitHash = this.latestGitHash; + + writeCheckpoint(this.latestGitHash); + } + + /** + * + * @throws GitAPIException + * @throws IOException + */ + private List getChanges() throws GitAPIException, IOException { + // get tree for last processed commit + ObjectId oldHeadTree = git.getRepository().resolve(this.lastProcessedGitHash + "^{tree}"); + + // refresh to 
latest and reset hard to handle forced pushes + this.git.fetch().setRemote(REMOTE_NAME).call(); + // reset hard to get a clean working set since pull --rebase may leave files around + this.git.reset().setMode(ResetCommand.ResetType.HARD).setRef(REMOTE_NAME + "/" + this.branchName).call(); + + ObjectId head = this.git.getRepository().resolve("HEAD"); + ObjectId headTree = this.git.getRepository().resolve("HEAD^{tree}"); + + // remember the hash for the current HEAD. This will be checkpointed after the diff is processed. + latestGitHash = head.getName(); + + // diff old and new heads to find changes + ObjectReader reader = this.git.getRepository().newObjectReader(); + CanonicalTreeParser oldTreeIter = new CanonicalTreeParser(); + oldTreeIter.reset(reader, oldHeadTree); + CanonicalTreeParser newTreeIter = new CanonicalTreeParser(); + newTreeIter.reset(reader, headTree); + + return this.git.diff() + .setNewTree(newTreeIter) + .setOldTree(oldTreeIter) + .setShowNameAndStatusOnly(true) + .call(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/da382dbf/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java index 6aed51e..aebebdc 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java @@ -113,6 +113,7 @@ public class GobblinServiceManager implements ApplicationLauncher { protected final boolean isSchedulerEnabled; protected final boolean isRestLIServerEnabled; protected final boolean isTopologySpecFactoryEnabled; + protected final boolean isGitConfigMonitorEnabled; protected 
TopologyCatalog topologyCatalog; @Getter @@ -127,6 +128,8 @@ public class GobblinServiceManager implements ApplicationLauncher { protected ClassAliasResolver aliasResolver; + protected GitConfigMonitor gitConfigMonitor; + @Getter protected Config config; @@ -161,6 +164,16 @@ public class GobblinServiceManager implements ApplicationLauncher { if (isFlowCatalogEnabled) { this.flowCatalog = new FlowCatalog(config, Optional.of(LOGGER)); this.serviceLauncher.addService(flowCatalog); + + this.isGitConfigMonitorEnabled = ConfigUtils.getBoolean(config, + ServiceConfigKeys.GOBBLIN_SERVICE_GIT_CONFIG_MONITOR_ENABLED_KEY, false); + + if (this.isGitConfigMonitorEnabled) { + this.gitConfigMonitor = new GitConfigMonitor(config, this.flowCatalog); + this.serviceLauncher.addService(this.gitConfigMonitor); + } + } else { + this.isGitConfigMonitorEnabled = false; } // Initialize Helix @@ -275,6 +288,10 @@ public class GobblinServiceManager implements ApplicationLauncher { LOGGER.info("Gobblin Service is now running in master instance mode, enabling Scheduler."); this.scheduler.setActive(true); } + + if (this.isGitConfigMonitorEnabled) { + this.gitConfigMonitor.setActive(true); + } } else if (this.helixManager.isPresent()) { LOGGER.info("Leader lost notification for {} HM.isLeader {}", this.helixManager.get().getInstanceName(), this.helixManager.get().isLeader()); @@ -282,6 +299,10 @@ public class GobblinServiceManager implements ApplicationLauncher { LOGGER.info("Gobblin Service is now running in slave instance mode, disabling Scheduler."); this.scheduler.setActive(false); } + + if (this.isGitConfigMonitorEnabled) { + this.gitConfigMonitor.setActive(false); + } } } @@ -311,6 +332,10 @@ public class GobblinServiceManager implements ApplicationLauncher { LOGGER.info("[Init] Gobblin Service is running in master instance mode, enabling Scheduler."); this.scheduler.setActive(true); } + + if (this.isGitConfigMonitorEnabled) { + this.gitConfigMonitor.setActive(true); + } } else { if 
(this.isSchedulerEnabled) { LOGGER.info("[Init] Gobblin Service is running in slave instance mode, not enabling Scheduler."); @@ -321,6 +346,10 @@ public class GobblinServiceManager implements ApplicationLauncher { // .. designate scheduler to itself LOGGER.info("[Init] Gobblin Service is running in single instance mode, enabling Scheduler."); this.scheduler.setActive(true); + + if (this.isGitConfigMonitorEnabled) { + this.gitConfigMonitor.setActive(true); + } } // Populate TopologyCatalog with all Topologies generated by TopologySpecFactory http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/da382dbf/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitConfigMonitorTest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitConfigMonitorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitConfigMonitorTest.java new file mode 100644 index 0000000..6cd8bf2 --- /dev/null +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitConfigMonitorTest.java @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.service.modules.core; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Collection; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.SystemUtils; +import org.eclipse.jgit.api.Git; +import org.eclipse.jgit.api.ResetCommand; +import org.eclipse.jgit.api.errors.GitAPIException; +import org.eclipse.jgit.dircache.DirCache; +import org.eclipse.jgit.lib.Repository; +import org.eclipse.jgit.lib.RepositoryCache; +import org.eclipse.jgit.revwalk.RevCommit; +import org.eclipse.jgit.transport.RefSpec; +import org.eclipse.jgit.util.FS; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import com.google.common.base.Charsets; +import com.google.common.collect.Lists; +import com.google.common.io.Files; +import com.typesafe.config.Config; + +import org.apache.gobblin.config.ConfigBuilder; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.runtime.api.FlowSpec; +import org.apache.gobblin.runtime.api.Spec; +import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; + +public class GitConfigMonitorTest { + private static final Logger logger = LoggerFactory.getLogger(GitConfigMonitorTest.class); + private Repository remoteRepo; + private Git gitForPush; + private static final String TEST_DIR = "/tmp/gitConfigTestDir/"; + private final File remoteDir = new File(TEST_DIR + "/remote"); + private final File cloneDir = new File(TEST_DIR + "/clone"); + private final File configDir = new File(cloneDir, "/gobblin-config"); + private static final String TEST_FLOW_FILE = "testFlow.pull"; + private static final String TEST_FLOW_FILE2 = "testFlow2.pull"; + private static 
final String TEST_FLOW_FILE3 = "testFlow3.pull"; + private final File testGroupDir = new File(configDir, "testGroup"); + private final File testFlowFile = new File(testGroupDir, TEST_FLOW_FILE); + private final File testFlowFile2 = new File(testGroupDir, TEST_FLOW_FILE2); + private final File testFlowFile3 = new File(testGroupDir, TEST_FLOW_FILE3); + + private RefSpec masterRefSpec = new RefSpec("master"); + private FlowCatalog flowCatalog; + private Config config; + private GitConfigMonitor gitConfigMonitor; + + + @BeforeClass + public void setup() throws Exception { + cleanUpDir(TEST_DIR); + + // Create a bare repository + RepositoryCache.FileKey fileKey = RepositoryCache.FileKey.exact(remoteDir, FS.DETECTED); + this.remoteRepo = fileKey.open(false); + this.remoteRepo.create(true); + + this.gitForPush = Git.cloneRepository().setURI(this.remoteRepo.getDirectory().getAbsolutePath()).setDirectory(cloneDir).call(); + + // push an empty commit as a base for detecting changes + this.gitForPush.commit().setMessage("First commit").call(); + this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call(); + + this.config = ConfigBuilder.create() + .addPrimitive(ConfigurationKeys.GIT_CONFIG_MONITOR_REPO_URI, this.remoteRepo.getDirectory().getAbsolutePath()) + .addPrimitive(ConfigurationKeys.GIT_CONFIG_MONITOR_REPO_DIR, TEST_DIR + "/jobConfig") + .addPrimitive(ConfigurationKeys.FLOWSPEC_STORE_DIR_KEY, TEST_DIR + "flowCatalog") + .addPrimitive(ConfigurationKeys.GIT_CONFIG_MONITOR_POLLING_INTERVAL, 5) + .build(); + + this.flowCatalog = new FlowCatalog(config); + this.flowCatalog.startAsync().awaitRunning(); + this.gitConfigMonitor = new GitConfigMonitor(this.config, this.flowCatalog); + this.gitConfigMonitor.setActive(true); + } + + private void cleanUpDir(String dir) { + File specStoreDir = new File(dir); + + // cleanup is flaky on Travis, so retry a few times and then suppress the error if unsuccessful + for (int i = 0; i < 5; i++) { + try { + if 
(specStoreDir.exists()) { + FileUtils.deleteDirectory(specStoreDir); + } + // if delete succeeded then break out of loop + break; + } catch (IOException e) { + logger.warn("Cleanup delete directory failed for directory: " + dir, e); + } + } + } + + @AfterClass + public void cleanUp() { + if (this.flowCatalog != null) { + this.flowCatalog.stopAsync().awaitTerminated(); + } + + cleanUpDir(TEST_DIR); + } + + private String formConfigFilePath(String groupDir, String fileName) { + return this.configDir.getName() + SystemUtils.FILE_SEPARATOR + groupDir + SystemUtils.FILE_SEPARATOR + fileName; + } + + @Test + public void testAddConfig() throws IOException, GitAPIException, URISyntaxException { + // push a new config file + this.testGroupDir.mkdirs(); + this.testFlowFile.createNewFile(); + Files.write("flow.name=testFlow\nflow.group=testGroup\nparam1=value1\n", testFlowFile, Charsets.UTF_8); + + // add, commit, push + this.gitForPush.add().addFilepattern(formConfigFilePath(this.testGroupDir.getName(), this.testFlowFile.getName())) + .call(); + this.gitForPush.commit().setMessage("Second commit").call(); + this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call(); + + this.gitConfigMonitor.processGitConfigChanges(); + + Collection<Spec> specs = this.flowCatalog.getSpecs(); + + Assert.assertTrue(specs.size() == 1); + FlowSpec spec = (FlowSpec)(specs.iterator().next()); + Assert.assertEquals(spec.getUri(), new URI("gobblin-flow:/testGroup/testFlow")); + Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), "testFlow"); + Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY), "testGroup"); + Assert.assertEquals(spec.getConfig().getString("param1"), "value1"); + } + + @Test(dependsOnMethods = "testAddConfig") + public void testUpdateConfig() throws IOException, GitAPIException, URISyntaxException { + // push an updated config file + Files.write("flow.name=testFlow\nflow.group=testGroup\nparam1=value2\n",
testFlowFile, Charsets.UTF_8); + + // add, commit, push + this.gitForPush.add().addFilepattern(formConfigFilePath(this.testGroupDir.getName(), this.testFlowFile.getName())) + .call(); + this.gitForPush.commit().setMessage("Third commit").call(); + this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call(); + + this.gitConfigMonitor.processGitConfigChanges(); + + Collection<Spec> specs = this.flowCatalog.getSpecs(); + + Assert.assertTrue(specs.size() == 1); + FlowSpec spec = (FlowSpec)(specs.iterator().next()); + Assert.assertEquals(spec.getUri(), new URI("gobblin-flow:/testGroup/testFlow")); + Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), "testFlow"); + Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY), "testGroup"); + Assert.assertEquals(spec.getConfig().getString("param1"), "value2"); + } + + @Test(dependsOnMethods = "testUpdateConfig") + public void testDeleteConfig() throws IOException, GitAPIException, URISyntaxException { + // delete a config file + testFlowFile.delete(); + + // flow catalog has 1 entry before the config is deleted + Collection<Spec> specs = this.flowCatalog.getSpecs(); + Assert.assertTrue(specs.size() == 1); + + // add, commit, push + DirCache ac = this.gitForPush.rm().addFilepattern(formConfigFilePath(this.testGroupDir.getName(), this.testFlowFile.getName())) + .call(); + RevCommit cc = this.gitForPush.commit().setMessage("Fourth commit").call(); + this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call(); + + this.gitConfigMonitor.processGitConfigChanges(); + + specs = this.flowCatalog.getSpecs(); + Assert.assertTrue(specs.size() == 0); + } + + @Test(dependsOnMethods = "testDeleteConfig") + public void testForcedPushConfig() throws IOException, GitAPIException, URISyntaxException { + // push a new config file + this.testGroupDir.mkdirs(); + this.testFlowFile.createNewFile(); +
Files.write("flow.name=testFlow\nflow.group=testGroup\nparam1=value1\n", testFlowFile, Charsets.UTF_8); + this.testFlowFile2.createNewFile(); + Files.write("flow.name=testFlow2\nflow.group=testGroup\nparam1=value2\n", testFlowFile2, Charsets.UTF_8); + + // add, commit, push + this.gitForPush.add().addFilepattern(formConfigFilePath(this.testGroupDir.getName(), this.testFlowFile.getName())) + .call(); + this.gitForPush.add().addFilepattern(formConfigFilePath(this.testGroupDir.getName(), this.testFlowFile2.getName())) + .call(); + this.gitForPush.commit().setMessage("Fifth commit").call(); + this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call(); + + this.gitConfigMonitor.processGitConfigChanges(); + + Collection<Spec> specs = this.flowCatalog.getSpecs(); + + Assert.assertTrue(specs.size() == 2); + List<Spec> specList = Lists.newArrayList(specs); + specList.sort(new Comparator<Spec>() { + @Override + public int compare(Spec o1, Spec o2) { + return o1.getUri().compareTo(o2.getUri()); + } + }); + + FlowSpec spec = (FlowSpec)specList.get(0); + Assert.assertEquals(spec.getUri(), new URI("gobblin-flow:/testGroup/testFlow")); + Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), "testFlow"); + Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY), "testGroup"); + Assert.assertEquals(spec.getConfig().getString("param1"), "value1"); + + spec = (FlowSpec)specList.get(1); + Assert.assertEquals(spec.getUri(), new URI("gobblin-flow:/testGroup/testFlow2")); + Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), "testFlow2"); + Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY), "testGroup"); + Assert.assertEquals(spec.getConfig().getString("param1"), "value2"); + + // go back in time to cause conflict + this.gitForPush.reset().setMode(ResetCommand.ResetType.HARD).setRef("HEAD~1").call(); +
this.gitForPush.push().setForce(true).setRemote("origin").setRefSpecs(this.masterRefSpec).call(); + + // add new files + this.testGroupDir.mkdirs(); + this.testFlowFile2.createNewFile(); + Files.write("flow.name=testFlow2\nflow.group=testGroup\nparam1=value4\n", testFlowFile2, Charsets.UTF_8); + this.testFlowFile3.createNewFile(); + Files.write("flow.name=testFlow3\nflow.group=testGroup\nparam1=value5\n", testFlowFile3, Charsets.UTF_8); + + // add, commit, push + this.gitForPush.add().addFilepattern(formConfigFilePath(this.testGroupDir.getName(), this.testFlowFile2.getName())) + .call(); + this.gitForPush.add().addFilepattern(formConfigFilePath(this.testGroupDir.getName(), this.testFlowFile3.getName())) + .call(); + this.gitForPush.commit().setMessage("Sixth commit").call(); + this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call(); + + this.gitConfigMonitor.processGitConfigChanges(); + + specs = this.flowCatalog.getSpecs(); + + Assert.assertTrue(specs.size() == 2); + + specList = Lists.newArrayList(specs); + specList.sort(new Comparator<Spec>() { + @Override + public int compare(Spec o1, Spec o2) { + return o1.getUri().compareTo(o2.getUri()); + } + }); + + spec = (FlowSpec)specList.get(0); + Assert.assertEquals(spec.getUri(), new URI("gobblin-flow:/testGroup/testFlow2")); + Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), "testFlow2"); + Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY), "testGroup"); + Assert.assertEquals(spec.getConfig().getString("param1"), "value4"); + + spec = (FlowSpec)specList.get(1); + Assert.assertEquals(spec.getUri(), new URI("gobblin-flow:/testGroup/testFlow3")); + Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), "testFlow3"); + Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY), "testGroup"); + Assert.assertEquals(spec.getConfig().getString("param1"), "value5"); + + // reset for next test case
+ this.gitForPush.reset().setMode(ResetCommand.ResetType.HARD).setRef("HEAD~4").call(); + this.gitForPush.push().setForce(true).setRemote("origin").setRefSpecs(this.masterRefSpec).call(); + + this.gitConfigMonitor.processGitConfigChanges(); + specs = this.flowCatalog.getSpecs(); + + Assert.assertTrue(specs.size() == 0); + } + + @Test(dependsOnMethods = "testForcedPushConfig") + public void testPollingConfig() throws IOException, GitAPIException, URISyntaxException, InterruptedException { + // push a new config file + this.testGroupDir.mkdirs(); + this.testFlowFile.createNewFile(); + Files.write("flow.name=testFlow\nflow.group=testGroup\nparam1=value20\n", testFlowFile, Charsets.UTF_8); + + // add, commit, push + this.gitForPush.add().addFilepattern(formConfigFilePath(this.testGroupDir.getName(), this.testFlowFile.getName())) + .call(); + this.gitForPush.commit().setMessage("Seventh commit").call(); + this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call(); + + Collection<Spec> specs = this.flowCatalog.getSpecs(); + Assert.assertTrue(specs.size() == 0); + + this.gitConfigMonitor.startAsync().awaitRunning(); + + // polling is every 5 seconds, so wait twice as long and check + TimeUnit.SECONDS.sleep(10); + + specs = this.flowCatalog.getSpecs(); + Assert.assertTrue(specs.size() == 1); + + FlowSpec spec = (FlowSpec)(specs.iterator().next()); + Assert.assertEquals(spec.getUri(), new URI("gobblin-flow:/testGroup/testFlow")); + Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), "testFlow"); + Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY), "testGroup"); + Assert.assertEquals(spec.getConfig().getString("param1"), "value20"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/da382dbf/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java ---------------------------------------------------------------------- diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java index 6378339..314dc66 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java @@ -17,15 +17,20 @@ package org.apache.gobblin.service.modules.core; -import org.apache.gobblin.service.FlowId; -import org.apache.gobblin.service.Schedule; import java.io.File; +import java.net.URI; +import java.util.Collection; import java.util.Map; import java.util.Properties; +import java.util.concurrent.TimeUnit; import org.apache.commons.io.FileUtils; import org.apache.hadoop.fs.Path; import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jgit.api.Git; +import org.eclipse.jgit.lib.RepositoryCache; +import org.eclipse.jgit.transport.RefSpec; +import org.eclipse.jgit.util.FS; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -33,8 +38,10 @@ import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import com.google.common.base.Charsets; import com.google.common.base.Optional; import com.google.common.collect.Maps; +import com.google.common.io.Files; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.linkedin.data.template.StringMap; @@ -42,15 +49,12 @@ import com.linkedin.restli.client.RestLiResponseException; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.runtime.api.FlowSpec; -import org.apache.gobblin.runtime.api.TopologySpec; -import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher; -import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; -import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog; +import 
org.apache.gobblin.runtime.api.Spec; import org.apache.gobblin.service.FlowConfig; import org.apache.gobblin.service.FlowConfigClient; import org.apache.gobblin.service.FlowId; +import org.apache.gobblin.service.Schedule; import org.apache.gobblin.service.ServiceConfigKeys; -import org.apache.gobblin.service.modules.orchestration.Orchestrator; import org.apache.gobblin.util.ConfigUtils; @@ -65,6 +69,9 @@ public class GobblinServiceManagerTest { private static final String SPEC_VERSION = "1"; private static final String TOPOLOGY_SPEC_STORE_DIR = "/tmp/serviceCore/topologyTestSpecStore"; private static final String FLOW_SPEC_STORE_DIR = "/tmp/serviceCore/flowTestSpecStore"; + private static final String GIT_CLONE_DIR = "/tmp/serviceCore/clone"; + private static final String GIT_REMOTE_REPO_DIR = "/tmp/serviceCore/remote"; + private static final String GIT_LOCAL_REPO_DIR = "/tmp/serviceCore/local"; private static final String TEST_GROUP_NAME = "testGroup1"; private static final String TEST_FLOW_NAME = "testFlow1"; @@ -76,18 +83,11 @@ public class GobblinServiceManagerTest { private static final String TEST_SOURCE_NAME = "testSource"; private static final String TEST_SINK_NAME = "testSink"; - private ServiceBasedAppLauncher serviceLauncher; - private TopologyCatalog topologyCatalog; - private TopologySpec topologySpec; - - private FlowCatalog flowCatalog; - private FlowSpec flowSpec; - - private Orchestrator orchestrator; - private GobblinServiceManager gobblinServiceManager; private FlowConfigClient flowConfigClient; + private Git gitForPush; + @BeforeClass public void setup() throws Exception { cleanUpDir(SERVICE_WORK_DIR); @@ -108,6 +108,21 @@ public class GobblinServiceManagerTest { serviceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX + TEST_GOBBLIN_EXECUTOR_NAME + ".specExecInstance.capabilities", TEST_SOURCE_NAME + ":" + TEST_SINK_NAME); + serviceCoreProperties.put(ServiceConfigKeys.GOBBLIN_SERVICE_GIT_CONFIG_MONITOR_ENABLED_KEY, true); + 
serviceCoreProperties.put(ConfigurationKeys.GIT_CONFIG_MONITOR_REPO_URI, GIT_REMOTE_REPO_DIR); + serviceCoreProperties.put(ConfigurationKeys.GIT_CONFIG_MONITOR_REPO_DIR, GIT_LOCAL_REPO_DIR); + serviceCoreProperties.put(ConfigurationKeys.GIT_CONFIG_MONITOR_POLLING_INTERVAL, 5); + + // Create a bare repository + RepositoryCache.FileKey fileKey = RepositoryCache.FileKey.exact(new File(GIT_REMOTE_REPO_DIR), FS.DETECTED); + fileKey.open(false).create(true); + + this.gitForPush = Git.cloneRepository().setURI(GIT_REMOTE_REPO_DIR).setDirectory(new File(GIT_CLONE_DIR)).call(); + + // push an empty commit as a base for detecting changes + this.gitForPush.commit().setMessage("First commit").call(); + this.gitForPush.push().setRemote("origin").setRefSpecs(new RefSpec("master")).call(); + this.gobblinServiceManager = new GobblinServiceManager("CoreService", "1", ConfigUtils.propertiesToConfig(serviceCoreProperties), Optional.of(new Path(SERVICE_WORK_DIR))); this.gobblinServiceManager.start(); @@ -247,6 +262,35 @@ public class GobblinServiceManagerTest { Assert.fail("Get should have gotten a 404 error"); } + @Test (dependsOnMethods = "testDelete") + public void testGitCreate() throws Exception { + // push a new config file + File testFlowFile = new File(GIT_CLONE_DIR + "/gobblin-config/testGroup/testFlow.pull"); + testFlowFile.getParentFile().mkdirs(); + + Files.write("flow.name=testFlow\nflow.group=testGroup\nparam1=value20\n", testFlowFile, Charsets.UTF_8); + + Collection<Spec> specs = this.gobblinServiceManager.flowCatalog.getSpecs(); + Assert.assertTrue(specs.size() == 0); + + // add, commit, push + this.gitForPush.add().addFilepattern("gobblin-config/testGroup/testFlow.pull").call(); + this.gitForPush.commit().setMessage("second commit").call(); + this.gitForPush.push().setRemote("origin").setRefSpecs(new RefSpec("master")).call(); + + // polling is every 5 seconds, so wait twice as long and check + TimeUnit.SECONDS.sleep(10); + + specs =
this.gobblinServiceManager.flowCatalog.getSpecs(); + Assert.assertTrue(specs.size() == 1); + + FlowSpec spec = (FlowSpec)(specs.iterator().next()); + Assert.assertEquals(spec.getUri(), new URI("gobblin-flow:/testGroup/testFlow")); + Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), "testFlow"); + Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY), "testGroup"); + Assert.assertEquals(spec.getConfig().getString("param1"), "value20"); + } + @Test public void testBadGet() throws Exception { FlowId flowId = new FlowId().setFlowGroup(TEST_DUMMY_GROUP_NAME).setFlowName(TEST_DUMMY_FLOW_NAME); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/da382dbf/gradle/scripts/dependencyDefinitions.gradle ---------------------------------------------------------------------- diff --git a/gradle/scripts/dependencyDefinitions.gradle b/gradle/scripts/dependencyDefinitions.gradle index d100ef2..41e1485 100644 --- a/gradle/scripts/dependencyDefinitions.gradle +++ b/gradle/scripts/dependencyDefinitions.gradle @@ -71,6 +71,7 @@ ext.externalDependency = [ "httpclient": "org.apache.httpcomponents:httpclient:4.5.2", "httpcore": "org.apache.httpcomponents:httpcore:4.4.4", "httpasyncclient": "org.apache.httpcomponents:httpasyncclient:4.1.3", + "jgit":"org.eclipse.jgit:org.eclipse.jgit:4.8.0.201706111038-r", "kafka08": "org.apache.kafka:kafka_2.11:" + kafka08Version, "kafka08Test": "org.apache.kafka:kafka_2.11:" + kafka08Version + ":test", "kafka08Client": "org.apache.kafka:kafka-clients:" + kafka08Version,