#!/usr/bin/env python
# git-cl -- a git-command for integrating reviews on Rietveld
# Copyright (C) 2008 Evan Martin
# Copyright (C) 2010 Andy Smith
"""Bridge between git branches and bzr branches.

The script keeps a bzr repository under ``<git-dir>/bzr/repo`` and per-branch
marks files under ``<git-dir>/bzr/map``, and moves history between the two
systems by piping ``bzr fast-export`` into ``git fast-import`` (and
``git fast-export`` into ``bzr fast-import``).  Tracking information is
stored in git config under the ``bzr.`` namespace.

The script targets Python 2; ``print`` is written in the parenthesised
single-argument form, which behaves identically there.
"""

import logging
import optparse
import os
import re
import shutil
import subprocess
import sys
import tempfile

try:
    import cStringIO as StringIO
except ImportError:
    try:
        import StringIO
    except ImportError:
        # Neither legacy module is available; fall back so that importing
        # this file does not fail (the name is not used below).
        import io as StringIO


# Prefix for git tracking branches (``bzr/<branch>``), the git config
# section, and the directory created under the git dir.
NAMESPACE = 'bzr'


def die(message, *args):
    """Log ``message`` (formatted lazily with ``args``) and exit with 1."""
    logging.error(message, *args)
    sys.exit(1)


def run_command(cmd, error_ok=False, error_message=None, exit_code=False,
                redirect_stdout=True, return_proc=False, stdout=None,
                stdin=None):
    """Run ``cmd`` (an argv list) as a subprocess.

    Args:
      cmd: list of program name and arguments.
      error_ok: when false, a non-zero exit status calls ``die``.
      error_message: text appended to the failure message instead of the
        captured output.
      exit_code: when true, return the process return code instead of output.
      redirect_stdout: when true and ``stdout`` is None, capture stdout.
      return_proc: when true, return the ``Popen`` object right after
        spawning, without waiting.
      stdout, stdin: passed through to ``subprocess.Popen``.

    Returns:
      The ``Popen`` object, the return code, or the captured stdout (an empty
      string when stdout was not captured), depending on the flags above.
    """
    # Useful for debugging:
    logging.info(' '.join(cmd))
    if redirect_stdout and stdout is None:
        stdout = subprocess.PIPE

    proc = subprocess.Popen(cmd, stdout=stdout, stdin=stdin)
    if return_proc:
        return proc

    if stdout == subprocess.PIPE:
        output = proc.communicate()[0]
    else:
        output = ''
        proc.wait()

    if exit_code:
        return proc.returncode
    if not error_ok and proc.returncode != 0:
        die('Command "%s" failed.\n' % (' '.join(cmd)) +
            (error_message or output))
    return output


def git(args, **kwargs):
    """Run ``git`` with ``args``; keyword arguments go to ``run_command``."""
    cmd = ['git'] + args
    return run_command(cmd, **kwargs)


def bzr(args, **kwargs):
    """Run ``bzr`` with ``args``; keyword arguments go to ``run_command``."""
    cmd = ['bzr'] + args
    return run_command(cmd, **kwargs)


def short_branch_name(branch):
    """Convert a name like 'refs/heads/foo' to just 'foo'."""
    return branch.replace('refs/heads/', '')


def bzr_ref_name(branch):
    """Return the tracking ref name for ``branch``, e.g. 'bzr/master'."""
    return '%s/%s' % (NAMESPACE, branch)


def normalize_upstream(path):
    """Fix paths if necessary.

    Upstream will either be a launchpad url, a bzr url, or a path.  Anything
    without a ':' is treated as a local path and made absolute; empty values
    and urls are returned unchanged.
    """
    if not path:
        return path
    if ':' not in path:
        return os.path.abspath(path)
    return path


def branch_name(bzr_ref):
    """Strip the '<NAMESPACE>/' prefix from a tracking ref name."""
    return bzr_ref[len(NAMESPACE) + 1:]


