Subject: svn commit: r1432778 - in /subversion/trunk/tools/server-side/svnpubsub: README.txt commit-hook.py daemonize.py example.conf irkerbridge.py notes/ svnpubsub/client.py svnpubsub/server.py svnwcsub.conf.example svnwcsub.py test.conf testserver.py watcher.py
Date: Mon, 14 Jan 2013 01:16:57 -0000
To: commits@subversion.apache.org
From: breser@apache.org
Reply-To: dev@subversion.apache.org
Message-Id: <20130114011658.40EC2238896F@eris.apache.org>

Author: breser
Date: Mon Jan 14 01:16:57 2013
New Revision: 1432778

URL: http://svn.apache.org/viewvc?rev=1432778&view=rev
Log:
Rework svnpubsub to reduce a lot of complexity and generalize.

In particular the following significant changes have been made:
- XML support has been removed.
- svnpubsub.client now takes URLs instead of host/port tuples.
- svnpubsub.client has been updated to use JSON.
- The JSON stream when connecting to the server no longer starts an object
  that never gets finished.
- The server provides a JSON object with the key svnpubsub that contains a
  version key. This is intended to identify the format the server will be
  sending; clients can use it to stay backwards compatible across protocol
  changes.
- Each JSON record is separated with a null character.
- The input JSON is now identical to the object in the commit field of the
  output object.
- A type and format field have been added to the JSON object. type can be
  used to distinguish, say, a git commit from an svn commit. format is an
  integer format number, similar to what SVN uses all over the place.
- The following fields have been renamed to allow the same fields to be used
  across version control systems: revision -> id, author -> committer (author
  could be used for what other systems use author for), repos -> repository.
- The following fields are required by the server on the input: repository,
  id, type and format; everything else is optional.
- The dirs_changed field has been removed; it can be calculated from changed.
- The Revision classes have been renamed Commit and in general no longer hard
  code certain fields.
- There is only one type of output stream now; the dirs-changed output type
  has been removed.
- The URLs used by the server have changed. All PUTs go to /commits. Clients
  no longer pass a format since the only format is JSON. Clients can still
  filter by repository, but can also filter by type now.

[in tools/server-side/svnpubsub]

* svnpubsub/server.py
  (*): Update documentation to reflect changes.
  (twisted.web.static, xml.etree): No longer used, don't import.
  (Revision): Remove and replace with...
  (Commit): which has very little in the way of requirements on the incoming
    JSON.
  (Client.__init__): Remove format and add type as a filter.
  (Client.interested_in): Handle both repository and type. Takes the commit
    object rather than the uuid as input now.
  (Client.write_data): Data is just a string rather than a dictionary since
    there is only one format (JSON) and records are separated by a null
    character.
  (JSONClient): Merged into Client since only JSON is supported.
  (Client.write_start): Start with a svnpubsub object that has a version field
    rather than starting a commits object we never close.
  (Client.write_heartbeat): Separate records with null character.
  (XMLClient): Removed.
  (SvnPubSub.clients): Is just a list, not a dictionary of clients by type.
  (SvnPubSub.cc, SvnPubSub.remove): Adjust for clients being just a list.
  (SvnPubSub.render_GET): Adjust for new URI format and single format.
  (SvnPubSub.notifyAll): Adjust for single format, move log formatting to new
    Commit class and adjust for changes in Client.interested_in().
  (SvnPubSub.render_PUT): Handle format errors and object validation. Also
    use the new Commit class.
  (svnpubsub_server): Don't reference /dev/null as empty resource; just create
    an empty Resource. Remove dirs-changed and commit URI paths.

* svnpubsub/client.py
  (urlparse): Adjust import to deal with Python 3 rename.
  (xml.sax): No longer import, since we no longer support XML.
  (json): New import.
  (Client.__init__): Accept a URL instead of host and port. Use JSON instead
    of XML. Deal with bytes vs strings for Python 3.
  (Client.handle_connect, Client.handle_close, Client.handle_error): Pass None
    for the new event_arg parameter on the event_callback.
  (Client.found_terminator): bytes vs strings. Use \0 as the record separator
    and let async_chat split the records for us.
  (Client.collect_incoming_data): Just buffer incoming data; when the
    separator is found it'll call the parser from found_terminator().
  (XMLStreamHandler, Revision): Remove.
  (JSONRecordHandler, Commit): Add.
  (MultiClient.__init__): Take a list of urls instead of hostports.
  (MultiClient._reconnect, MultiClient._reconnect_later,
   MultiClient._add_channel, MultiClient._check_stale): Use a URL instead of
    host and port as incoming arguments and on callbacks. Add event_arg to
    the event_callbacks.

* README.txt: Remove svnwcsub todos (moved to svnwcsub.py) and update example.

* commit-hook.py
  (svncmd_dirs): Remove function.
  (do_put): PUT to /commits instead of /dirs-changed.
  (main): Change format of posted JSON to match changes to the server, remove
    debug print.

* daemonize.py
  (daemonize_exit, daemonize): Change to the newer/clearer except Foo as e
    syntax. Makes this Python 3 compatible.

* example.conf,
* svnwcsub.conf.example: Rename example.conf to svnwcsub.conf.example. Fix
    the example stream and fix the interpolation syntax.

* irkerbridge.py
  (*): Update documentation to reflect changes.
  (Daemon.run, BigDoEverythingClass.__init__): Give MultiClient a list of urls
    instead of list of host port tuples.
  (BigDoEverythingClass.locate_matching_configs,
   BigDoEverythingClass.fill_in_extra_args, BigDoEverythingClass.commit):
    Commit callback gets a url and commit object instead of host, port and
    Revision object.
  (BigDoEverythingClass._generate_dirs_changed): New function to calculate
    the dirs_changed field from the new changed field.
  (BigDoEverythingClass.fill_in_extra_args): Use _generate_dirs_changed() and
    fix a bug where dirs_root was an empty string.
  (BigDoEverythingClass.event): Event receives a url, event_name and event_arg
    now.

* svnwcsub.py
  (*): Moved TODOs from README.txt here.
  (ConfigParser, Queue, urlparse): Adjust import for Python 3 rename.
  (posixpath): New import.
  (BigDoEverythingClasss.__init__): svnpubsub.client uses urls directly now.
  (BigDoEverythingClasss._normalize_path, BigDoEverythingClasss.commit): Use
    posixpath instead of os.path when working on fspaths.
  (BigDoEverythingClasss.commit, Daemon._event): Adjust for changed callback
    params.
  (Daemon.run): Use a list of urls instead of a list of hostport tuples.

* test.conf: Delete.

* testserver.py
  (TEST_BODY): Change XML to JSON.

* watcher.py
  (urlparse): Adjust import for Python 3 rename.
  (_commit, _event): Adjust for callback parameter changes.
  (main): svnpubsub.client uses URLs now instead of hostports.
  (*): Take a list of URLs on the command line instead of pulling in svnwcsub
    and using its config class.

Added:
    subversion/trunk/tools/server-side/svnpubsub/notes/
    subversion/trunk/tools/server-side/svnpubsub/svnwcsub.conf.example
      - copied, changed from r1428993, subversion/trunk/tools/server-side/svnpubsub/example.conf
Removed:
    subversion/trunk/tools/server-side/svnpubsub/example.conf
    subversion/trunk/tools/server-side/svnpubsub/test.conf
Modified:
    subversion/trunk/tools/server-side/svnpubsub/README.txt
    subversion/trunk/tools/server-side/svnpubsub/commit-hook.py
    subversion/trunk/tools/server-side/svnpubsub/daemonize.py
    subversion/trunk/tools/server-side/svnpubsub/irkerbridge.py
    subversion/trunk/tools/server-side/svnpubsub/svnpubsub/client.py
    subversion/trunk/tools/server-side/svnpubsub/svnpubsub/server.py
    subversion/trunk/tools/server-side/svnpubsub/svnwcsub.py
    subversion/trunk/tools/server-side/svnpubsub/testserver.py
    subversion/trunk/tools/server-side/svnpubsub/watcher.py

Modified: subversion/trunk/tools/server-side/svnpubsub/README.txt
URL: http://svn.apache.org/viewvc/subversion/trunk/tools/server-side/svnpubsub/README.txt?rev=1432778&r1=1432777&r2=1432778&view=diff
==============================================================================
--- subversion/trunk/tools/server-side/svnpubsub/README.txt (original)
+++ subversion/trunk/tools/server-side/svnpubsub/README.txt Mon Jan 14 01:16:57 2013
@@ -1,21 +1,3 @@
-### write a README
-
-
-TODO: -- bulk update at startup time to avoid backlog warnings -- switch to host:port format in config file -- fold BDEC into Daemon -- fold WorkingCopy._get_match() into __init__ -- remove wc_ready(). assume all WorkingCopy instances are usable. - place the instances into .watch at creation. the .update_applies() - just returns if the wc is disabled (eg. could not find wc dir) -- figure out way to avoid the ASF-specific PRODUCTION_RE_FILTER - (a base path exclusion list should work for the ASF) -- add support for SIGHUP to reread the config and reinitialize working copies -- joes will write documentation for svnpubsub as these items become fulfilled -- make LOGLEVEL configurable - - Installation instructions: 1. Set up an svnpubsub service. @@ -39,4 +21,4 @@ Installation instructions: 3. Set up svnpubsub clients. (eg svnwcsub.py, svnpubsub/client.py, - 'curl -i http://${hostname}:2069/commits/json') + 'curl -sN http://${hostname}:2069/commits') Modified: subversion/trunk/tools/server-side/svnpubsub/commit-hook.py URL: http://svn.apache.org/viewvc/subversion/trunk/tools/server-side/svnpubsub/commit-hook.py?rev=1432778&r1=1432777&r2=1432778&view=diff ============================================================================== --- subversion/trunk/tools/server-side/svnpubsub/commit-hook.py (original) +++ subversion/trunk/tools/server-side/svnpubsub/commit-hook.py Mon Jan 14 01:16:57 2013 @@ -48,17 +48,6 @@ def svncmd_info(repo, revision): 'date': data[1].strip(), 'log': "\n".join(data[3:]).strip()} -def svncmd_dirs(repo, revision): - cmd = "%s dirs-changed -r %s %s" % (SVNLOOK, revision, repo) - p = svncmd(cmd) - dirs = [] - while True: - line = p.stdout.readline() - if not line: - break - dirs.append(line.strip()) - return dirs - def svncmd_changed(repo, revision): cmd = "%s changed -r %s %s" % (SVNLOOK, revision, repo) p = svncmd(cmd) @@ -74,7 +63,7 @@ def svncmd_changed(repo, revision): def do_put(body): opener = urllib2.build_opener(urllib2.HTTPHandler) - 
request = urllib2.Request("http://%s:%d/dirs-changed" %(HOST, PORT), data=body) + request = urllib2.Request("http://%s:%d/commits" %(HOST, PORT), data=body) request.add_header('Content-Type', 'application/json') request.get_method = lambda: 'PUT' url = opener.open(request) @@ -83,18 +72,17 @@ def do_put(body): def main(repo, revision): revision = revision.lstrip('r') i = svncmd_info(repo, revision) - data = {'revision': int(revision), - 'dirs_changed': [], + data = {'type': 'svn', + 'format': 1, + 'id': int(revision), 'changed': {}, - 'repos': svncmd_uuid(repo), - 'author': i['author'], + 'repository': svncmd_uuid(repo), + 'committer': i['author'], 'log': i['log'], 'date': i['date'], } - data['dirs_changed'].extend(svncmd_dirs(repo, revision)) data['changed'].update(svncmd_changed(repo, revision)) body = json.dumps(data) - #print body do_put(body) if __name__ == "__main__": Modified: subversion/trunk/tools/server-side/svnpubsub/daemonize.py URL: http://svn.apache.org/viewvc/subversion/trunk/tools/server-side/svnpubsub/daemonize.py?rev=1432778&r1=1432777&r2=1432778&view=diff ============================================================================== --- subversion/trunk/tools/server-side/svnpubsub/daemonize.py (original) +++ subversion/trunk/tools/server-side/svnpubsub/daemonize.py Mon Jan 14 01:16:57 2013 @@ -50,11 +50,11 @@ class Daemon(object): def daemonize_exit(self): try: result = self.daemonize() - except (ChildFailed, DaemonFailed), e: + except (ChildFailed, DaemonFailed) as e: # duplicate the exit code sys.exit(e.code) except (ChildTerminatedAbnormally, ChildForkFailed, - DaemonTerminatedAbnormally, DaemonForkFailed), e: + DaemonTerminatedAbnormally, DaemonForkFailed) as e: sys.stderr.write('ERROR: %s\n' % e) sys.exit(1) except ChildResumedIncorrectly: @@ -74,7 +74,7 @@ class Daemon(object): # fork off a child that can detach itself from this process. 
try: pid = os.fork() - except OSError, e: + except OSError as e: raise ChildForkFailed(e.errno, e.strerror) if pid > 0: @@ -113,7 +113,7 @@ class Daemon(object): # perform the second fork try: pid = os.fork() - except OSError, e: + except OSError as e: raise DaemonForkFailed(e.errno, e.strerror) if pid > 0: Modified: subversion/trunk/tools/server-side/svnpubsub/irkerbridge.py URL: http://svn.apache.org/viewvc/subversion/trunk/tools/server-side/svnpubsub/irkerbridge.py?rev=1432778&r1=1432777&r2=1432778&view=diff ============================================================================== --- subversion/trunk/tools/server-side/svnpubsub/irkerbridge.py (original) +++ subversion/trunk/tools/server-side/svnpubsub/irkerbridge.py Mon Jan 14 01:16:57 2013 @@ -30,8 +30,6 @@ # Space separated list of URLs to streams. # This option should only be in the DEFAULT section, is ignored in # all other sections. -# NOTE: At current svnpubsub.client only accepts hostname and port -# combos so the path is ignored and /commits/xml is used. # irker=hostname:port # The hostname/port combination of the irker daemon. If port is # omitted it defaults to 6659. Irker is connected to over UDP. @@ -46,7 +44,7 @@ # template=string # A string to use to format the output. The string is a Python # string Template. The following variables are available: -# $author, $rev, $date, $uuid, $log, $log, $log_firstline, +# $committer, $id, $date, $repository, $log, $log_firstline, # $log_firstparagraph, $dirs_changed, $dirs_count, $dirs_count_s, # $subdirs_count, $subdirs_count_s, $dirs_root # Most of them should be self explanatory. 
$dirs_count is the number of @@ -114,9 +112,9 @@ class Daemon(daemonize.Daemon): def run(self): print 'irkerbridge started, pid=%d' % (os.getpid()) - mc = svnpubsub.client.MultiClient(self.bdec.hostports, - self.bdec.commit, - self.bdec.event) + mc = svnpubsub.client.MultiClient(self.bdec.urls, + self.bdec.commit, + self.bdec.event) mc.run_forever() @@ -124,12 +122,9 @@ class BigDoEverythingClass(object): def __init__(self, config, options): self.config = config self.options = options - self.hostports = [] - for url in config.get_value('streams').split(): - parsed = urlparse.urlparse(url.strip()) - self.hostports.append((parsed.hostname, parsed.port or 80)) + self.urls = config.get_value('streams').split() - def locate_matching_configs(self, rev): + def locate_matching_configs(self, commit): result = [ ] for section in self.config.sections(): match = self.config.get(section, "match").split('/', 1) @@ -137,40 +132,62 @@ class BigDoEverythingClass(object): # No slash so assume all paths match.append('*') match_uuid, match_path = match - if rev.uuid == match_uuid or match_uuid == "*": - for path in rev.dirs_changed: + if commit.repository == match_uuid or match_uuid == "*": + for path in commit.changed: if fnmatch.fnmatch(path, match_path): result.append(section) break return result - def fill_in_extra_args(self, rev): + def _generate_dirs_changed(self, commit): + if hasattr(commit, 'dirs_changed') or not hasattr(commit, 'changed'): + return + + dirs_changed = set() + for p in commit.changed: + if p[-1] == '/' and commit.changed[p]['flags'][1] == 'U': + # directory with property changes add the directory itself. + dirs_changed.add(p) + else: + # everything else add the parent of the path + # directories have a trailing slash so if it's present remove + # it before finding the parent. 
The result will be a directory + # so it needs a trailing slash + dirs_changed.add(posixpath.dirname(p.rstrip('/')) + '/') + + commit.dirs_changed = dirs_changed + return + + def fill_in_extra_args(self, commit): # Set any empty members to the string "" - v = vars(rev) + v = vars(commit) for k in v.keys(): if not v[k]: v[k] = '' - - # Add entries to the rev object that are useful for + + self._generate_dirs_changed(commit) + # Add entries to the commit object that are useful for # formatting. - rev.log_firstline = rev.log.split("\n",1)[0] - rev.log_firstparagraph = re.split("\r?\n\r?\n",rev.log,1)[0] - rev.log_firstparagraph = re.sub("\r?\n"," ",rev.log_firstparagraph) - if rev.dirs_changed: - rev.dirs_root = posixpath.commonprefix(rev.dirs_changed) - rev.dirs_count = len(rev.dirs_changed) - if rev.dirs_count > 1: - rev.dirs_count_s = " (%d dirs)" %(rev.dirs_count) + commit.log_firstline = commit.log.split("\n",1)[0] + commit.log_firstparagraph = re.split("\r?\n\r?\n",commit.log,1)[0] + commit.log_firstparagraph = re.sub("\r?\n"," ",commit.log_firstparagraph) + if commit.dirs_changed: + commit.dirs_root = posixpath.commonprefix(commit.dirs_changed) + if commit.dirs_root == '': + commit.dirs_root = '/' + commit.dirs_count = len(commit.dirs_changed) + if commit.dirs_count > 1: + commit.dirs_count_s = " (%d dirs)" %(commit.dirs_count) else: - rev.dirs_count_s = "" + commit.dirs_count_s = "" - rev.subdirs_count = rev.dirs_count - if rev.dirs_root in rev.dirs_changed: - rev.subdirs_count -= 1 - if rev.subdirs_count > 1: - rev.subdirs_count_s = " + %d subdirs" % (rev.subdirs_count) + commit.subdirs_count = commit.dirs_count + if commit.dirs_root in commit.dirs_changed: + commit.subdirs_count -= 1 + if commit.subdirs_count >= 1: + commit.subdirs_count_s = " + %d subdirs" % (commit.subdirs_count) else: - rev.subdirs_count_s = "" + commit.subdirs_count_s = "" def _send(self, irker, msg): sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) @@ -193,22 +210,22 @@ class 
BigDoEverythingClass(object): msg = {'to': to, 'privmsg': ''} self._send(irker, msg) - def commit(self, host, port, rev): + def commit(self, url, commit): if self.options.verbose: - print "RECV: from %s:%s" % (host, port) - print json.dumps(vars(rev), indent=2) + print "RECV: from %s" % url + print json.dumps(vars(commit), indent=2) try: - config_sections = self.locate_matching_configs(rev) + config_sections = self.locate_matching_configs(commit) if len(config_sections) > 0: - self.fill_in_extra_args(rev) + self.fill_in_extra_args(commit) for section in config_sections: irker = self.config.get(section, "irker") to_list = self.config.get(section, "to").split() template = self.config.get(section, "template") if not irker or not to_list or not template: continue - privmsg = Template(template).safe_substitute(vars(rev)) + privmsg = Template(template).safe_substitute(vars(commit)) if len(privmsg) > MAX_PRIVMSG: privmsg = privmsg[:MAX_PRIVMSG-3] + '...' for to in to_list: @@ -221,9 +238,9 @@ class BigDoEverythingClass(object): sys.stdout.flush() raise - def event(self, host, port, event_name): + def event(self, url, event_name, event_arg): if self.options.verbose or event_name != "ping": - print 'EVENT: %s from %s:%s' % (event_name, host, port) + print 'EVENT: %s from %s' % (event_name, url) sys.stdout.flush() Modified: subversion/trunk/tools/server-side/svnpubsub/svnpubsub/client.py URL: http://svn.apache.org/viewvc/subversion/trunk/tools/server-side/svnpubsub/svnpubsub/client.py?rev=1432778&r1=1432777&r2=1432778&view=diff ============================================================================== --- subversion/trunk/tools/server-side/svnpubsub/svnpubsub/client.py (original) +++ subversion/trunk/tools/server-side/svnpubsub/svnpubsub/client.py Mon Jan 14 01:16:57 2013 @@ -38,7 +38,11 @@ import asynchat import socket import functools import time -import xml.sax +import json +try: + import urlparse +except ImportError: + import urllib.parse as urlparse # How long the 
polling loop should wait for activity before returning. TIMEOUT = 30.0 @@ -55,22 +59,30 @@ STALE_DELAY = 60.0 class Client(asynchat.async_chat): - def __init__(self, host, port, commit_callback, event_callback): + def __init__(self, url, commit_callback, event_callback): asynchat.async_chat.__init__(self) self.last_activity = time.time() + self.ibuffer = [] - self.host = host - self.port = port - self.event_callback = event_callback + self.url = url + parsed_url = urlparse.urlsplit(url) + if parsed_url.scheme != 'http': + raise ValueError("URL scheme must be http: '%s'" % url) + host = parsed_url.hostname + port = parsed_url.port + resource = parsed_url.path + if parsed_url.query: + resource += "?%s" % parsed_url.query + if parsed_url.fragment: + resource += "#%s" % parsed_url.fragment - handler = XMLStreamHandler(commit_callback, event_callback) + self.event_callback = event_callback - self.parser = xml.sax.make_parser(['xml.sax.expatreader']) - self.parser.setContentHandler(handler) + self.parser = JSONRecordHandler(commit_callback, event_callback) - # Wait for the end of headers. Then we start parsing XML. - self.set_terminator('\r\n\r\n') + # Wait for the end of headers. Then we start parsing JSON. + self.set_terminator(b'\r\n\r\n') self.skipping_headers = True self.create_socket(socket.AF_INET, socket.SOCK_STREAM) @@ -79,101 +91,61 @@ class Client(asynchat.async_chat): except: self.handle_error() return - - ### should we allow for repository restrictions? 
- self.push('GET /commits/xml HTTP/1.0\r\n\r\n') + + self.push(('GET %s HTTP/1.0\r\n\r\n' % resource).encode('ascii')) def handle_connect(self): - self.event_callback('connected') + self.event_callback('connected', None) def handle_close(self): - self.event_callback('closed') + self.event_callback('closed', None) self.close() def handle_error(self): - self.event_callback('error') + self.event_callback('error', None) self.close() def found_terminator(self): - self.skipping_headers = False - - # From here on, collect everything. Never look for a terminator. - self.set_terminator(None) + if self.skipping_headers: + self.skipping_headers = False + # Each JSON record is terminated by a null character + self.set_terminator(b'\0') + else: + record = b"".join(self.ibuffer) + self.ibuffer = [] + self.parser.feed(record.decode()) def collect_incoming_data(self, data): # Remember the last time we saw activity self.last_activity = time.time() if not self.skipping_headers: - # Just shove this into the XML parser. As the elements are processed, - # we'll collect them into an appropriate structure, and then invoke - # the callback when we have fully received a commit. - self.parser.feed(data) + self.ibuffer.append(data) -class XMLStreamHandler(xml.sax.handler.ContentHandler): - +class JSONRecordHandler: def __init__(self, commit_callback, event_callback): self.commit_callback = commit_callback self.event_callback = event_callback - self.rev = None - self.chars = '' - self.parent = None - self.attrs = [ ] - - def startElement(self, name, attrs): - self.attrs = attrs - if name == 'commit': - self.rev = Revision(attrs['repository'], int(attrs['revision'])) - elif name == "dirs_changed" or name == "changed": - self.parent = name - # No other elements to worry about. 
- - def characters(self, data): - self.chars += data - - def endElement(self, name): - if name == 'commit': - self.commit_callback(self.rev) - self.rev = None - elif name == 'stillalive': - self.event_callback('ping') - elif name == self.parent: - self.parent = None - elif self.chars and self.rev: - value = self.chars.strip() - if self.parent == 'dirs_changed' and name == 'path': - self.rev.dirs_changed.append(value.decode('unicode_escape')) - elif self.parent == 'changed' and name == 'path': - path = value.decode('unicode_escape') - self.rev.changed[path] = dict(p for p in self.attrs.items()) - elif name == 'author': - self.rev.author = value.decode('unicode_escape') - elif name == 'date': - self.rev.date = value.decode('unicode_escape') - elif name == 'log': - self.rev.log = value.decode('unicode_escape') - - # Toss out any accumulated characters for this element. - self.chars = '' - # Toss out the saved attributes for this element. - self.attrs = [ ] - - -class Revision(object): - def __init__(self, uuid, rev): - self.uuid = uuid - self.rev = rev - self.dirs_changed = [ ] - self.changed = { } - self.author = None - self.date = None - self.log = None + def feed(self, record): + obj = json.loads(record) + if 'svnpubsub' in obj: + self.event_callback('version', obj['svnpubsub']['version']) + elif 'commit' in obj: + commit = Commit(obj['commit']) + self.commit_callback(commit) + elif 'stillalive' in obj: + self.event_callback('ping', obj['stillalive']) + + +class Commit(object): + def __init__(self, commit): + self.__dict__.update(commit) class MultiClient(object): - def __init__(self, hostports, commit_callback, event_callback): + def __init__(self, urls, commit_callback, event_callback): self.commit_callback = commit_callback self.event_callback = event_callback @@ -181,33 +153,33 @@ class MultiClient(object): self.target_time = 0 self.work_items = [ ] - for host, port in hostports: - self._add_channel(host, port) + for url in urls: + self._add_channel(url) - def 
_reconnect(self, host, port, event_name): + def _reconnect(self, url, event_name, event_arg): if event_name == 'closed' or event_name == 'error': # Stupid connection closed for some reason. Set up a reconnect. Note # that it should have been removed from asyncore.socket_map already. - self._reconnect_later(host, port) + self._reconnect_later(url) # Call the user's callback now. - self.event_callback(host, port, event_name) + self.event_callback(url, event_name, event_arg) - def _reconnect_later(self, host, port): + def _reconnect_later(self, url): # Set up a work item to reconnect in a little while. - self.work_items.append((host, port)) + self.work_items.append(url) # Only set a target if one has not been set yet. Otherwise, we could # create a race condition of continually moving out towards the future if not self.target_time: self.target_time = time.time() + RECONNECT_DELAY - def _add_channel(self, host, port): + def _add_channel(self, url): # Simply instantiating the client will install it into the global map # for processing in the main event loop. - Client(host, port, - functools.partial(self.commit_callback, host, port), - functools.partial(self._reconnect, host, port)) + Client(url, + functools.partial(self.commit_callback, url), + functools.partial(self._reconnect, url)) def _check_stale(self): now = time.time() @@ -215,12 +187,12 @@ class MultiClient(object): if client.last_activity + STALE_DELAY < now: # Whoops. No activity in a while. Signal this fact, Close the # Client, then have it reconnected later on. - self.event_callback(client.host, client.port, 'stale') + self.event_callback(client.url, 'stale', client.last_activity) # This should remove it from .socket_map. 
client.close() - self._reconnect_later(client.host, client.port) + self._reconnect_later(client.url) def _maybe_work(self): # If we haven't reach the targetted time, or have no work to do, @@ -236,8 +208,8 @@ class MultiClient(object): work = self.work_items self.work_items = [ ] - for host, port in work: - self._add_channel(host, port) + for url in work: + self._add_channel(url) def run_forever(self): while True: Modified: subversion/trunk/tools/server-side/svnpubsub/svnpubsub/server.py URL: http://svn.apache.org/viewvc/subversion/trunk/tools/server-side/svnpubsub/svnpubsub/server.py?rev=1432778&r1=1432777&r2=1432778&view=diff ============================================================================== --- subversion/trunk/tools/server-side/svnpubsub/svnpubsub/server.py (original) +++ subversion/trunk/tools/server-side/svnpubsub/svnpubsub/server.py Mon Jan 14 01:16:57 2013 @@ -28,20 +28,24 @@ # Currently supports both XML and JSON serialization. # # Example Sub clients: -# curl -i http://127.0.0.1:2069/dirs-changed/xml -# curl -i http://127.0.0.1:2069/dirs-changed/json -# curl -i http://127.0.0.1:2069/commits/json -# curl -i http://127.0.0.1:2069/commits/13f79535-47bb-0310-9956-ffa450edef68/json -# curl -i http://127.0.0.1:2069/dirs-changed/13f79535-47bb-0310-9956-ffa450edef68/json +# curl -sN http://127.0.0.1:2069/commits +# curl -sN http://127.0.0.1:2069/commits/svn/* +# curl -sN http://127.0.0.1:2069/commits/svn +# curl -sN http://127.0.0.1:2069/commits/*/13f79535-47bb-0310-9956-ffa450edef68 +# curl -sN http://127.0.0.1:2069/commits/svn/13f79535-47bb-0310-9956-ffa450edef68 # -# URL is built into 3 parts: -# /${type}/${optional_repo_uuid}/${format} +# URL is built into 2 parts: +# /commits/${optional_type}/${optional_repository} +# +# If the type is included in the URL, you will only get commits of that type. +# The type can be * and then you will receive commits of any type. 
# -# If the repository UUID is included in the URl, you will only receive -# messages about that repository. +# If the repository is included in the URL, you will only receive +# messages about that repository. The repository can be * and then you +# will receive messages about all repositories. # # Example Pub clients: -# curl -T revinfo.json -i http://127.0.0.1:2069/commit +# curl -T revinfo.json -i http://127.0.0.1:2069/commits # # TODO: # - Add Real access controls (not just 127.0.0.1) @@ -61,82 +65,53 @@ import sys import twisted from twisted.internet import reactor from twisted.internet import defer -from twisted.web import server, static +from twisted.web import server from twisted.web import resource from twisted.python import log -try: - from xml.etree import cElementTree as ET -except: - from xml.etree import ElementTree as ET import time -class Revision: +class Commit: def __init__(self, r): - # Don't escape the values; json handles binary values fine. - # ET will happily emit literal control characters (eg, NUL), - # thus creating invalid XML, so the XML code paths do escaping. 
- self.rev = r.get('revision') - self.repos = r.get('repos') - self.dirs_changed = r.get('dirs_changed') - self.changed = r.get('changed') - self.author = r.get('author') - self.log = r.get('log') - self.date = r.get('date') - - def render_commit(self, format): - if format == "json": - return json.dumps({'commit': {'repository': self.repos, - 'revision': self.rev, - 'dirs_changed': self.dirs_changed, - 'changed': self.changed, - 'author': self.author, - 'log': self.log, - 'date': self.date}}) +"," - elif format == "xml": - c = ET.Element('commit', {'repository': self.repos, 'revision': "%d" % (self.rev)}) - ET.SubElement(c, 'author').text = self.author.encode('unicode_escape') - ET.SubElement(c, 'date').text = self.date.encode('unicode_escape') - ET.SubElement(c, 'log').text = self.log.encode('unicode_escape') - d = ET.SubElement(c, 'dirs_changed') - for p in self.dirs_changed: - x = ET.SubElement(d, 'path') - x.text = p.encode('unicode_escape') - ch = ET.SubElement(c, 'changed') - for chp in self.changed.keys(): - x = ET.SubElement(ch, 'path', self.changed[chp]) - x.text = chp.encode('unicode_escape') - - str = ET.tostring(c, 'UTF-8') + "\n" - return str[39:] - else: - raise Exception("Ooops, invalid format") - - def render_dirs_changed(self, format): - if format == "json": - return json.dumps({'commit': {'repository': self.repos, - 'revision': self.rev, - 'dirs_changed': self.dirs_changed}}) +"," - elif format == "xml": - c = ET.Element('commit', {'repository': self.repos, 'revision': "%d" % (self.rev)}) - d = ET.SubElement(c, 'dirs_changed') - for p in self.dirs_changed: - x = ET.SubElement(d, 'path') - x.text = p.encode('unicode_escape') - str = ET.tostring(c, 'UTF-8') + "\n" - return str[39:] - else: - raise Exception("Ooops, invalid format") + self.__dict__.update(r) + if not self.check_value('repository'): + raise ValueError('Invalid Repository Value') + if not self.check_value('type'): + raise ValueError('Invalid Type Value') + if not 
self.check_value('format'): + raise ValueError('Invalid Format Value') + if not self.check_value('id'): + raise ValueError('Invalid ID Value') + + def check_value(self, k): + return hasattr(self, k) and self.__dict__[k] + + def render_commit(self): + obj = {'commit': {}} + obj['commit'].update(self.__dict__) + return json.dumps(obj) + + def render_log(self): + try: + paths_changed = " %d paths changed" % len(self.changed) + except: + paths_changed = "" + return "%s:%s repo '%s' id '%s'%s" % (self.type, + self.format, + self.repository, + self.id, + paths_changed) + HEARTBEAT_TIME = 15 class Client(object): - def __init__(self, pubsub, r, repos, fmt): + def __init__(self, pubsub, r, type, repository): self.pubsub = pubsub r.notifyFinish().addErrback(self.finished) self.r = r - self.format = fmt - self.repos = repos + self.type = type + self.repository = repository self.alive = True log.msg("OPEN: %s:%d (%d clients online)"% (r.getClientIP(), r.client.port, pubsub.cc()+1)) @@ -148,12 +123,14 @@ class Client(object): except ValueError: pass - def interested_in(self, uuid): - if self.repos is None: - return True - if uuid == self.repos: - return True - return False + def interested_in(self, commit): + if self.type and self.type != commit.type: + return False + + if self.repository and self.repository != commit.repository: + return False + + return True def notify(self, data): self.write(data) @@ -168,88 +145,67 @@ class Client(object): reactor.callLater(HEARTBEAT_TIME, self.heartbeat, None) def write_data(self, data): - self.write(data[self.format] + "\n") + self.write(data + "\0") """ "Data must not be unicode" is what the interfaces.ITransport says... grr. 
""" def write(self, input): self.r.write(str(input)) -class JSONClient(Client): def write_start(self): self.r.setHeader('content-type', 'application/json') - self.write('{"commits": [\n') + self.write('{"svnpubsub": {"version": 1}}\0') def write_heartbeat(self): - self.write(json.dumps({"stillalive": time.time()}) + ",\n") + self.write(json.dumps({"stillalive": time.time()}) + "\0") -class XMLClient(Client): - def write_start(self): - self.r.setHeader('content-type', 'application/xml') - self.write("\n") - - def write_heartbeat(self): - self.write("%f\n" % (time.time())) class SvnPubSub(resource.Resource): isLeaf = True - clients = {'commits': [], - 'dirs-changed': []} + clients = [] def cc(self): - return reduce(lambda x,y: len(x)+len(y), self.clients.values()) + return len(self.clients) def remove(self, c): - for k in self.clients.keys(): - self.clients[k].remove(c) + self.clients.remove(c) def render_GET(self, request): log.msg("REQUEST: %s" % (request.uri)) - uri = request.uri.split('/') - request.setHeader('content-type', 'text/plain') - if len(uri) != 3 and len(uri) != 4: - request.setResponseCode(400) - return "Invalid path\n" - uuid = None - fmt = None - type = uri[1] - - if len(uri) == 3: - fmt = uri[2] - else: - fmt = uri[3] - uuid = uri[2] + repository = None + type = None - if type not in self.clients.keys(): + uri = request.uri.split('/') + uri_len = len(uri) + if uri_len < 2 or uri_len > 4: request.setResponseCode(400) - return "Invalid Reuqest Type\n" + return "Invalid path\n" - clients = {'json': JSONClient, 'xml': XMLClient} - clientCls = clients.get(fmt) - if clientCls == None: - request.setResponseCode(400) - return "Invalid Format Requested\n" + if uri_len >= 3: + type = uri[2] + + if uri_len == 4: + repository = uri[3] + + # Convert wild card to None. 
+ if type == '*': + type = None + if repository == '*': + repository = None - c = clientCls(self, request, uuid, fmt) - self.clients[type].append(c) + c = Client(self, request, type, repository) + self.clients.append(c) c.start() return twisted.web.server.NOT_DONE_YET - def notifyAll(self, rev): - data = {'commits': {}, - 'dirs-changed': {}} - for x in ['xml', 'json']: - data['commits'][x] = rev.render_commit(x) - data['dirs-changed'][x] = rev.render_dirs_changed(x) - - log.msg("COMMIT: r%d in %d paths (%d clients)" % (rev.rev, - len(rev.dirs_changed), - self.cc())) - for k in self.clients.keys(): - for c in self.clients[k]: - if c.interested_in(rev.repos): - c.write_data(data[k]) + def notifyAll(self, commit): + data = commit.render_commit() + + log.msg("COMMIT: %s (%d clients)" % (commit.render_log(), self.cc())) + for client in self.clients: + if client.interested_in(commit): + client.write_data(data) def render_PUT(self, request): request.setHeader('content-type', 'text/plain') @@ -260,17 +216,20 @@ class SvnPubSub(resource.Resource): input = request.content.read() #import pdb;pdb.set_trace() #print "input: %s" % (input) - r = json.loads(input) - rev = Revision(r) - self.notifyAll(rev) + try: + c = json.loads(input) + commit = Commit(c) + except ValueError as e: + request.setResponseCode(400) + log.msg("COMMIT: failed due to: %s" % str(e)) + return str(e) + self.notifyAll(commit) return "Ok" def svnpubsub_server(): - root = static.File("/dev/null") + root = resource.Resource() s = SvnPubSub() - root.putChild("dirs-changed", s) root.putChild("commits", s) - root.putChild("commit", s) return server.Site(root) if __name__ == "__main__": Copied: subversion/trunk/tools/server-side/svnpubsub/svnwcsub.conf.example (from r1428993, subversion/trunk/tools/server-side/svnpubsub/example.conf) URL: 
http://svn.apache.org/viewvc/subversion/trunk/tools/server-side/svnpubsub/svnwcsub.conf.example?p2=subversion/trunk/tools/server-side/svnpubsub/svnwcsub.conf.example&p1=subversion/trunk/tools/server-side/svnpubsub/example.conf&r1=1428993&r2=1432778&rev=1432778&view=diff
==============================================================================
--- subversion/trunk/tools/server-side/svnpubsub/example.conf (original)
+++ subversion/trunk/tools/server-side/svnpubsub/svnwcsub.conf.example Mon Jan 14 01:16:57 2013
@@ -1,8 +1,6 @@
-### turn this into an example
-
 [DEFAULT]
 svnbin: /usr/local/bin/svn
-streams: http://svn.example.org:2069/commits/xml
+streams: http://svn.example.org:2069/commits/svn
 hook: /usr/bin/true
 
 ## The values below are used by ConfigParser's interpolation syntax.
@@ -14,5 +12,5 @@ HOME: /home/svn
 LANG: en_US.UTF-8
 
 [track]
-/usr/local/foo/prod: %(SOME_REPOS)/foo/production
-/usr/local/foo/dev: %(SOME_REPOS)/foo/trunk
+/usr/local/foo/prod: %(SOME_REPOS)s/foo/production
+/usr/local/foo/dev: %(SOME_REPOS)s/foo/trunk

Modified: subversion/trunk/tools/server-side/svnpubsub/svnwcsub.py
URL: http://svn.apache.org/viewvc/subversion/trunk/tools/server-side/svnpubsub/svnwcsub.py?rev=1432778&r1=1432777&r2=1432778&view=diff
==============================================================================
--- subversion/trunk/tools/server-side/svnpubsub/svnwcsub.py (original)
+++ subversion/trunk/tools/server-side/svnpubsub/svnwcsub.py Mon Jan 14 01:16:57 2013
@@ -30,19 +30,42 @@
 # See svnwcsub.conf for more information on its contents.
 #
+# TODO:
+# - bulk update at startup time to avoid backlog warnings
+# - fold BDEC into Daemon
+# - fold WorkingCopy._get_match() into __init__
+# - remove wc_ready(). assume all WorkingCopy instances are usable.
+# place the instances into .watch at creation. the .update_applies()
+# just returns if the wc is disabled (eg.
could not find wc dir) +# - figure out way to avoid the ASF-specific PRODUCTION_RE_FILTER +# (a base path exclusion list should work for the ASF) +# - add support for SIGHUP to reread the config and reinitialize working copies +# - joes will write documentation for svnpubsub as these items become fulfilled +# - make LOGLEVEL configurable + import errno import subprocess import threading import sys import os import re -import ConfigParser +import posixpath +try: + import ConfigParser +except ImportError: + import configparser as ConfigParser import time import logging.handlers -import Queue +try: + import Queue +except ImportError: + import queue as Queue import optparse import functools -import urlparse +try: + import urlparse +except ImportError: + import urllib.parse as urlparse import daemonize import svnpubsub.client @@ -151,15 +174,10 @@ class BigDoEverythingClasss(object): self.env = config.get_env() self.tracking = config.get_track() self.hook = config.get_value('hook') + self.streams = config.get_value('streams').split() self.worker = BackgroundWorker(self.svnbin, self.env, self.hook) self.watch = [ ] - self.hostports = [ ] - ### switch from URLs in the config to just host:port pairs - for url in config.get_value('streams').split(): - parsed = urlparse.urlparse(url.strip()) - self.hostports.append((parsed.hostname, parsed.port)) - def start(self): for path, url in self.tracking.items(): # working copies auto-register with the BDEC when they are ready. 
@@ -175,15 +193,15 @@ class BigDoEverythingClasss(object): def _normalize_path(self, path): if path[0] != '/': return "/" + path - return os.path.abspath(path) + return posixpath.abspath(path) - def commit(self, host, port, rev): - logging.info("COMMIT r%d (%d paths) from %s:%d" - % (rev.rev, len(rev.dirs_changed), host, port)) + def commit(self, url, commit): + logging.info("COMMIT r%d (%d paths) from %s" + % (commit.id, len(commit.changed), url)) - paths = map(self._normalize_path, rev.dirs_changed) + paths = map(self._normalize_path, commit.changed) if len(paths): - pre = os.path.commonprefix(paths) + pre = posixpath.commonprefix(paths) if pre == "/websites/": # special case for svnmucc "dynamic content" buildbot commits # just take the first production path to avoid updating all cms working copies @@ -194,8 +212,8 @@ class BigDoEverythingClasss(object): break #print "Common Prefix: %s" % (pre) - wcs = [wc for wc in self.watch if wc.update_applies(rev.uuid, pre)] - logging.info("Updating %d WC for r%d" % (len(wcs), rev.rev)) + wcs = [wc for wc in self.watch if wc.update_applies(commit.repository, pre)] + logging.info("Updating %d WC for r%d" % (len(wcs), commit.id)) for wc in wcs: self.worker.add_work(OP_UPDATE, wc) @@ -376,18 +394,18 @@ class Daemon(daemonize.Daemon): # Start the BDEC (on the main thread), then start the client self.bdec.start() - mc = svnpubsub.client.MultiClient(self.bdec.hostports, + mc = svnpubsub.client.MultiClient(self.bdec.streams, self.bdec.commit, self._event) mc.run_forever() - def _event(self, host, port, event_name): + def _event(self, url, event_name, event_arg): if event_name == 'error': - logging.exception('from %s:%s', host, port) + logging.exception('from %s', url) elif event_name == 'ping': - logging.debug('ping from %s:%s', host, port) + logging.debug('ping from %s', url) else: - logging.info('"%s" from %s:%s', event_name, host, port) + logging.info('"%s" from %s', event_name, url) def prepare_logging(logfile): Modified: 
subversion/trunk/tools/server-side/svnpubsub/testserver.py URL: http://svn.apache.org/viewvc/subversion/trunk/tools/server-side/svnpubsub/testserver.py?rev=1432778&r1=1432777&r2=1432778&view=diff ============================================================================== --- subversion/trunk/tools/server-side/svnpubsub/testserver.py (original) +++ subversion/trunk/tools/server-side/svnpubsub/testserver.py Mon Jan 14 01:16:57 2013 @@ -30,7 +30,7 @@ import BaseHTTPServer PORT = 2069 -TEST_BODY = 'johndoe2012-01-01 01:01:01 +0000 (Sun, 01 Jan 2012)Frob the ganoozle with the snookishone/path/some/other/directory/' +TEST_BODY = '{"svnpubsub": {"version": 1}}\0{"commit": {"type": "svn", "format": 1, "repository": "12345678-1234-1234-1234-123456789012", "id": "1234", "committer": "johndoe", "date": "2012-01-01 01:01:01 +0000 (Sun, 01 Jan 2012)", "log": "Frob the ganoozle with the snookish", "changed": {"one/path/alpha": {"flags": "U "}, "some/other/directory/": {"flags": "_U "}}}}\0' SEND_KEEPALIVE = True Modified: subversion/trunk/tools/server-side/svnpubsub/watcher.py URL: http://svn.apache.org/viewvc/subversion/trunk/tools/server-side/svnpubsub/watcher.py?rev=1432778&r1=1432777&r2=1432778&view=diff ============================================================================== --- subversion/trunk/tools/server-side/svnpubsub/watcher.py (original) +++ subversion/trunk/tools/server-side/svnpubsub/watcher.py Mon Jan 14 01:16:57 2013 @@ -19,39 +19,37 @@ # # Watch for events from SvnPubSub and print them to stdout # -# ### usage... 
# import sys -import urlparse import pprint +try: + import urlparse +except ImportError: + import urllib.parse as urlparse import svnpubsub.client -import svnwcsub ### for ReloadableConfig -def _commit(host, port, rev): - print 'COMMIT: from %s:%s' % (host, port) - pprint.pprint(vars(rev), indent=2) +def _commit(url, commit): + print('COMMIT: from %s' % url) + pprint.pprint(vars(commit), indent=2) -def _event(host, port, event_name): - print 'EVENT: from %s:%s "%s"' % (host, port, event_name) +def _event(url, event_name, event_arg): + if event_arg: + print('EVENT: from %s "%s" "%s"' % (url, event_name, event_arg)) + else: + print('EVENT: from %s "%s"' % (url, event_name)) -def main(config_file): - config = svnwcsub.ReloadableConfig(config_file) - hostports = [ ] - for url in config.get_value('streams').split(): - parsed = urlparse.urlparse(url) - hostports.append((parsed.hostname, parsed.port)) - - mc = svnpubsub.client.MultiClient(hostports, _commit, _event) +def main(urls): + mc = svnpubsub.client.MultiClient(urls, _commit, _event) mc.run_forever() if __name__ == "__main__": - if len(sys.argv) != 2: - print "invalid args, read source code" + if len(sys.argv) < 2: + print("usage: watcher.py URL [URL...]") sys.exit(0) - main(sys.argv[1]) + main(sys.argv[1:])
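
The server changes in this commit replace the XML and comma-separated JSON output with a stream of NUL-terminated JSON records: a {"svnpubsub": {"version": 1}} header, {"commit": ...} records, and {"stillalive": ...} heartbeats. The sketch below shows one way a subscriber might split such a stream. It is not taken from svnpubsub/client.py; the names iter_records and classify are hypothetical, Python 3 is assumed, and the sample data is only modeled on TEST_BODY from testserver.py.

```python
import json


def iter_records(chunks):
    """Yield decoded JSON objects from an iterable of byte chunks.

    Assumption: every record is terminated by a NUL byte, as written by
    Client.write_data() in the diff above. Chunk boundaries may fall
    anywhere, so incomplete data is buffered until its NUL arrives.
    """
    buf = b""
    for chunk in chunks:
        buf += chunk
        while b"\0" in buf:
            raw, buf = buf.split(b"\0", 1)
            if raw:  # ignore empty records
                yield json.loads(raw.decode("utf-8"))


def classify(record):
    """Return a coarse record kind based on its top-level key."""
    for kind in ("svnpubsub", "commit", "stillalive"):
        if kind in record:
            return kind
    return "unknown"


if __name__ == "__main__":
    # Hypothetical sample stream, split at arbitrary points to imitate
    # network reads.
    stream = [
        b'{"svnpubsub": {"version": 1}}\0{"commit": {"type": "svn", ',
        b'"format": 1, "repository": "12345678-1234-1234-1234-123456789012", ',
        b'"id": "1234", "changed": {"one/path/alpha": {"flags": "U "}}}}\0',
        b'{"stillalive": 1358126218.0}\0',
    ]
    for rec in iter_records(stream):
        print(classify(rec))
```

Buffering until a NUL byte is seen keeps the parser independent of how the transport happens to chunk the response, which matters for a long-lived HTTP connection like the one render_GET keeps open.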