#!/usr/bin/env python3
#
# apt-metalink - Download deb packages from multiple servers concurrently
# Copyright (C) 2010 Tatsuhiro Tsujikawa
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
import os.path
import subprocess
import textwrap
import sys
import optparse
import errno
import hashlib
import apt
import apt_pkg
class AptMetalink:
def __init__(self, opts):
self.cache = apt.Cache(apt.progress.text.OpProgress())
self.opts = opts
self.archive_dir = apt_pkg.config.find_dir('Dir::Cache::Archives')
if not self.archive_dir:
raise Exception(('No archive dir is set.'
' Usually it is /var/cache/apt/archives/'))
def upgrade(self, dist_upgrade=False):
self.cache.upgrade(dist_upgrade=dist_upgrade)
self._get_changes()
def install(self, pkg_names):
for pkg_name in pkg_names:
if pkg_name in self.cache:
pkg = self.cache[pkg_name]
if not pkg.installed:
pkg.mark_install()
elif pkg.is_upgradable:
pkg.mark_upgrade()
else:
raise Exception('{0} is not found'.format(pkg_name))
self._get_changes()
def _get_changes(self):
pkgs = sorted(self.cache.get_changes(), key=lambda p:p.name)
if pkgs:
_print_update_summary(self.cache, pkgs)
sys.stdout.write("Do you want to continue [Y/n]?")
sys.stdout.flush()
ans = sys.stdin.readline().strip()
if ans and ans.lower() != 'y':
print("Abort.")
return
pkgs = [pkg for pkg in pkgs if not pkg.marked_delete and \
not self._file_downloaded(pkg, hash_check = \
self.opts.hash_check)]
if self.opts.metalink_out:
with open(self.opts.metalink_out, 'w', encoding='utf-8') as f:
make_metalink(f, pkgs)
return
if not self._download(pkgs, num_concurrent=guess_concurrent(pkgs)):
print("Some download fails. apt_pkg will take care of them.")
if self.opts.download_only:
print("Download complete and in download only mode")
else:
self.cache.commit(apt.progress.text.AcquireProgress())
def _download(self, pkgs, num_concurrent=1):
if not pkgs:
return True
partial_dir = os.path.join(self.archive_dir, 'partial')
cmdline = [self.opts.aria2c,
'--metalink-file=-',
'--file-allocation=none',
'--auto-file-renaming=false',
'--dir={0}'.format(partial_dir),
'--max-concurrent-downloads={0}'.format(num_concurrent),
'--no-conf',
'--remote-time=true',
'--auto-save-interval=0',
'--continue',
'--split=1'
]
if self.opts.hash_check:
cmdline.append('--check-integrity=true')
http_proxy = apt_pkg.config.find('Acquire::http::Proxy')
https_proxy = apt_pkg.config.find('Acquire::https::Proxy', http_proxy)
ftp_proxy = apt_pkg.config.find('Acquire::ftp::Proxy')
if http_proxy:
cmdline.append('='.join(['--http-proxy', http_proxy]))
if https_proxy:
cmdline.append('='.join(['--https-proxy', https_proxy]))
if ftp_proxy:
cmdline.append('='.join(['--ftp-proxy', ftp_proxy]))
proc = subprocess.Popen(cmdline,
stdin=subprocess.PIPE,
stdout=1,
stderr=2,
encoding='utf-8')
make_metalink(proc.stdin, pkgs)
proc.stdin.close()
proc.wait()
link_success = True
# Link archives/partial/*.deb to archives/
for pkg in pkgs:
filename = get_filename(pkg.candidate)
dst = os.path.join(self.archive_dir, filename)
src = os.path.join(partial_dir, filename)
ctrl_file = ''.join([src, '.aria2'])
# If control file exists, we assume download is not
# complete.
if os.path.exists(ctrl_file):
continue
try:
# Making hard link because aria2c needs file in
# partial directory to know download is complete
# in the next invocation.
os.rename(src, dst)
except OSError as e:
if e.errno != errno.ENOENT:
print("Failed to move archive file", e)
link_success = False
return proc.returncode == 0 and link_success
def _file_downloaded(self, pkg, hash_check=False):
candidate = pkg.candidate
path = os.path.join(self.archive_dir, get_filename(candidate))
if not os.path.exists(path) or os.stat(path).st_size != candidate.size:
return False
if hash_check:
hash_type, hash_value = get_hash(pkg.candidate)
try:
return check_hash(path, hash_type, hash_value)
except IOError as e:
if e.errno != errno.ENOENT:
print("Failed to check hash", e)
return False
else:
return True
def check_hash(path, hash_type, hash_value):
with open(path, 'rb') as f:
digest = hashlib.file_digest(f, hash_type)
return digest.hexdigest() == hash_value
def get_hash(version):
if version.sha256:
return ("sha256", version.sha256)
elif version.sha1:
return ("sha1", version.sha1)
elif version.md5:
return ("md5", version.md5)
else:
return (None, None)
def get_filename(version):
# TODO apt-get man page said filename and basename in URI
# could be different.
quoted_version = version.version.replace(':', '%3a')
return f'{version.package.shortname}_{quoted_version}_{version.architecture}.deb'
def make_metalink(out, pkgs):
out.write('')
out.write('')
for pkg in pkgs:
version = pkg.candidate
hashtype, hashvalue = get_hash(version)
out.write(''.format(get_filename(version)))
out.write('{0}'.format(version.size))
if hashtype:
out.write('{1}'.format(hashtype, hashvalue))
for uri in version.uris:
out.write('{0}'.format(uri))
out.write('')
out.write('')
def guess_concurrent(pkgs):
max_uris = 0
for pkg in pkgs:
version = pkg.candidate
max_uris = max(len(version.uris), max_uris)
return max_uris
def pprint_names(msg, names):
if names:
print(msg)
print(textwrap.fill(' '.join(names),
width=78,
initial_indent=' ',
subsequent_indent=' ',
break_long_words=False,
break_on_hyphens=False))
def unit_str(val):
if val > 1000*1000:
return '{0:.1f}MB'.format(val/1000/1000)
elif val > 1000:
return '{0:.0f}kB'.format(round(val/1000))
else:
return '{0:.0f}B'.format(val)
def _print_update_summary(cache, pkgs):
delete_names = []
install_names = []
upgrade_names = []
# TODO marked_downgrade, marked_keep, marked_reinstall
for pkg in pkgs:
if pkg.marked_delete:
delete_names.append(pkg.name)
elif pkg.marked_install:
install_names.append(pkg.name)
elif pkg.marked_upgrade:
upgrade_names.append(pkg.name)
pprint_names('The following packages will be REMOVED:', delete_names)
pprint_names('The following NEW packages will be installed:', install_names)
pprint_names('The following packages will be upgraded:', upgrade_names)
print(('{0} upgraded, {1} newly installed, {2} to remove and'
' {3} not upgraded')\
.format(len(upgrade_names), len(install_names), len(delete_names),
cache.keep_count))
print('Need to get {0} of archives.'\
.format(unit_str(cache.required_download)))
if cache.required_space < 0:
print('After this operation, {0} disk space will be freed.'\
.format(unit_str(-cache.required_space)))
else:
print(('After this operation, {0} of additional disk space will'
' be used.').format(unit_str(cache.required_space)))
def main():
usage = 'Usage: %prog [options] {upgrade | dist-upgrade | install pkg ...}'
parser = optparse.OptionParser(usage=usage)
parser.add_option('-d', '--download-only', action='store_true',
help="Download only. [default: %default]")
parser.add_option('--metalink-out', metavar="FILE",
help=("""\
Instead of fetching the files, Metalink XML document is saved to given
FILE. Metalink XML document contains package's URIs and checksums.
"""))
parser.add_option('--hash-check', action="store_true",
help=("Check hash of already downloaded files."
" If hash check fails, download file again."))
parser.add_option('-x', '--aria2c' ,dest='aria2c',
help="path to aria2c executable [default: %default]")
parser.set_defaults(download_only=False)
parser.set_defaults(hash_check=False)
parser.set_defaults(aria2c='/usr/bin/aria2c')
opts, args = parser.parse_args()
if not args:
print('No command is given.')
parser.print_usage()
exit(1)
command = args[0]
am = AptMetalink(opts)
if command == 'upgrade':
am.upgrade()
elif command == 'dist-upgrade':
am.upgrade(dist_upgrade=True)
elif command == 'install':
am.install(args[1:])
else:
print("Command {0} is not supported.".format(command))
if __name__ == '__main__':
main()