def set_cfg(key, value):
    """Set git config ``<NAMESPACE>.<key>`` to ``value``."""
    git(['config', '%s.%s' % (NAMESPACE, key), value])


def get_cfg(key, **kwargs):
    """Return git config ``<NAMESPACE>.<key>``, stripped.

    Errors are tolerated by default (``error_ok=True``), so the result is
    whatever ``git config`` printed, which may be empty.
    """
    kwargs.setdefault('error_ok', True)
    return git(['config', '%s.%s' % (NAMESPACE, key)], **kwargs).strip()


def clear_cfg(key):
    """Unset git config ``<NAMESPACE>.<key>``."""
    git(['config', '--unset', '%s.%s' % (NAMESPACE, key)])


def branch_exists(branch):
    """Return True if ``git branch -a`` lists a branch named ``branch``."""
    # Escape the name so characters such as '.' or '+' match literally
    # rather than being interpreted as regex syntax.
    matcher = re.compile(r'\s%s$' % re.escape(branch))
    listing = git(['branch', '-a'])
    return any(matcher.search(line) for line in listing.split('\n'))


def bzr_tags():
    """Return the tag names printed by ``bzr tags`` (first column only)."""
    listing = bzr(['tags'])
    return [line.split()[0] for line in listing.split('\n') if line.strip()]


def strip_bzr_tags(cl, branch):
    """Delete every tag in the bzr branch stored for ``branch``."""
    curdir = os.getcwd()
    os.chdir(cl.bzr_dir(branch))
    try:
        for tag in bzr_tags():
            bzr(['tag', '--delete', tag])
    finally:
        # Always restore the caller's working directory.
        os.chdir(curdir)


class LoggingPipe(object):
    """File-like stand-in for a subprocess pipe that can log the traffic.

    When the root logger's effective level is DEBUG, ``fileno()`` hands out a
    temporary file so the producer's output is buffered; ``close()`` then
    logs the buffered data and writes it to the real pipe.  Otherwise the
    real pipe's descriptor is used directly.
    """

    def __init__(self, pipe, name=''):
        self.pipe = pipe
        self.name = name
        self._capture = False
        self._io = None
        if logging.getLogger().getEffectiveLevel() == logging.DEBUG:
            self._capture = True
            self._io = tempfile.TemporaryFile()

    def fileno(self):
        """Return the descriptor the producing process should write to."""
        if self._capture:
            return self._io.fileno()
        return self.pipe.fileno()

    def close(self, *args, **kw):
        """Flush any captured data to the real pipe, then close it."""
        if self._capture:
            logging.debug('captured output (%s):', self.name)
            self._io.seek(0)
            rv = self._io.read()
            self._io.close(*args, **kw)
            logging.debug(rv)
            self.pipe.write(rv)
        self.pipe.close(*args, **kw)


class Changelist(object):
    """Lazily computed information about the current git checkout."""

    def __init__(self, branchref=None):
        self._branchref = branchref
        if self._branchref:
            self._branch = short_branch_name(self._branchref)
        else:
            self._branch = None
        self._git_dir = None
        self._root_dir = None
        self._bzr_dir = None
        self._map_dir = None

    def branch(self):
        """Returns the short branch name, e.g. 'master'."""
        if not self._branch:
            self._branchref = git(['symbolic-ref', 'HEAD']).strip()
            self._branch = short_branch_name(self._branchref)
        return self._branch

    def branch_ref(self):
        """Returns the full branch name, e.g. 'refs/heads/master'."""
        self.branch()  # Poke the lazy loader.
        return self._branchref

    def git_dir(self, path=None):
        """Return the absolute git dir, or ``path`` joined onto it."""
        if not self._git_dir:
            self._git_dir = os.path.abspath(
                git(['rev-parse', '--git-dir']).strip())
        if path:
            return os.path.join(self._git_dir, path)
        return self._git_dir

    def root_dir(self, path=None):
        """Return the parent of the git dir, or ``path`` joined onto it."""
        if not self._root_dir:
            self._root_dir = os.path.dirname(self.git_dir())
        if path:
            return os.path.join(self._root_dir, path)
        return self._root_dir

    def bzr_dir(self, path=None):
        """Return ``<git-dir>/<NAMESPACE>/repo``, or ``path`` under it."""
        if not self._bzr_dir:
            self._bzr_dir = os.path.join(self.git_dir(), NAMESPACE, 'repo')
        if path:
            return os.path.join(self._bzr_dir, path)
        return self._bzr_dir

    def map_dir(self, path=None):
        """Return ``<git-dir>/<NAMESPACE>/map``, or ``path`` under it."""
        if not self._map_dir:
            self._map_dir = os.path.join(self.git_dir(), NAMESPACE, 'map')
        if path:
            return os.path.join(self._map_dir, path)
        return self._map_dir


