From 071ee9aec6b2dbb90f65f8caf188b7a54c189bbe Mon Sep 17 00:00:00 2001 From: Chris Lamb Date: Thu, 4 Aug 2016 17:49:11 +0000 Subject: lts-missing-uplodas: Rewrite as a class. git-svn-id: svn+ssh://svn.debian.org/svn/secure-testing@43769 e39458fd-73e7-0310-bf30-c45bca0a0e42 --- bin/lts-missing-uploads.py | 138 ++++++++++++++++++++++++--------------------- 1 file changed, 73 insertions(+), 65 deletions(-) (limited to 'bin/lts-missing-uploads.py') diff --git a/bin/lts-missing-uploads.py b/bin/lts-missing-uploads.py index 8c576e03ba..fadd92a4c6 100755 --- a/bin/lts-missing-uploads.py +++ b/bin/lts-missing-uploads.py @@ -25,101 +25,109 @@ import dateutil.relativedelta from debian.deb822 import Sources from debian.debian_support import Version -SOURCES = 'http://security.debian.org/dists/wheezy/updates/main/source/Sources.gz' +class LTSMissingUploads(object): + SOURCES = 'http://security.debian.org/dists/wheezy/updates/main/source/Sources.gz' -re_line = re.compile( - r'(?Pmsg\d+.html).*\[DLA (?P[\d-]+)\] (?P[^\s]+) security update.*' -) -re_version = re.compile(r'^Version.*: (?P.*)') - -session = requests.Session() - -def get_dlas(year, month): - url = 'https://lists.debian.org/debian-lts-announce/{}/{:02}/'.format( - year, - month, + re_line = re.compile( + r'(?Pmsg\d+.html).*\[DLA (?P[\d-]+)\] (?P[^\s]+) security update.*' ) + re_version = re.compile(r'^Version.*: (?P.*)') - result = parse(session.get(url).content, re_line) + def __init__(self): + self.session = requests.session() - # Prepend URL as the indices have relative URIs - for x in result: - x['url'] = '{}{}'.format(url, x['suffix']) + def main(self, *args): + dlas = {} - return result + for idx in range(3): + dt = datetime.datetime.utcnow().replace(day=1) - \ + dateutil.relativedelta.relativedelta(months=idx) -def get_dla(url): - return parse(session.get(url).content, re_version) + self.info( + "Getting announcements for {}/{:02} ...", + dt.year, + dt.month, + ) -def main(*args): - dlas = {} + # Prefer later DLAs with reversed(..) + for x in reversed(self.get_dlas(dt.year, dt.month)): + # Only comment on the latest upload + if x['source'] in dlas: + continue - for idx in range(3): - dt = datetime.datetime.utcnow().replace(day=1) - \ - dateutil.relativedelta.relativedelta(months=idx) + self.info("{source}: parsing announcement from {url} ...", **x) + x.update(self.get_dla(x['url'])[0]) + dlas[x['source']] = x - info("Getting announcements for {}/{:02} ...", dt.year, dt.month) + if not dlas: + return 0 - # Prefer later DLAs with reversed(..) - for x in reversed(get_dlas(dt.year, dt.month)): - # Only comment on the latest upload - if x['source'] in dlas: - continue + sources = self.get_sources() + + for source, dla in sorted(dlas.items()): + version = sources[source] - info("{source}: parsing announcement from {url} ...", **x) - x.update(get_dla(x['url'])[0]) - dlas[x['source']] = x + if Version(dla['version']) > Version(version): + self.warn("{}: DLA-{} announced version {} but LTS has {} <{}>".format( + source, + dla['dla'], + dla['version'], + version, + dla['url'], + )) - if not dlas: return 0 - sources = get_sources() + def get_dlas(self, year, month): + url = 'https://lists.debian.org/debian-lts-announce/{}/{:02}/'.format( + year, + month, + ) - for source, dla in sorted(dlas.items()): - version = sources[source] + result = self.parse(self.session.get(url).content, self.re_line) - if Version(dla['version']) > Version(version): - warn("{}: DLA-{} announced version {} but LTS has {} <{}>".format( - source, - dla['dla'], - dla['version'], - version, - dla['url'], - )) + # Prepend URL as the indices have relative URIs + for x in result: + x['url'] = '{}{}'.format(url, x['suffix']) - return 0 + return result -def get_sources(): - info("Downloading Sources from {} ...", SOURCES) + def get_dla(self, url): + return self.parse(self.session.get(url).content, self.re_version) - response = requests.get(SOURCES) - response.raise_for_status() + def get_sources(self): + self.info("Downloading Sources from {} ...", self.SOURCES) - val = gzip.decompress(response.content).decode('utf-8') + response = self.session.get(self.SOURCES) + response.raise_for_status() - return {x['Package']: x['Version'] for x in Sources.iter_paragraphs(val)} + val = gzip.decompress(response.content).decode('utf-8') -def warn(msg, *args, **kwargs): - print("W: " + msg.format(*args, **kwargs), file=sys.stderr) + return {x['Package']: x['Version'] for x in Sources.iter_paragraphs(val)} -def info(msg, *args, **kwargs): - print("I: " + msg.format(*args, **kwargs), file=sys.stderr) + def parse(self, content, pattern): + result = [] + + for x in content.splitlines(): + m = pattern.search(x.decode('utf8')) + + if m is None: + continue -def parse(content, pattern): - result = [] + result.append(m.groupdict()) - for x in content.splitlines(): - m = pattern.search(x.decode('utf8')) + return result - if m is None: - continue + ## - result.append(m.groupdict()) + def warn(self, msg, *args, **kwargs): + print("W: " + msg.format(*args, **kwargs), file=sys.stderr) - return result + def info(self, msg, *args, **kwargs): + print("I: " + msg.format(*args, **kwargs), file=sys.stderr) if __name__ == '__main__': try: - sys.exit(main(*sys.argv[1:])) + sys.exit(LTSMissingUploads().main(*sys.argv[1:])) except KeyboardInterrupt: sys.exit(1) -- cgit v1.2.3