# ansible-ops-management/files/usr/libexec/rocky/rss.py
# 2023-09-24 07:25:50 -07:00 - 313 lines - 11 KiB - Python
#!/usr/bin/env python3
# -*-:python; coding:utf-8; -*-
# author: Louis Abel <label@rockylinux.org>
# modified version of repo-rss from yum utils
# changelog
# -> 20230912: do not xmlescape entire description variable
import sys
import os
import re
import argparse
import time
import binascii
import base64
# The old yum-utils repo-rss used string manipulation. We're instead going to
# use the XML python library to do the work for us. This is cleaner, imo.
from xml.sax.saxutils import escape as xmlescape
from xml.etree.ElementTree import ElementTree, TreeBuilder, tostring
from xml.dom import minidom
import dnf
import dnf.exceptions
#from dnf.comps import Comps
#import libxml2
def to_unicode(string: str) -> str:
    """
    Return *string* as str, decoding UTF-8 bytes when necessary.

    Anything that is neither str nor bytes is stringified with str().
    """
    if isinstance(string, str):
        return string
    if isinstance(string, bytes):
        return string.decode('utf8')
    return str(string)
def to_base64(string: str) -> str:
    """
    Base64-encode *string* and wrap the result in single quotes.

    The quotes make the encoded value easy to locate with a regex when
    it is later read back out of the generated XML.
    """
    encoded = base64.b64encode(string.encode('utf-8')).decode('utf-8')
    return f"'{encoded}'"
def from_base64(string: str) -> str:
    """
    Decode a base64 value back to text.

    Any single quotes (added by to_base64) are stripped before decoding.
    """
    payload = string.replace("'", "").encode('utf-8')
    return base64.b64decode(payload).decode('utf-8')
class DnfQuiet(dnf.Base):
    """
    Thin dnf.Base wrapper that adds a "recently built packages" query.
    """
    def __init__(self):
        super().__init__()

    def get_recent(self, days=1):
        """
        Return available packages built within the last *days* days.

        Honors conf.showdupesfromrepos: when set, every version is
        considered; otherwise only the latest build per arch.  The
        ordering of the returned list is unspecified -- the caller in
        this script sorts by buildtime itself.
        """
        cutoff = time.time() - (days * 86400)
        if self.conf.showdupesfromrepos:
            available = self.sack.query().available().filter()
        else:
            available = self.sack.query().available().filter(latest_per_arch=1)
        available.run()
        # Strictly newer than the cutoff, matching the original
        # `ftime > recentlimit` comparison.
        return [package for package in available
                if int(package.buildtime) > cutoff]
class RepoRSS:
    """
    Build an RSS 2.0 feed describing a list of dnf packages and write it
    to a file.

    Channel metadata (title, link, description) can be overridden via
    the instance attributes after construction.
    """
    def __init__(self, filename='repo-rss.xml'):
        self.description = 'Repository RSS'
        self.link = 'https://github.com/rpm-software-management/dnf'
        self.title = 'Recent Packages'
        # Relative paths are anchored at the current working directory.
        if filename[0] != '/':
            cwd = os.getcwd()
            self.filename = os.path.join(cwd, filename)
        else:
            self.filename = filename

    @staticmethod
    def _text_element(builder, tag, text, attrib=None):
        """Append a simple <tag attrib...>text</tag> element to builder."""
        builder.start(tag, {} if attrib is None else attrib)
        builder.data(text)
        builder.end(tag)

    def rsspackage(self, packages):
        """
        Write an RSS 2.0 document for *packages* to self.filename.

        Each package description is base64-encoded while the tree is
        built (so ElementTree does not escape the embedded HTML) and
        decoded again when the pretty-printed XML is written out line by
        line.
        """
        rfc822_format = "%a, %d %b %Y %X GMT"
        changelog_format = "%a, %d %b %Y GMT"
        now = time.strftime(rfc822_format, time.gmtime())
        etbobj = TreeBuilder()
        etbobj.start('rss', {'version': '2.0'})
        etbobj.start('channel', {})
        # Channel preamble: title / link / description / pubDate / generator.
        self._text_element(etbobj, 'title', self.title)
        self._text_element(etbobj, 'link', self.link)
        self._text_element(etbobj, 'description', self.description)
        self._text_element(etbobj, 'pubDate', now)
        self._text_element(etbobj, 'generator', 'DNF')
        for package in packages:
            # Hex digest of the package checksum doubles as the item GUID.
            package_hex = binascii.hexlify(package.chksum[1]).decode()
            title = xmlescape(str(package))
            date = time.gmtime(float(package.buildtime))
            pkg_description = package.description
            # package.description is sometimes a NoneType. Don't know why.
            if not pkg_description:
                pkg_description = ''
            link = xmlescape(package.remote_location())
            # Build the changelog excerpt: at most three entries, then '...'.
            changelog = ''
            changelog_list = package.changelogs if package.changelogs is not None else []
            for count, meta in enumerate(changelog_list, start=1):
                if count > 3:
                    changelog += '...'
                    break
                cl_date = meta['timestamp'].strftime(changelog_format)
                author = meta['author']
                desc = meta['text']
                changelog += f'{cl_date} - {author}\n{desc}\n\n'
            # Per the 20230912 changelog note: escape the parts, not the
            # whole assembled description.
            description = '<p><strong>{}</strong> - {}{}</p>\n\n'.format(xmlescape(package.name), xmlescape(package.summary), xmlescape("<br />"))
            description += '<p>%s</p>\n\n<p><strong>Change Log:</strong></p>\n\n' % xmlescape(to_unicode(pkg_description.replace("\n", "<br />\n")))
            description += xmlescape('<pre>{}</pre>'.format(xmlescape(to_unicode(changelog))))
            # Encode so ElementTree stores an opaque string instead of
            # escaping the HTML a second time on serialization.
            base64_description = to_base64(description)
            etbobj.start('item', {})
            self._text_element(etbobj, 'title', title)
            self._text_element(etbobj, 'pubDate', time.strftime(rfc822_format, date))
            self._text_element(etbobj, 'guid', package_hex, {'isPermaLink': 'false'})
            self._text_element(etbobj, 'link', link)
            self._text_element(etbobj, 'description', base64_description)
            etbobj.end('item')
        etbobj.end('channel')
        etbobj.end('rss')
        rss = etbobj.close()
        etree = ElementTree(rss)
        some_string = tostring(etree.getroot(), encoding='utf-8')
        xmlstr = minidom.parseString(some_string).toprettyxml(indent=" ")
        # When writing to the file, find each <description>' line and
        # decode the base64 payload back to its HTML form.  The base64
        # round trip is the only way (with the built-in xml library) to
        # keep the tree builder from entity-escaping the markup.
        base64_regex = r"'(.*)'"
        with open(self.filename, 'w+', encoding='utf-8') as f:
            for line in xmlstr.splitlines():
                new_line = line
                if "<description>'" in line:
                    result = re.search(base64_regex, line)
                    record = from_base64(result.group(0))
                    new_line = line.replace(result.group(0), record)
                f.write(new_line + '\n')