def rewrite_bzr_marks_file(filename):
    """Prefix ':' to every marks line whose first field is an integer.

    The file is rewritten through ``<filename>-tmp`` and moved into place.
    Lines that already start with ':' (or with any non-integer field) are
    copied unchanged.
    """
    logging.debug('rewrite bzr marks')
    tmp_name = filename + '-tmp'
    with open(tmp_name, 'w') as out:
        with open(filename) as src:
            for line in src:
                logging.debug('- %s', line.strip())
                try:
                    int(line.split(' ')[0])
                except ValueError:
                    # Not a bare revno; leave the line as it is.
                    pass
                else:
                    line = ':' + line
                logging.debug('+ %s', line.strip())
                out.write(line)
    shutil.move(tmp_name, filename)


def unrewrite_bzr_marks_file(filename):
    """Strip leading ':' characters from every line of a marks file.

    The file is rewritten through ``<filename>-tmp`` and moved into place.
    """
    logging.debug('unrewrite bzr marks')
    tmp_name = filename + '-tmp'
    with open(tmp_name, 'w') as out:
        with open(filename) as src:
            for line in src:
                logging.debug('- %s', line.strip())
                line = line.lstrip(':')
                logging.debug('+ %s', line.strip())
                out.write(line)
    shutil.move(tmp_name, filename)


def export_bzr(bzr_ref, cl=None, overwrite=False):
    """Pipe ``bzr fast-export`` of a stored bzr branch into git.

    Args:
      bzr_ref: git tracking ref name (``bzr/<branch>``) to create or update.
      cl: ``Changelist`` to use; a fresh one is created when None.
      overwrite: when true, ignore existing marks and delete the tracking
        branch first.

    Returns:
      ``bzr_ref``.
    """
    if cl is None:
        cl = Changelist()

    branch = branch_name(bzr_ref)
    bzr_marks = cl.map_dir('%s-bzr' % branch)
    git_marks = cl.map_dir('%s-git' % branch)

    # TODO(termie): sanity checks
    if not overwrite and os.path.exists(bzr_marks):
        # HACK: bzr fast-export seems to like to write out revno without the
        #       ':' that git uses
        # this may have to be removed if bzr fast-export changes its format
        # (disabled) unrewrite_bzr_marks_file(bzr_marks)
        bzr_import_arg = ['--import-marks=%s' % bzr_marks]
        git_import_arg = ['--import-marks=%s' % git_marks]
    else:
        bzr_import_arg = git_import_arg = []
        if overwrite:
            git(['branch', '-D', bzr_ref], error_ok=True)

    # NOTE(termie): this happens in reverse because we're piping
    git_proc = git(['fast-import'] + git_import_arg + [
                       '--quiet',
                       '--export-marks=%s' % git_marks],
                   stdin=subprocess.PIPE, return_proc=True)
    git_proc_in = LoggingPipe(git_proc.stdin, 'bzr fast-export')
    bzr_proc = bzr(['fast-export'] + bzr_import_arg + [
                       '--plain',
                       '--export-marks=%s' % bzr_marks,
                       '--git-branch=%s' % bzr_ref,
                       cl.bzr_dir(branch)],
                   stdout=git_proc_in, return_proc=True)

    bzr_proc.wait()
    if bzr_proc.returncode != 0:
        die('bzr export failed')

    git_proc_in.close()
    git_proc.wait()
    # Check the importer's own status; the exporter was checked above.
    if git_proc.returncode != 0:
        die('git import failed')
    return bzr_ref


