gobblin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hut...@apache.org
Subject incubator-gobblin git commit: [GOBBLIN-204] Add a service that fetches GaaS flow configs from a git…
Date Wed, 16 Aug 2017 16:55:40 GMT
Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 56cecb28a -> da382dbf1


[GOBBLIN-204] Add a service that fetches GaaS flow configs from a git…

Closes #2055 from htran1/git_config


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/da382dbf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/da382dbf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/da382dbf

Branch: refs/heads/master
Commit: da382dbf10bf66ab458463d593facab775fa121e
Parents: 56cecb2
Author: Hung Tran <hutran@linkedin.com>
Authored: Wed Aug 16 09:54:55 2017 -0700
Committer: Hung Tran <hutran@linkedin.com>
Committed: Wed Aug 16 09:54:55 2017 -0700

----------------------------------------------------------------------
 .../configuration/ConfigurationKeys.java        |  14 +
 .../gobblin/runtime/spec_store/FSSpecStore.java |   4 +-
 gobblin-service/build.gradle                    |   1 +
 .../gobblin/service/ServiceConfigKeys.java      |   1 +
 .../service/modules/core/GitConfigMonitor.java  | 434 +++++++++++++++++++
 .../modules/core/GobblinServiceManager.java     |  29 ++
 .../modules/core/GitConfigMonitorTest.java      | 334 ++++++++++++++
 .../modules/core/GobblinServiceManagerTest.java |  76 +++-
 gradle/scripts/dependencyDefinitions.gradle     |   1 +
 9 files changed, 876 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/da382dbf/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index acf3b52..3336426 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -831,4 +831,18 @@ public class ConfigurationKeys {
    * Configuration related to ConfigStore based copy/retention
    */
   public static final String CONFIG_BASED_PREFIX = "gobblin.configBased";
+
+  /**
+   * Configuration related to the git flow config monitoring service.
+   * All keys below share the {@code gitConfigMonitor.} prefix.
+   */
+  public static final String GIT_CONFIG_MONITOR_PREFIX = "gitConfigMonitor.";
+  // URI of the remote git repository to clone and poll; required (no default)
+  public static final String GIT_CONFIG_MONITOR_REPO_URI = GIT_CONFIG_MONITOR_PREFIX + "repositoryUri";
+  // local directory that holds the clone of the repository (the processing checkpoint file is also written here)
+  public static final String GIT_CONFIG_MONITOR_REPO_DIR = GIT_CONFIG_MONITOR_PREFIX + "repositoryDirectory";
+  public static final String DEFAULT_GIT_CONFIG_MONITOR_REPO_DIR = "git-flow-config";
+  // directory, relative to the repository root, containing <flowGroup>/<flowName>.<ext> config files
+  public static final String GIT_CONFIG_MONITOR_CONFIG_DIR = GIT_CONFIG_MONITOR_PREFIX + "configDirectory";
+  public static final String DEFAULT_GIT_CONFIG_MONITOR_CONFIG_DIR = "gobblin-config";
+  // interval between polls of the remote repository, in seconds
+  public static final String GIT_CONFIG_MONITOR_POLLING_INTERVAL = GIT_CONFIG_MONITOR_PREFIX + "pollingInterval";
+  // branch that is cloned and tracked
+  public static final String GIT_CONFIG_MONITOR_BRANCH_NAME = GIT_CONFIG_MONITOR_PREFIX + "branchName";
+  public static final String DEFAULT_GIT_CONFIG_MONITOR_BRANCH_NAME = "master";
+  // default polling interval, in seconds
+  public static final int DEFAULT_GIT_CONFIG_MONITOR_POLLING_INTERVAL = 60;
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/da382dbf/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
index f852aea..608d390 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
@@ -17,11 +17,11 @@
 
 package org.apache.gobblin.runtime.spec_store;
 
+import com.google.common.io.ByteStreams;
 import java.io.IOException;
 import java.net.URI;
 import java.util.Collection;
 
-import org.apache.commons.compress.utils.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -258,7 +258,7 @@ public class FSSpecStore implements SpecStore {
     Spec spec = null;
 
     try (FSDataInputStream fis = fs.open(path);) {
-      spec = this.specSerDe.deserialize(IOUtils.toByteArray(fis));
+      spec = this.specSerDe.deserialize(ByteStreams.toByteArray(fis));
     }
 
     return spec;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/da382dbf/gobblin-service/build.gradle
----------------------------------------------------------------------
diff --git a/gobblin-service/build.gradle b/gobblin-service/build.gradle
index 33690eb..642d818 100644
--- a/gobblin-service/build.gradle
+++ b/gobblin-service/build.gradle
@@ -56,6 +56,7 @@ dependencies {
   compile externalDependency.jacksonCore
   compile externalDependency.jacksonMapper
   compile externalDependency.javaxInject
+  compile externalDependency.jgit
   compile externalDependency.jodaTime
   compile externalDependency.kafka08
   compile externalDependency.log4j

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/da382dbf/gobblin-service/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java b/gobblin-service/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index 76ed90a..a6f0199 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -34,6 +34,7 @@ public class ServiceConfigKeys {
   public static final String GOBBLIN_SERVICE_ORCHESTRATOR_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "orchestrator.enabled";
   public static final String GOBBLIN_SERVICE_RESTLI_SERVER_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "restliServer.enabled";
   public static final String GOBBLIN_SERVICE_TOPOLOGY_SPEC_FACTORY_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "topologySpecFactory.enabled";
+  // Enables the git flow config monitor; defaults to false and is only honored when the flow catalog is enabled
+  public static final String GOBBLIN_SERVICE_GIT_CONFIG_MONITOR_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "gitConfigMonitor.enabled";
 
   // Helix / ServiceScheduler Keys
   public static final String HELIX_CLUSTER_NAME_KEY = GOBBLIN_SERVICE_PREFIX + "helix.cluster.name";

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/da382dbf/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
new file mode 100644
index 0000000..82f3e0d
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.core;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.eclipse.jgit.api.Git;
+import org.eclipse.jgit.api.ResetCommand;
+import org.eclipse.jgit.api.errors.GitAPIException;
+import org.eclipse.jgit.diff.DiffEntry;
+import org.eclipse.jgit.errors.RepositoryNotFoundException;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.ObjectReader;
+import org.eclipse.jgit.revwalk.RevCommit;
+import org.eclipse.jgit.treewalk.CanonicalTreeParser;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.io.Files;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.PullFileLoader;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * Service that monitors for flow configs in a git repository and applies changes to a {@link FlowCatalog}.
+ * The git repository must have an initial commit that has no config files since that is used as a base for getting
+ * the change list.
+ * The config needs to be organized with the following structure:
+ * {@code <root_config_dir>/<flowGroup>/<flowName>.(pull|job|json|conf)}
+ * The flowGroup and flowName are used to generate the URI used to store the config in the {@link FlowCatalog}.
+ */
+@Slf4j
+public class GitConfigMonitor extends AbstractIdleService {
+  private static final String SPEC_DESCRIPTION = "Git-based flow config";
+  private static final String SPEC_VERSION = "1";
+  private static final int TERMINATION_TIMEOUT = 30;
+  // number of path components in <configDir>/<flowGroup>/<flowName>.<ext>, relative to the repository root
+  private static final int CONFIG_FILE_DEPTH = 3;
+  private static final String REMOTE_NAME = "origin";
+
+  private final ScheduledExecutorService scheduledExecutor;
+  private final GitRepository gitRepo;
+  // polling interval in seconds
+  private final int pollingInterval;
+  private final String repositoryDir;
+  private final String configDir;
+  private final Path configDirPath;
+  private final FlowCatalog flowCatalog;
+  private final PullFileLoader pullFileLoader;
+  private final Config emptyConfig = ConfigFactory.empty();
+  // changes are only applied to the flow catalog while the monitor is active (see setActive)
+  private volatile boolean isActive = false;
+
+  /**
+   * Create a {@link GitConfigMonitor} that monitors a git repository for changes and manages config in a
+   * {@link FlowCatalog}. The repository is opened locally if present, otherwise it is cloned.
+   * @param config configuration; {@link ConfigurationKeys#GIT_CONFIG_MONITOR_REPO_URI} is required
+   * @param flowCatalog the flow catalog
+   * @throws IllegalArgumentException if the repository URI is not configured
+   * @throws RuntimeException if the pull file loader cannot be created or the repository cannot be opened
+   */
+  GitConfigMonitor(Config config, FlowCatalog flowCatalog) {
+    this.flowCatalog = flowCatalog;
+
+    this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor(
+        ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("FetchGitConfExecutor")));
+
+    Preconditions.checkArgument(config.hasPath(ConfigurationKeys.GIT_CONFIG_MONITOR_REPO_URI),
+        ConfigurationKeys.GIT_CONFIG_MONITOR_REPO_URI + " needs to be specified.");
+
+    String repositoryUri = config.getString(ConfigurationKeys.GIT_CONFIG_MONITOR_REPO_URI);
+
+    this.repositoryDir = ConfigUtils.getString(config, ConfigurationKeys.GIT_CONFIG_MONITOR_REPO_DIR,
+        ConfigurationKeys.DEFAULT_GIT_CONFIG_MONITOR_REPO_DIR);
+
+    this.configDir = ConfigUtils.getString(config, ConfigurationKeys.GIT_CONFIG_MONITOR_CONFIG_DIR,
+        ConfigurationKeys.DEFAULT_GIT_CONFIG_MONITOR_CONFIG_DIR);
+
+    this.pollingInterval = ConfigUtils.getInt(config, ConfigurationKeys.GIT_CONFIG_MONITOR_POLLING_INTERVAL,
+        ConfigurationKeys.DEFAULT_GIT_CONFIG_MONITOR_POLLING_INTERVAL);
+
+    String branchName = ConfigUtils.getString(config, ConfigurationKeys.GIT_CONFIG_MONITOR_BRANCH_NAME,
+        ConfigurationKeys.DEFAULT_GIT_CONFIG_MONITOR_BRANCH_NAME);
+
+    this.configDirPath = new Path(this.repositoryDir, this.configDir);
+
+    try {
+      this.pullFileLoader = new PullFileLoader(this.configDirPath,
+          FileSystem.get(URI.create(ConfigurationKeys.LOCAL_FS_URI), new Configuration()),
+          PullFileLoader.DEFAULT_JAVA_PROPS_PULL_FILE_EXTENSIONS, PullFileLoader.DEFAULT_HOCON_PULL_FILE_EXTENSIONS);
+    } catch (IOException e) {
+      throw new RuntimeException("Could not create pull file loader", e);
+    }
+
+    try {
+      this.gitRepo = new GitRepository(repositoryUri, this.repositoryDir, branchName);
+    } catch (GitAPIException | IOException e) {
+      throw new RuntimeException("Could not open git repository", e);
+    }
+  }
+
+  /** Start the service: schedule periodic polling of the git repository. */
+  @Override
+  protected void startUp() throws Exception {
+    log.info("Starting the " + GitConfigMonitor.class.getSimpleName());
+    log.info("Polling git with interval {} ", this.pollingInterval);
+
+    // Schedule the job config fetch task
+    this.scheduledExecutor.scheduleAtFixedRate(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          processGitConfigChanges();
+        } catch (GitAPIException | IOException | RuntimeException e) {
+          // Unchecked exceptions must be caught too: an exception escaping a task scheduled with
+          // scheduleAtFixedRate suppresses all subsequent executions, which would silently stop polling.
+          log.error("Failed to process git config changes", e);
+          // next run will try again since errors could be intermittent
+        }
+      }
+    }, 0, this.pollingInterval, TimeUnit.SECONDS);
+  }
+
+  /** Stop the service: stop polling and wait up to {@code TERMINATION_TIMEOUT} seconds for an in-flight poll. */
+  @Override
+  protected void shutDown() throws Exception {
+    this.scheduledExecutor.shutdown();
+    this.scheduledExecutor.awaitTermination(TERMINATION_TIMEOUT, TimeUnit.SECONDS);
+  }
+
+  /**
+   * Enable or disable applying git changes to the flow catalog. Polls that run while inactive are skipped.
+   * @param isActive whether this monitor should process changes
+   */
+  public synchronized void setActive(boolean isActive) {
+    if (this.isActive == isActive) {
+      // No-op if already in correct state
+      return;
+    }
+
+    this.isActive = isActive;
+  }
+
+  /**
+   * Fetch the list of changes since the last refresh of the repository and apply the changes to the {@link FlowCatalog}.
+   * The checkpoint only moves forward after every change has been applied, so a failed run is retried in full.
+   * @throws GitAPIException if a git operation fails
+   * @throws IOException if reading the repository or writing the checkpoint fails
+   */
+  @VisibleForTesting
+  void processGitConfigChanges() throws GitAPIException, IOException {
+    // if not active or if the flow catalog is not up yet then can't process config changes
+    if (!this.isActive || !this.flowCatalog.isRunning()) {
+      log.info("GitConfigMonitor: skip poll since the monitor is not active or the FlowCatalog is not yet running.");
+      return;
+    }
+
+    List<DiffEntry> changes = this.gitRepo.getChanges();
+
+    for (DiffEntry change : changes) {
+      switch (change.getChangeType()) {
+        case ADD:
+        case MODIFY:
+        case COPY:
+          // a copy leaves the source path in place, so only the new path needs to be added
+          addSpec(change);
+          break;
+        case DELETE:
+          removeSpec(change);
+          break;
+        case RENAME:
+          removeSpec(change);
+          addSpec(change);
+          break;
+        default:
+          throw new RuntimeException("Unsupported change type " + change.getChangeType());
+      }
+    }
+
+    // Done processing changes, so checkpoint
+    this.gitRepo.moveCheckpointAndHashesForward();
+  }
+
+  /**
+   * Add a {@link FlowSpec} for an added, modified, copied, or renamed flow config (uses the change's new path).
+   * Files that do not follow the expected layout, or that fail to load, are logged and skipped.
+   * @param change the git diff entry
+   */
+  private void addSpec(DiffEntry change) {
+    if (checkConfigFilePath(change.getNewPath())) {
+      Path configFilePath = new Path(this.repositoryDir, change.getNewPath());
+
+      try {
+        Config flowConfig = loadConfigFileWithFlowNameOverrides(configFilePath);
+
+        this.flowCatalog.put(FlowSpec.builder()
+            .withConfig(flowConfig)
+            .withVersion(SPEC_VERSION)
+            .withDescription(SPEC_DESCRIPTION)
+            .build());
+      } catch (IOException e) {
+        log.warn("Could not load config file: " + configFilePath, e);
+      }
+    }
+  }
+
+  /**
+   * Remove the {@link FlowSpec} for a deleted or renamed flow config (uses the change's old path).
+   * @param change the git diff entry
+   */
+  private void removeSpec(DiffEntry change) {
+    if (checkConfigFilePath(change.getOldPath())) {
+      Path configFilePath = new Path(this.repositoryDir, change.getOldPath());
+      String flowName = Files.getNameWithoutExtension(configFilePath.getName());
+      String flowGroup = configFilePath.getParent().getName();
+
+      // build a dummy config to get the proper URI for delete
+      Config dummyConfig = ConfigBuilder.create()
+          .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, flowGroup)
+          .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, flowName)
+          .build();
+
+      FlowSpec spec = FlowSpec.builder()
+          .withConfig(dummyConfig)
+          .withVersion(SPEC_VERSION)
+          .withDescription(SPEC_DESCRIPTION)
+          .build();
+
+      this.flowCatalog.remove(spec.getUri());
+    }
+  }
+
+  /**
+   * Check whether the file has the proper naming and hierarchy.
+   * The config must be stored at {@code configDir/flowGroup/flowName.(pull|job|json|conf)}, where configDir is a
+   * single directory directly under the repository root.
+   * @param configFilePath the relative path from the repo root
+   * @return false if the file does not conform
+   */
+  private boolean checkConfigFilePath(String configFilePath) {
+    Path configFile = new Path(configFilePath);
+    String fileExtension = Files.getFileExtension(configFile.getName());
+
+    // depth is checked first so that the getParent() chain below cannot hit a null parent
+    if (configFile.depth() != CONFIG_FILE_DEPTH ||
+        !configFile.getParent().getParent().getName().equals(configDir) ||
+        !(PullFileLoader.DEFAULT_JAVA_PROPS_PULL_FILE_EXTENSIONS.contains(fileExtension) ||
+            PullFileLoader.DEFAULT_HOCON_PULL_FILE_EXTENSIONS.contains(fileExtension))) {
+      log.warn("Changed file does not conform to directory structure and file name format, skipping: "
+          + configFilePath);
+
+      return false;
+    }
+
+    return true;
+  }
+
+  /**
+   * Load the config file and override the flow name and flow group properties with the names from the file path.
+   * @param configFilePath path of the config file, resolved against the repository directory
+   * @return the configuration object
+   * @throws IOException if the file cannot be loaded
+   */
+  private Config loadConfigFileWithFlowNameOverrides(Path configFilePath) throws IOException {
+    Config flowConfig = this.pullFileLoader.loadPullFile(configFilePath, emptyConfig, false);
+    String flowName = Files.getNameWithoutExtension(configFilePath.getName());
+    String flowGroup = configFilePath.getParent().getName();
+
+    return flowConfig.withValue(ConfigurationKeys.FLOW_NAME_KEY, ConfigValueFactory.fromAnyRef(flowName))
+        .withValue(ConfigurationKeys.FLOW_GROUP_KEY, ConfigValueFactory.fromAnyRef(flowGroup));
+  }
+
+  /**
+   * Class for managing a local clone of a git repository and the checkpoint of the last processed commit.
+   */
+  private static class GitRepository {
+    private final static String CHECKPOINT_FILE = "checkpoint.txt";
+    private final static String CHECKPOINT_FILE_TMP = "checkpoint.tmp";
+    private final String repoUri;
+    private final String repoDir;
+    private final String branchName;
+    private Git git;
+    // commit whose tree is the base of the next diff; persisted in the checkpoint file
+    private String lastProcessedGitHash;
+    // HEAD after the most recent fetch/reset; becomes lastProcessedGitHash once its changes are processed
+    private String latestGitHash;
+
+    /**
+     * Create an object to manage the git repository stored locally at repoDir with a repository URI of repoUri.
+     * @param repoUri URI of repository
+     * @param repoDir Directory to hold the local copy of the repository
+     * @param branchName Branch name
+     * @throws GitAPIException if cloning or reading the log fails
+     * @throws IOException if the local repository or checkpoint cannot be read
+     */
+    private GitRepository(String repoUri, String repoDir, String branchName) throws GitAPIException, IOException {
+      this.repoUri = repoUri;
+      this.repoDir = repoDir;
+      this.branchName = branchName;
+
+      initRepository();
+    }
+
+    /**
+     * Open the repository if it exists locally, otherwise clone it, then load the last processed commit.
+     * @throws GitAPIException if cloning or reading the log fails
+     * @throws IOException if the local repository or checkpoint cannot be read
+     */
+    private void initRepository() throws GitAPIException, IOException {
+      File repoDirFile = new File(this.repoDir);
+
+      try {
+        this.git = Git.open(repoDirFile);
+
+        // may be null if the local repository has no such remote configured
+        String uri = this.git.getRepository().getConfig().getString("remote", REMOTE_NAME, "url");
+
+        if (!this.repoUri.equals(uri)) {
+          throw new RuntimeException("Repo at " + this.repoDir + " has uri " + uri + " instead of " + this.repoUri);
+        }
+      } catch (RepositoryNotFoundException e) {
+        // if the repository was not found then clone a new one
+        this.git = Git.cloneRepository()
+            .setDirectory(repoDirFile)
+            .setURI(this.repoUri)
+            .setBranch(this.branchName)
+            .call();
+      }
+
+      try {
+        this.lastProcessedGitHash = readCheckpoint();
+      } catch (FileNotFoundException e) {
+        // if no checkpoint is available then start with the first commit (the last entry walking back from HEAD)
+        Iterable<RevCommit> commits = git.log().call();
+        RevCommit firstCommit = null;
+
+        for (RevCommit commit : commits) {
+          firstCommit = commit;
+        }
+
+        if (firstCommit != null) {
+          this.lastProcessedGitHash = firstCommit.getName();
+        }
+      }
+
+      this.latestGitHash = this.lastProcessedGitHash;
+    }
+
+    /**
+     * Read the last processed commit githash from the checkpoint file.
+     * @return the commit hash, with surrounding whitespace (e.g. a trailing newline) removed
+     * @throws FileNotFoundException if no checkpoint has been written yet
+     * @throws IOException if the checkpoint cannot be read
+     */
+    private String readCheckpoint() throws IOException {
+      File checkpointFile = new File(this.repoDir, CHECKPOINT_FILE);
+      return Files.toString(checkpointFile, Charsets.UTF_8).trim();
+    }
+
+    /**
+     * Write the last processed commit githash to the checkpoint file.
+     * @param gitHash the commit hash to persist
+     * @throws IOException if the checkpoint cannot be written
+     */
+    private void writeCheckpoint(String gitHash) throws IOException {
+      // write to a temporary name then rename to make the operation atomic when the file system allows a file to be
+      // replaced
+      File tmpCheckpointFile = new File(this.repoDir, CHECKPOINT_FILE_TMP);
+      File checkpointFile = new File(this.repoDir, CHECKPOINT_FILE);
+
+      Files.write(gitHash, tmpCheckpointFile, Charsets.UTF_8);
+
+      Files.move(tmpCheckpointFile, checkpointFile);
+    }
+
+    /**
+     * Mark the commit seen by the last {@link #getChanges()} call as processed and persist it.
+     * @throws IOException if the checkpoint cannot be written
+     */
+    private void moveCheckpointAndHashesForward() throws IOException {
+      this.lastProcessedGitHash = this.latestGitHash;
+
+      writeCheckpoint(this.latestGitHash);
+    }
+
+    /**
+     * Fetch the remote branch, hard-reset the working copy to it, and diff the last processed commit's tree against
+     * the new HEAD tree.
+     * @return the list of changed paths (name and status only)
+     * @throws GitAPIException if fetch, reset, or diff fails
+     * @throws IOException if the repository objects cannot be read
+     */
+    private List<DiffEntry> getChanges() throws GitAPIException, IOException {
+      // get tree for last processed commit
+      ObjectId oldHeadTree = git.getRepository().resolve(this.lastProcessedGitHash + "^{tree}");
+
+      // refresh to latest and reset hard to handle forced pushes
+      this.git.fetch().setRemote(REMOTE_NAME).call();
+      // reset hard to get a clean working set since pull --rebase may leave files around
+      this.git.reset().setMode(ResetCommand.ResetType.HARD).setRef(REMOTE_NAME + "/" + this.branchName).call();
+
+      ObjectId head = this.git.getRepository().resolve("HEAD");
+      ObjectId headTree = this.git.getRepository().resolve("HEAD^{tree}");
+
+      // remember the hash for the current HEAD. This will be checkpointed after the diff is processed.
+      latestGitHash = head.getName();
+
+      // diff old and new heads to find changes; the reader is released once the diff has been computed
+      try (ObjectReader reader = this.git.getRepository().newObjectReader()) {
+        CanonicalTreeParser oldTreeIter = new CanonicalTreeParser();
+        oldTreeIter.reset(reader, oldHeadTree);
+        CanonicalTreeParser newTreeIter = new CanonicalTreeParser();
+        newTreeIter.reset(reader, headTree);
+
+        return this.git.diff()
+            .setNewTree(newTreeIter)
+            .setOldTree(oldTreeIter)
+            .setShowNameAndStatusOnly(true)
+            .call();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/da382dbf/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index 6aed51e..aebebdc 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -113,6 +113,7 @@ public class GobblinServiceManager implements ApplicationLauncher {
   protected final boolean isSchedulerEnabled;
   protected final boolean isRestLIServerEnabled;
   protected final boolean isTopologySpecFactoryEnabled;
+  protected final boolean isGitConfigMonitorEnabled;
 
   protected TopologyCatalog topologyCatalog;
   @Getter
@@ -127,6 +128,8 @@ public class GobblinServiceManager implements ApplicationLauncher {
 
   protected ClassAliasResolver<TopologySpecFactory> aliasResolver;
 
+  protected GitConfigMonitor gitConfigMonitor;
+
   @Getter
   protected Config config;
 
@@ -161,6 +164,16 @@ public class GobblinServiceManager implements ApplicationLauncher {
     if (isFlowCatalogEnabled) {
       this.flowCatalog = new FlowCatalog(config, Optional.of(LOGGER));
       this.serviceLauncher.addService(flowCatalog);
+
+      this.isGitConfigMonitorEnabled = ConfigUtils.getBoolean(config,
+          ServiceConfigKeys.GOBBLIN_SERVICE_GIT_CONFIG_MONITOR_ENABLED_KEY, false);
+
+      if (this.isGitConfigMonitorEnabled) {
+        this.gitConfigMonitor = new GitConfigMonitor(config, this.flowCatalog);
+        this.serviceLauncher.addService(this.gitConfigMonitor);
+      }
+    } else {
+      this.isGitConfigMonitorEnabled = false;
     }
 
     // Initialize Helix
@@ -275,6 +288,10 @@ public class GobblinServiceManager implements ApplicationLauncher {
         LOGGER.info("Gobblin Service is now running in master instance mode, enabling Scheduler.");
         this.scheduler.setActive(true);
       }
+
+      if (this.isGitConfigMonitorEnabled) {
+        this.gitConfigMonitor.setActive(true);
+      }
     } else if (this.helixManager.isPresent()) {
       LOGGER.info("Leader lost notification for {} HM.isLeader {}", this.helixManager.get().getInstanceName(),
           this.helixManager.get().isLeader());
@@ -282,6 +299,10 @@ public class GobblinServiceManager implements ApplicationLauncher {
         LOGGER.info("Gobblin Service is now running in slave instance mode, disabling Scheduler.");
         this.scheduler.setActive(false);
       }
+
+      if (this.isGitConfigMonitorEnabled) {
+        this.gitConfigMonitor.setActive(false);
+      }
     }
   }
 
@@ -311,6 +332,10 @@ public class GobblinServiceManager implements ApplicationLauncher {
           LOGGER.info("[Init] Gobblin Service is running in master instance mode, enabling Scheduler.");
           this.scheduler.setActive(true);
         }
+
+        if (this.isGitConfigMonitorEnabled) {
+          this.gitConfigMonitor.setActive(true);
+        }
       } else {
         if (this.isSchedulerEnabled) {
           LOGGER.info("[Init] Gobblin Service is running in slave instance mode, not enabling Scheduler.");
@@ -321,6 +346,10 @@ public class GobblinServiceManager implements ApplicationLauncher {
       // .. designate scheduler to itself
       LOGGER.info("[Init] Gobblin Service is running in single instance mode, enabling Scheduler.");
       this.scheduler.setActive(true);
+
+      if (this.isGitConfigMonitorEnabled) {
+        this.gitConfigMonitor.setActive(true);
+      }
     }
 
     // Populate TopologyCatalog with all Topologies generated by TopologySpecFactory

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/da382dbf/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitConfigMonitorTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitConfigMonitorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitConfigMonitorTest.java
new file mode 100644
index 0000000..6cd8bf2
--- /dev/null
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitConfigMonitorTest.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.core;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.SystemUtils;
+import org.eclipse.jgit.api.Git;
+import org.eclipse.jgit.api.ResetCommand;
+import org.eclipse.jgit.api.errors.GitAPIException;
+import org.eclipse.jgit.dircache.DirCache;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.lib.RepositoryCache;
+import org.eclipse.jgit.revwalk.RevCommit;
+import org.eclipse.jgit.transport.RefSpec;
+import org.eclipse.jgit.util.FS;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+
+/**
+ * Tests {@link GitConfigMonitor} against a local bare repository: adding, updating and deleting flow configs,
+ * recovering from a force-pushed (rewritten) history, and picking up changes via the service's polling loop.
+ */
+public class GitConfigMonitorTest {
+  private static final Logger logger = LoggerFactory.getLogger(GitConfigMonitorTest.class);
+  private static final String TEST_DIR = "/tmp/gitConfigTestDir/";
+  private static final String TEST_FLOW_FILE = "testFlow.pull";
+  private static final String TEST_FLOW_FILE2 = "testFlow2.pull";
+  private static final String TEST_FLOW_FILE3 = "testFlow3.pull";
+  // Upper bound for testPollingConfig; the monitor is configured to poll every 5 seconds in setup().
+  private static final long POLLING_TIMEOUT_SECONDS = 30;
+
+  // Orders specs by URI so multi-spec assertions do not depend on the catalog's iteration order.
+  private static final Comparator<Spec> SPEC_URI_COMPARATOR = new Comparator<Spec>() {
+    @Override
+    public int compare(Spec o1, Spec o2) {
+      return o1.getUri().compareTo(o2.getUri());
+    }
+  };
+
+  private final File remoteDir = new File(TEST_DIR, "remote");
+  private final File cloneDir = new File(TEST_DIR, "clone");
+  private final File configDir = new File(cloneDir, "gobblin-config");
+  private final File testGroupDir = new File(configDir, "testGroup");
+  private final File testFlowFile = new File(testGroupDir, TEST_FLOW_FILE);
+  private final File testFlowFile2 = new File(testGroupDir, TEST_FLOW_FILE2);
+  private final File testFlowFile3 = new File(testGroupDir, TEST_FLOW_FILE3);
+  private final RefSpec masterRefSpec = new RefSpec("master");
+
+  private Repository remoteRepo;
+  private Git gitForPush;
+  private FlowCatalog flowCatalog;
+  private Config config;
+  private GitConfigMonitor gitConfigMonitor;
+
+  /**
+   * Creates a bare "remote" repository seeded with an empty commit, a working clone used to push changes,
+   * a running {@link FlowCatalog}, and an active (but not yet started) {@link GitConfigMonitor}.
+   */
+  @BeforeClass
+  public void setup() throws Exception {
+    cleanUpDir(TEST_DIR);
+
+    // Create a bare repository
+    RepositoryCache.FileKey fileKey = RepositoryCache.FileKey.exact(remoteDir, FS.DETECTED);
+    this.remoteRepo = fileKey.open(false);
+    this.remoteRepo.create(true);
+
+    this.gitForPush = Git.cloneRepository().setURI(this.remoteRepo.getDirectory().getAbsolutePath()).setDirectory(cloneDir).call();
+
+    // push an empty commit as a base for detecting changes
+    this.gitForPush.commit().setMessage("First commit").call();
+    this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
+
+    this.config = ConfigBuilder.create()
+        .addPrimitive(ConfigurationKeys.GIT_CONFIG_MONITOR_REPO_URI, this.remoteRepo.getDirectory().getAbsolutePath())
+        .addPrimitive(ConfigurationKeys.GIT_CONFIG_MONITOR_REPO_DIR, TEST_DIR + "/jobConfig")
+        .addPrimitive(ConfigurationKeys.FLOWSPEC_STORE_DIR_KEY, TEST_DIR + "flowCatalog")
+        .addPrimitive(ConfigurationKeys.GIT_CONFIG_MONITOR_POLLING_INTERVAL, 5)
+        .build();
+
+    this.flowCatalog = new FlowCatalog(config);
+    this.flowCatalog.startAsync().awaitRunning();
+    this.gitConfigMonitor = new GitConfigMonitor(this.config, this.flowCatalog);
+    this.gitConfigMonitor.setActive(true);
+  }
+
+  /**
+   * Best-effort recursive delete of {@code dir}: retries up to 5 times and, if every attempt fails,
+   * gives up after logging each failure instead of failing the test.
+   */
+  private void cleanUpDir(String dir) {
+    File specStoreDir = new File(dir);
+
+    // cleanup is flaky on Travis, so retry a few times and then suppress the error if unsuccessful
+    for (int i = 0; i < 5; i++) {
+      try {
+        if (specStoreDir.exists()) {
+          FileUtils.deleteDirectory(specStoreDir);
+        }
+        // if delete succeeded then break out of loop
+        break;
+      } catch (IOException e) {
+        logger.warn("Cleanup delete directory failed for directory: " + dir, e);
+      }
+    }
+  }
+
+  /** Stops every service and JGit handle opened by this class, then deletes the test directory. */
+  @AfterClass
+  public void cleanUp() {
+    // testPollingConfig starts the monitor's background polling; stop it before deleting the directories it
+    // reads from and the catalog it writes to.
+    if (this.gitConfigMonitor != null && this.gitConfigMonitor.isRunning()) {
+      this.gitConfigMonitor.stopAsync().awaitTerminated();
+    }
+
+    if (this.flowCatalog != null) {
+      this.flowCatalog.stopAsync().awaitTerminated();
+    }
+
+    // release JGit file handles before deleting the repositories on disk
+    if (this.gitForPush != null) {
+      this.gitForPush.close();
+    }
+    if (this.remoteRepo != null) {
+      this.remoteRepo.close();
+    }
+
+    cleanUpDir(TEST_DIR);
+  }
+
+  /**
+   * Builds the repository-relative path of a config file, as expected by JGit file patterns.
+   * NOTE(review): JGit file patterns use '/' as separator; FILE_SEPARATOR only matches that on Unix-like
+   * systems, which this test already assumes by using /tmp.
+   */
+  private String formConfigFilePath(String groupDir, String fileName) {
+    return this.configDir.getName() + SystemUtils.FILE_SEPARATOR + groupDir + SystemUtils.FILE_SEPARATOR + fileName;
+  }
+
+  /** Writes {@code content} (UTF-8) to {@code file}, creating its parent directories if needed. */
+  private void writeFlowFile(File file, String content) throws IOException {
+    file.getParentFile().mkdirs();
+    Files.write(content, file, Charsets.UTF_8);
+  }
+
+  /** Stages the given flow files, which must live directly under {@link #testGroupDir}, in the push clone. */
+  private void stageFlowFiles(File... files) throws GitAPIException {
+    for (File file : files) {
+      this.gitForPush.add().addFilepattern(formConfigFilePath(this.testGroupDir.getName(), file.getName())).call();
+    }
+  }
+
+  /** Commits the staged changes with {@code message} and pushes them to master on the remote. */
+  private RevCommit commitAndPush(String message) throws GitAPIException {
+    RevCommit commit = this.gitForPush.commit().setMessage(message).call();
+    this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
+    return commit;
+  }
+
+  /** Hard-resets the push clone to {@code ref} and force-pushes it, rewriting master on the remote. */
+  private void resetAndForcePush(String ref) throws GitAPIException {
+    this.gitForPush.reset().setMode(ResetCommand.ResetType.HARD).setRef(ref).call();
+    this.gitForPush.push().setForce(true).setRemote("origin").setRefSpecs(this.masterRefSpec).call();
+  }
+
+  /** Returns a copy of {@code specs} sorted by URI. */
+  private List<Spec> sortedByUri(Collection<Spec> specs) {
+    List<Spec> specList = Lists.newArrayList(specs);
+    specList.sort(SPEC_URI_COMPARATOR);
+    return specList;
+  }
+
+  /** Asserts that {@code spec} is the flow {@code testGroup/flowName} with the given {@code param1} value. */
+  private void assertFlowSpec(Spec spec, String flowName, String param1) throws URISyntaxException {
+    FlowSpec flowSpec = (FlowSpec) spec;
+    Assert.assertEquals(flowSpec.getUri(), new URI("gobblin-flow:/testGroup/" + flowName));
+    Assert.assertEquals(flowSpec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), flowName);
+    Assert.assertEquals(flowSpec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY), "testGroup");
+    Assert.assertEquals(flowSpec.getConfig().getString("param1"), param1);
+  }
+
+  /**
+   * Polls the flow catalog until it holds {@code expected} specs or {@code timeoutSeconds} elapse.
+   *
+   * @return the last snapshot read from the catalog; callers still assert on its size
+   */
+  private Collection<Spec> waitForSpecCount(int expected, long timeoutSeconds) throws InterruptedException {
+    long deadlineNanos = System.nanoTime() + TimeUnit.SECONDS.toNanos(timeoutSeconds);
+    Collection<Spec> specs = this.flowCatalog.getSpecs();
+    while (specs.size() != expected && System.nanoTime() - deadlineNanos < 0) {
+      TimeUnit.MILLISECONDS.sleep(200);
+      specs = this.flowCatalog.getSpecs();
+    }
+    return specs;
+  }
+
+  @Test
+  public void testAddConfig() throws IOException, GitAPIException, URISyntaxException {
+    // push a new config file
+    writeFlowFile(this.testFlowFile, "flow.name=testFlow\nflow.group=testGroup\nparam1=value1\n");
+    stageFlowFiles(this.testFlowFile);
+    commitAndPush("Second commit");
+
+    this.gitConfigMonitor.processGitConfigChanges();
+
+    Collection<Spec> specs = this.flowCatalog.getSpecs();
+    Assert.assertEquals(specs.size(), 1);
+    assertFlowSpec(specs.iterator().next(), "testFlow", "value1");
+  }
+
+  @Test(dependsOnMethods = "testAddConfig")
+  public void testUpdateConfig() throws IOException, GitAPIException, URISyntaxException {
+    // push an updated config file
+    writeFlowFile(this.testFlowFile, "flow.name=testFlow\nflow.group=testGroup\nparam1=value2\n");
+    stageFlowFiles(this.testFlowFile);
+    commitAndPush("Third commit");
+
+    this.gitConfigMonitor.processGitConfigChanges();
+
+    Collection<Spec> specs = this.flowCatalog.getSpecs();
+    Assert.assertEquals(specs.size(), 1);
+    assertFlowSpec(specs.iterator().next(), "testFlow", "value2");
+  }
+
+  @Test(dependsOnMethods = "testUpdateConfig")
+  public void testDeleteConfig() throws IOException, GitAPIException, URISyntaxException {
+    // delete a config file
+    this.testFlowFile.delete();
+
+    // flow catalog has 1 entry before the config is deleted
+    Assert.assertEquals(this.flowCatalog.getSpecs().size(), 1);
+
+    // stage the removal, then commit and push
+    String deletedPath = formConfigFilePath(this.testGroupDir.getName(), this.testFlowFile.getName());
+    DirCache index = this.gitForPush.rm().addFilepattern(deletedPath).call();
+    Assert.assertTrue(index.findEntry(deletedPath) < 0, "Deleted config should no longer be in the index");
+    commitAndPush("Fourth commit");
+
+    this.gitConfigMonitor.processGitConfigChanges();
+
+    Assert.assertEquals(this.flowCatalog.getSpecs().size(), 0);
+  }
+
+  @Test(dependsOnMethods = "testDeleteConfig")
+  public void testForcedPushConfig() throws IOException, GitAPIException, URISyntaxException {
+    // push two new config files
+    writeFlowFile(this.testFlowFile, "flow.name=testFlow\nflow.group=testGroup\nparam1=value1\n");
+    writeFlowFile(this.testFlowFile2, "flow.name=testFlow2\nflow.group=testGroup\nparam1=value2\n");
+    stageFlowFiles(this.testFlowFile, this.testFlowFile2);
+    commitAndPush("Fifth commit");
+
+    this.gitConfigMonitor.processGitConfigChanges();
+
+    Collection<Spec> specs = this.flowCatalog.getSpecs();
+    Assert.assertEquals(specs.size(), 2);
+    List<Spec> specList = sortedByUri(specs);
+    assertFlowSpec(specList.get(0), "testFlow", "value1");
+    assertFlowSpec(specList.get(1), "testFlow2", "value2");
+
+    // go back in time (dropping the fifth commit) and force-push to cause conflict with what the monitor last saw
+    resetAndForcePush("HEAD~1");
+
+    // add new files on top of the rewritten history
+    writeFlowFile(this.testFlowFile2, "flow.name=testFlow2\nflow.group=testGroup\nparam1=value4\n");
+    writeFlowFile(this.testFlowFile3, "flow.name=testFlow3\nflow.group=testGroup\nparam1=value5\n");
+    stageFlowFiles(this.testFlowFile2, this.testFlowFile3);
+    commitAndPush("Sixth commit");
+
+    this.gitConfigMonitor.processGitConfigChanges();
+
+    // testFlow existed only in the dropped commit, so only testFlow2 and testFlow3 should remain
+    specs = this.flowCatalog.getSpecs();
+    Assert.assertEquals(specs.size(), 2);
+    specList = sortedByUri(specs);
+    assertFlowSpec(specList.get(0), "testFlow2", "value4");
+    assertFlowSpec(specList.get(1), "testFlow3", "value5");
+
+    // reset to the initial empty commit (Sixth -> Fourth -> Third -> Second -> First) for the next test case
+    resetAndForcePush("HEAD~4");
+
+    this.gitConfigMonitor.processGitConfigChanges();
+    Assert.assertEquals(this.flowCatalog.getSpecs().size(), 0);
+  }
+
+  @Test(dependsOnMethods = "testForcedPushConfig")
+  public void testPollingConfig() throws IOException, GitAPIException, URISyntaxException, InterruptedException {
+    // push a new config file without calling processGitConfigChanges() directly
+    writeFlowFile(this.testFlowFile, "flow.name=testFlow\nflow.group=testGroup\nparam1=value20\n");
+    stageFlowFiles(this.testFlowFile);
+    commitAndPush("Seventh commit");
+
+    Assert.assertEquals(this.flowCatalog.getSpecs().size(), 0);
+
+    // start the service so its polling (every 5 seconds) picks up the change
+    this.gitConfigMonitor.startAsync().awaitRunning();
+
+    // wait for the poll instead of sleeping a fixed interval, which is flaky on loaded hosts
+    Collection<Spec> specs = waitForSpecCount(1, POLLING_TIMEOUT_SECONDS);
+    Assert.assertEquals(specs.size(), 1);
+    assertFlowSpec(specs.iterator().next(), "testFlow", "value20");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/da382dbf/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
index 6378339..314dc66 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
@@ -17,15 +17,20 @@
 
 package org.apache.gobblin.service.modules.core;
 
-import org.apache.gobblin.service.FlowId;
-import org.apache.gobblin.service.Schedule;
 import java.io.File;
+import java.net.URI;
+import java.util.Collection;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.fs.Path;
 import org.eclipse.jetty.http.HttpStatus;
+import org.eclipse.jgit.api.Git;
+import org.eclipse.jgit.lib.RepositoryCache;
+import org.eclipse.jgit.transport.RefSpec;
+import org.eclipse.jgit.util.FS;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -33,8 +38,10 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import com.google.common.base.Charsets;
 import com.google.common.base.Optional;
 import com.google.common.collect.Maps;
+import com.google.common.io.Files;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.linkedin.data.template.StringMap;
@@ -42,15 +49,12 @@ import com.linkedin.restli.client.RestLiResponseException;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.api.FlowSpec;
-import org.apache.gobblin.runtime.api.TopologySpec;
-import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
-import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
-import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
+import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.service.FlowConfig;
 import org.apache.gobblin.service.FlowConfigClient;
 import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.Schedule;
 import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.service.modules.orchestration.Orchestrator;
 import org.apache.gobblin.util.ConfigUtils;
 
 
@@ -65,6 +69,9 @@ public class GobblinServiceManagerTest {
   private static final String SPEC_VERSION = "1";
   private static final String TOPOLOGY_SPEC_STORE_DIR = "/tmp/serviceCore/topologyTestSpecStore";
   private static final String FLOW_SPEC_STORE_DIR = "/tmp/serviceCore/flowTestSpecStore";
+  private static final String GIT_CLONE_DIR = "/tmp/serviceCore/clone";
+  private static final String GIT_REMOTE_REPO_DIR = "/tmp/serviceCore/remote";
+  private static final String GIT_LOCAL_REPO_DIR = "/tmp/serviceCore/local";
 
   private static final String TEST_GROUP_NAME = "testGroup1";
   private static final String TEST_FLOW_NAME = "testFlow1";
@@ -76,18 +83,11 @@ public class GobblinServiceManagerTest {
   private static final String TEST_SOURCE_NAME = "testSource";
   private static final String TEST_SINK_NAME = "testSink";
 
-  private ServiceBasedAppLauncher serviceLauncher;
-  private TopologyCatalog topologyCatalog;
-  private TopologySpec topologySpec;
-
-  private FlowCatalog flowCatalog;
-  private FlowSpec flowSpec;
-
-  private Orchestrator orchestrator;
-
   private GobblinServiceManager gobblinServiceManager;
   private FlowConfigClient flowConfigClient;
 
+  private Git gitForPush;
+
   @BeforeClass
   public void setup() throws Exception {
     cleanUpDir(SERVICE_WORK_DIR);
@@ -108,6 +108,21 @@ public class GobblinServiceManagerTest {
     serviceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX +  TEST_GOBBLIN_EXECUTOR_NAME + ".specExecInstance.capabilities",
         TEST_SOURCE_NAME + ":" + TEST_SINK_NAME);
 
+    serviceCoreProperties.put(ServiceConfigKeys.GOBBLIN_SERVICE_GIT_CONFIG_MONITOR_ENABLED_KEY, true);
+    serviceCoreProperties.put(ConfigurationKeys.GIT_CONFIG_MONITOR_REPO_URI, GIT_REMOTE_REPO_DIR);
+    serviceCoreProperties.put(ConfigurationKeys.GIT_CONFIG_MONITOR_REPO_DIR, GIT_LOCAL_REPO_DIR);
+    serviceCoreProperties.put(ConfigurationKeys.GIT_CONFIG_MONITOR_POLLING_INTERVAL, 5);
+
+    // Create a bare repository
+    RepositoryCache.FileKey fileKey = RepositoryCache.FileKey.exact(new File(GIT_REMOTE_REPO_DIR), FS.DETECTED);
+    fileKey.open(false).create(true);
+
+    this.gitForPush = Git.cloneRepository().setURI(GIT_REMOTE_REPO_DIR).setDirectory(new File(GIT_CLONE_DIR)).call();
+
+    // push an empty commit as a base for detecting changes
+    this.gitForPush.commit().setMessage("First commit").call();
+    this.gitForPush.push().setRemote("origin").setRefSpecs(new RefSpec("master")).call();
+
     this.gobblinServiceManager = new GobblinServiceManager("CoreService", "1",
         ConfigUtils.propertiesToConfig(serviceCoreProperties), Optional.of(new Path(SERVICE_WORK_DIR)));
     this.gobblinServiceManager.start();
@@ -247,6 +262,35 @@ public class GobblinServiceManagerTest {
     Assert.fail("Get should have gotten a 404 error");
   }
 
+  @Test (dependsOnMethods = "testDelete")
+  public void testGitCreate() throws Exception {
+    // push a new config file
+    File testFlowFile = new File(GIT_CLONE_DIR + "/gobblin-config/testGroup/testFlow.pull");
+    testFlowFile.getParentFile().mkdirs();
+
+    Files.write("flow.name=testFlow\nflow.group=testGroup\nparam1=value20\n", testFlowFile, Charsets.UTF_8);
+
+    Collection<Spec> specs = this.gobblinServiceManager.flowCatalog.getSpecs();
+    Assert.assertTrue(specs.size() == 0);
+
+    // add, commit, push
+    this.gitForPush.add().addFilepattern("gobblin-config/testGroup/testFlow.pull").call();
+    this.gitForPush.commit().setMessage("second commit").call();
+    this.gitForPush.push().setRemote("origin").setRefSpecs(new RefSpec("master")).call();
+
+    // polling is every 5 seconds, so wait twice as long and check
+    TimeUnit.SECONDS.sleep(10);
+
+    specs = this.gobblinServiceManager.flowCatalog.getSpecs();
+    Assert.assertTrue(specs.size() == 1);
+
+    FlowSpec spec = (FlowSpec)(specs.iterator().next());
+    Assert.assertEquals(spec.getUri(), new URI("gobblin-flow:/testGroup/testFlow"));
+    Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), "testFlow");
+    Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY), "testGroup");
+    Assert.assertEquals(spec.getConfig().getString("param1"), "value20");
+  }
+
   @Test
   public void testBadGet() throws Exception {
     FlowId flowId = new FlowId().setFlowGroup(TEST_DUMMY_GROUP_NAME).setFlowName(TEST_DUMMY_FLOW_NAME);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/da382dbf/gradle/scripts/dependencyDefinitions.gradle
----------------------------------------------------------------------
diff --git a/gradle/scripts/dependencyDefinitions.gradle b/gradle/scripts/dependencyDefinitions.gradle
index d100ef2..41e1485 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -71,6 +71,7 @@ ext.externalDependency = [
     "httpclient": "org.apache.httpcomponents:httpclient:4.5.2",
     "httpcore": "org.apache.httpcomponents:httpcore:4.4.4",
     "httpasyncclient": "org.apache.httpcomponents:httpasyncclient:4.1.3",
+    "jgit":"org.eclipse.jgit:org.eclipse.jgit:4.8.0.201706111038-r",
     "kafka08": "org.apache.kafka:kafka_2.11:" + kafka08Version,
     "kafka08Test": "org.apache.kafka:kafka_2.11:" + kafka08Version + ":test",
     "kafka08Client": "org.apache.kafka:kafka-clients:" + kafka08Version,


Mime
View raw message