def make_rss_feed(filename, title, link, description, recent):
    """
    Convenience wrapper: configure a RepoRSS feed with the given channel
    metadata and write the *recent* packages to *filename*.
    """
    feed = RepoRSS(filename)
    feed.title = title
    feed.link = link
    feed.description = description
    feed.rsspackage(recent)
def main(options):
    """
    Entry point: configure dnf, restrict it to the requested repos, and
    write an RSS feed of packages built in the last options.days days.

    Exits non-zero on repo-read (1), metadata (1), or cachedir (50)
    failures.
    """
    days = options.days
    repoids = options.repoids
    dnfobj = DnfQuiet()
    if options.config:
        dnfobj.conf.read(filename=options.config)
    # Non-root users cannot write the system cache, so fall back to the
    # configured (per-user) cache directory.
    if os.geteuid() != 0 or options.tempcache:
        cachedir = dnfobj.conf.cachedir
        if cachedir is None:
            print('Error: Could not make cachedir')
            sys.exit(50)
        dnfobj.conf.cachedir = cachedir
    try:
        dnfobj.read_all_repos()
    except Exception:
        print('Could not read repos', file=sys.stderr)
        sys.exit(1)
    if len(repoids) > 0:
        for repo in dnfobj.repos:
            repoobj = dnfobj.repos[repo]
            if repo not in repoids:
                repoobj.disable()
            else:
                repoobj.enable()
                if options.module_hotfixes:
                    try:
                        repoobj.set_or_append_opt_value('module_hotfixes', '1')
                    except Exception:
                        print('Warning: dnf library is too old to support setting values')
                # Pull "other" metadata too, which carries the changelogs.
                repoobj.load_metadata_other = True
    print('Getting repo data for requested repos')
    try:
        dnfobj.fill_sack()
    except Exception:
        print('repo data failure')
        sys.exit(1)
    if options.disable_all_modules:
        modobj = dnf.module.module_base.ModuleBase(dnfobj)
        modobj.disable(['*'])
    recent = dnfobj.get_recent(days=days)
    # Newest first: sort ascending by buildtime, then reverse.
    sorted_recents = sorted(set(recent), key=lambda pkg: pkg.buildtime)
    sorted_recents.reverse()
    make_rss_feed(options.filename, options.title, options.link,
                  options.description, sorted_recents)
if __name__ == "__main__":
    # Command-line interface; the parsed Namespace is handed straight
    # to main().  Positional repoids restrict which repos are enabled.
    parser = argparse.ArgumentParser()
    parser.add_argument('--filename', type=str, default='repo-rss.xml',
                        help='File patch to export to')
    parser.add_argument('--link', type=str, default='https://github.com/rpm-software-management/dnf',
                        help='URL link to repository root')
    parser.add_argument('--title', type=str, default='RSS Repository - Recent Packages',
                        help='Title of the feed')
    parser.add_argument('--description', type=str, default='Most recent packages in Repositories',
                        help='Description of the feed')
    parser.add_argument('--days', type=int, default=7, help='Number of days to look back')
    parser.add_argument('--tempcache', action='store_true',
                        help='Temporary cache location (automatically on if not root)')
    parser.add_argument('--module-hotfixes', action='store_true',
                        help='Only use this to catch all module packages')
    # NOTE(review): --arches is accepted but never read by main() in
    # this file -- confirm whether it is still needed.
    parser.add_argument('--arches', action='append', default=[],
                        help='List of architectures to care about')
    parser.add_argument('--config', type=str, default='',
                        help='A dnf configuration to use if you do not want to use the default')
    parser.add_argument('--disable-all-modules', action='store_true',
                        help='Disables all modules. Useful for getting newer than 8 data.')
    # One or more repo IDs; all other repos get disabled.
    parser.add_argument('repoids', metavar='N', type=str, nargs='+')
    results = parser.parse_args()
    main(results)