def export_git(branch, cl=None, parent_branch=None):
    """Pipe ``git fast-export`` of ``branch`` into the stored bzr branch.

    Args:
      branch: short git branch name to export.
      cl: ``Changelist`` to use; a fresh one is created when None.
      parent_branch: when ``branch`` has no marks yet, the stored bzr branch
        of this name is branched and its marks are used as the starting
        point.
    """
    if cl is None:
        cl = Changelist()

    git_marks = cl.map_dir('%s-git' % branch)
    bzr_marks = cl.map_dir('%s-bzr' % branch)

    # TODO(termie): sanity checks
    if os.path.exists(bzr_marks):
        # HACK: bzr fast-export seems to like to write out revno without the
        #       ':' that git uses
        # this may have to be removed if bzr fast-export changes its format
        rewrite_bzr_marks_file(bzr_marks)
        git_import_arg = ['--import-marks=%s' % git_marks]
        bzr_import_arg = ['--import-marks=%s' % bzr_marks]
    elif parent_branch:
        git_parent_marks = cl.map_dir('%s-git' % parent_branch)
        bzr_parent_marks = cl.map_dir('%s-bzr' % parent_branch)
        bzr(['branch', cl.bzr_dir(parent_branch), cl.bzr_dir(branch)])
        rewrite_bzr_marks_file(bzr_parent_marks)
        bzr_import_arg = ['--import-marks=%s' % bzr_parent_marks]
        git_import_arg = ['--import-marks=%s' % git_parent_marks]
    else:
        git_import_arg = []
        bzr_import_arg = []

    # NOTE(termie): this happens in reverse because we're piping
    bzr_proc = bzr(['fast-import'] + bzr_import_arg + [
                       '--export-marks=%s' % bzr_marks,
                       '-',
                       cl.bzr_dir(branch)],
                   stdin=subprocess.PIPE, return_proc=True)
    bzr_proc_in = LoggingPipe(bzr_proc.stdin, 'git fast-export')
    git_proc = git(['fast-export', '-M'] + git_import_arg + [
                       '--export-marks=%s' % git_marks,
                       branch],
                   stdout=bzr_proc_in, return_proc=True)

    git_proc.wait()
    if git_proc.returncode != 0:
        die('git export failed')

    bzr_proc_in.close()
    bzr_proc.wait()
    if bzr_proc.returncode != 0:
        die('bzr import failed')


def init_repo(cl=None):
    """Create the bzr repo directory and the marks directory if missing."""
    if cl is None:
        cl = Changelist()
    if not os.path.exists(cl.bzr_dir()):
        os.makedirs(cl.bzr_dir())
        # Initialize a bzr repo
        bzr(['init-repo', '--no-trees', cl.bzr_dir()])
    if not os.path.exists(cl.map_dir()):
        os.makedirs(cl.map_dir())


def _default_target(url):
    """Derive a default name from the last path component of ``url``."""
    return url.rpartition(':')[2].rstrip('/').rpartition('/')[2]


def cmd_init(args):
    """``git bzr init``: create a bzr tracking branch from git 'master'."""
    parser = optparse.OptionParser(usage='git bzr init')
    parser.description = ('Init a new bzr tracking branch based on master')
    (options, args) = parser.parse_args(args)

    cl = Changelist()

    # Ensure our directories exist
    init_repo(cl)

    branch = 'master'
    bzr_ref = bzr_ref_name(branch)
    if branch_exists(bzr_ref):
        die('Branch already exists: %s', bzr_ref)

    bzr(['init', cl.bzr_dir(branch)])
    export_git(branch, cl=cl)

    # TODO(termie): does it make sense for this to be a --track?
    git(['branch', bzr_ref, branch])
    set_cfg('%s.bzr' % branch, bzr_ref)


def cmd_clone(args):
    """``git bzr clone``: git-init a new directory and import a bzr branch."""
    # NOTE(review): the usage/description strings in this file look as if
    # angle-bracket placeholders were stripped from them -- confirm against
    # the upstream source before relying on the help output.
    parser = optparse.OptionParser(usage='git bzr clone []')
    parser.description = ('Effectively a bzr branch ')
    parser.add_option('--strip_tags', action='store_true', dest='strip_tags',
                      help='strip tags from bzr when importing')
    (options, args) = parser.parse_args(args)

    # Same validation as cmd_import: fail cleanly instead of IndexError.
    if not args:
        die('You must provide an URL.')

    url = args[0]
    if len(args) == 1:
        target = _default_target(url)
    else:
        target = args[1]

    # TODO(termie): sanity checking
    url = normalize_upstream(url)
    target = os.path.abspath(target)

    branch = 'master'
    if os.path.exists(target):
        die('Directory already exists: %s', target)

    git(['init', target])
    os.chdir(target)

    cl = Changelist()

    # Ensure our directories exist
    init_repo(cl)

    args = [url, branch]
    if options.strip_tags:
        args = ['--strip_tags'] + args
    cmd_import(args)
    git(['checkout', branch])


def cmd_push(args):
    """``git bzr push``: export the current branch to bzr and push it."""
    parser = optparse.OptionParser(usage='git bzr push ')
    parser.description = ('Effectively a bzr push ')
    parser.add_option('--parent_branch', action='store', dest='parent_branch',
                      default='master',
                      help='use this branch as the parent branch')
    parser.add_option('--overwrite', action='store_true', dest='overwrite',
                      default=False, help='push with bzr overwrite')
    (options, args) = parser.parse_args(args)

    cl = Changelist()

    # Ensure our directories exist
    init_repo(cl)

    upstream = None
    if len(args):
        upstream = args[0]
    # Attempt to normalize the upstream
    upstream = normalize_upstream(upstream)

    # Find the upstream from the tracking branch
    bzr_ref = get_cfg('%s.bzr' % cl.branch())
    if not upstream and not bzr_ref:
        die('This branch has no associated bzr tracking branch')

    if bzr_ref:
        known_upstream = get_cfg('%s.upstream' % bzr_ref)
        if not upstream and not known_upstream:
            die('No upstream found for this bzr tracking branch (%s) '
                '... uh oh.', bzr_ref)
    else:
        known_upstream = None

    # Default to the recorded upstream.  This also covers --overwrite
    # without an explicit upstream, which would otherwise push to None.
    if not upstream:
        upstream = known_upstream

    if (not options.overwrite and known_upstream
            and upstream == known_upstream):
        # Sync first
        cmd_sync([bzr_ref])

        # Check that we are a fast-forward
        # Using the logic from http://github.com/kfish/git-bzr's
        # implementation
        revs = git(['rev-list', '--left-right', 'HEAD...%s' % bzr_ref])
        updates_to_push = False
        # rev-list prints one commit per line, prefixed with '<' or '>'.
        for line in revs.splitlines():
            if line.startswith('>'):
                die('HEAD is not a fast-forward of %s. Merge before pushing.',
                    bzr_ref)
            if not updates_to_push and line.startswith('<'):
                updates_to_push = True

        if not updates_to_push:
            die('Nothing to push. Commit something first')

    branch = cl.branch()
    bzr_ref = bzr_ref_name(branch)
    export_git(branch, cl=cl, parent_branch=options.parent_branch)

    root = cl.root_dir()
    os.chdir(cl.bzr_dir(branch))
    params = []
    if options.overwrite:
        params.append('--overwrite')
    bzr(['push'] + params + [upstream])
    os.chdir(root)

    if not branch_exists(bzr_ref):
        # If this is our first time, make the tracking branch
        git(['branch', bzr_ref, branch])
    set_cfg('%s.bzr' % branch, bzr_ref)
    set_cfg('%s.upstream' % bzr_ref, upstream)


def cmd_sync(args):
    """``git bzr sync``: bzr-pull the upstream and re-export it into git."""
    parser = optparse.OptionParser(usage='git bzr sync ')
    parser.description = ('Sync a bzr tracking branch (bzr/*) with remote')
    parser.add_option('--overwrite', action='store_true', dest='overwrite',
                      help='overwrite tracking branch with remote copy')
    (options, args) = parser.parse_args(args)

    cl = Changelist()

    # Ensure our directories exist
    init_repo(cl)

    bzr_ref = None
    if len(args):
        bzr_ref = args[0]
    if not bzr_ref:
        bzr_ref = get_cfg('%s.bzr' % cl.branch())
    if not bzr_ref:
        die('Which bzr tracking branch do you want to sync?')

    upstream = get_cfg('%s.upstream' % bzr_ref)
    if not upstream:
        die('No upstream found for bzr tracking branch: %s', bzr_ref)

    branch = branch_name(bzr_ref)
    os.chdir(cl.bzr_dir(branch))
    if options.overwrite:
        params = ['pull', '--overwrite', upstream]
    else:
        params = ['pull', upstream]
    bzr(params)
    os.chdir(cl.root_dir())

    export_bzr(bzr_ref, cl=cl, overwrite=options.overwrite)


def cmd_pull(args):
    """``git bzr pull``: sync the tracking branch, then git-pull from it."""
    parser = optparse.OptionParser(usage='git bzr pull ')
    parser.description = ('Sync a bzr tracking branch (bzr/*) with remote')
    parser.add_option('--overwrite', action='store_true', dest='overwrite',
                      help='overwrite tracking branch with remote copy')
    (options, args) = parser.parse_args(args)

    # parse_args consumed the option, so pass it on explicitly.
    sync_args = list(args)
    if options.overwrite:
        sync_args.insert(0, '--overwrite')
    cmd_sync(sync_args)

    cl = Changelist()
    bzr_ref = get_cfg('%s.bzr' % cl.branch())
    # Alternative: git(['pull', '--rebase', '.', '--', bzr_ref])
    git(['pull', '.', '--', bzr_ref])


def cmd_import(args):
    """``git bzr import``: fetch a bzr branch into a new git branch."""
    parser = optparse.OptionParser(usage='git bzr import ')
    parser.description = ('Effectively a bzr branch , but git-style')
    parser.add_option('--strip_tags', action='store_true', dest='strip_tags',
                      help='strip tags from bzr when importing')
    (options, args) = parser.parse_args(args)

    # TODO(termie): command-line validation
    if not len(args):
        die('You must provide an URL.')

    url = args[0]
    if len(args) == 1:
        # Shared with cmd_clone so a trailing '/' cannot yield an empty name.
        target = _default_target(url)
    else:
        target = args[1]

    url = normalize_upstream(url)
    branch = target
    bzr_ref = bzr_ref_name(branch)

    cl = Changelist()

    # Ensure our directories exist
    init_repo(cl)

    if branch_exists(branch):
        die('Branch already exists: %s', branch)
    if branch_exists(bzr_ref):
        die('Branch already exists: %s', bzr_ref)

    # Do the actual bzr fetch
    bzr(['branch', url, cl.bzr_dir(branch)])
    if options.strip_tags:
        strip_bzr_tags(cl, branch)

    # Fast export from bzr into the git tracking branch
    export_bzr(bzr_ref, cl=cl)

    # Create a new local branch
    # TODO(termie): does it make sense for this to be a --track?
    git(['branch', branch, bzr_ref])
    set_cfg('%s.bzr' % branch, bzr_ref)
    set_cfg('%s.upstream' % bzr_ref, url)


def cmd_clear(args):
    """``git bzr clear``: remove config, stored repo, marks and branch."""
    parser = optparse.OptionParser(usage='git bzr clear []')
    parser.description = ('Clear all information for a given bzr branch')
    (options, args) = parser.parse_args(args)

    cl = Changelist()

    # Ensure our directories exist
    init_repo(cl)

    bzr_ref = None
    if len(args):
        bzr_ref = args[0]
    if not bzr_ref:
        bzr_ref = get_cfg('%s.bzr' % cl.branch())
        if bzr_ref:
            clear_cfg('%s.bzr' % cl.branch())
    if not bzr_ref:
        die('No bzr tracking information associated with this branch,'
            'which bzr tracking branch do you want to clear?')

    upstream = get_cfg('%s.upstream' % bzr_ref)
    if upstream:
        clear_cfg('%s.upstream' % bzr_ref)

    branch = branch_name(bzr_ref)
    branch_dir = cl.bzr_dir(branch)
    git_maps = cl.map_dir('%s-git' % branch)
    bzr_maps = cl.map_dir('%s-bzr' % branch)

    if os.path.exists(branch_dir):
        shutil.rmtree(branch_dir)
    for map_file in (git_maps, bzr_maps):
        if os.path.exists(map_file):
            os.unlink(map_file)
    if branch_exists(bzr_ref):
        git(['branch', '-D', bzr_ref])


def cmd_marks(args):
    """``git bzr marks``: print the marks file (or its path) for a branch."""
    # Usage/description previously duplicated the text of the push command.
    parser = optparse.OptionParser(
        usage='git bzr marks [--git|--bzr] [--file]')
    parser.description = ('Show the marks file for the current branch')
    parser.add_option('--git', action='store_true', dest='git', default=True,
                      help='show the git marks file')
    parser.add_option('--bzr', action='store_false', dest='git', default=True,
                      help='show the bzr marks file')
    parser.add_option('--file', action='store_true', dest='fileonly',
                      default=False, help='show the marks filename only')
    (options, args) = parser.parse_args(args)

    cl = Changelist()

    # Ensure our directories exist
    init_repo(cl)

    branch = cl.branch()
    if options.git:
        marks = cl.map_dir('%s-git' % branch)
    else:
        marks = cl.map_dir('%s-bzr' % branch)

    if options.fileonly:
        print(marks)
    else:
        print('Marks file: %s' % marks)
        with open(marks) as marks_file:
            print(marks_file.read())


# (command name, one-line help, handler taking the remaining argv list)
COMMANDS = [
    ('init', 'init a new bzr branch', cmd_init),
    ('clone', 'clone a bzr repo', cmd_clone),
    ('clear', 'clear bzr data for a branch', cmd_clear),
    ('push', 'push to a bzr repo', cmd_push),
    ('sync', 'sync bzr tracking branch to remote', cmd_sync),
    ('pull', 'sync bzr tracking branch to remote and pull from it', cmd_pull),
    ('import', 'import a bzr branch as a new git branch', cmd_import),
    ('marks', 'show the marks files for a branch', cmd_marks),
]


def usage(name):
    """Print the command summary for program ``name`` and exit with 1."""
    print('usage: %s ' % name)
    print('commands are:')
    for command, desc, _ in COMMANDS:
        print(' %-10s %s' % (command, desc))
    sys.exit(1)


def main(argv):
    """Dispatch ``argv[1]`` to the matching handler in ``COMMANDS``."""
    if len(argv) < 2:
        usage(argv[0])

    # Enable for pipe tracing: logging.getLogger().setLevel(logging.DEBUG)
    command = argv[1]
    for name, _, func in COMMANDS:
        if name == command:
            return func(argv[2:])
    print('unknown command: %s' % command)
    usage(argv[0])


if __name__ == '__main__':
    sys.exit(main(sys.argv))