ansible-ops-management/files/usr/libexec/rocky/rss.py

313 lines
11 KiB
Python
Raw Permalink Normal View History

2023-09-05 08:05:58 +00:00
#!/usr/bin/env python3
# -*-:python; coding:utf-8; -*-
# author: Louis Abel <label@rockylinux.org>
# modified version of repo-rss from yum utils
2023-09-12 08:17:38 +00:00
# changelog
# -> 20230912: do not xmlescape entire description variable
2023-09-05 08:05:58 +00:00
import sys
import os
2023-09-13 08:39:33 +00:00
import re
2023-09-05 08:05:58 +00:00
import argparse
import time
import binascii
2023-09-13 08:39:33 +00:00
import base64
2023-09-07 09:20:00 +00:00
# The old yum-utils repo-rss used string manipulation. We're instead going to
# use the XML python library to do the work for us. This is cleaner, imo.
from xml.sax.saxutils import escape as xmlescape
from xml.etree.ElementTree import ElementTree, TreeBuilder, tostring
from xml.dom import minidom
2023-09-05 08:05:58 +00:00
import dnf
import dnf.exceptions
#from dnf.comps import Comps
2023-09-07 09:20:00 +00:00
#import libxml2
2023-09-05 08:05:58 +00:00
def to_unicode(string: str) -> str:
    """
    Return the given value as text.

    A str is handed back untouched, bytes are decoded as UTF-8, and any
    other object is converted with str().
    """
    if isinstance(string, str):
        return string
    return string.decode('utf8') if isinstance(string, bytes) else str(string)
2023-09-13 08:39:33 +00:00
def to_base64(string: str) -> str:
    """
    Base64-encode the UTF-8 bytes of a string and wrap the result in single
    quotes, so the encoded value is easy to locate with a regex later.
    """
    encoded = base64.b64encode(string.encode('utf-8')).decode('utf-8')
    return f"'{encoded}'"
def from_base64(string: str) -> str:
    """
    Decode a base64 value back into a UTF-8 string. Every single quote in
    the input is removed before decoding.
    """
    payload = string.replace("'", "").encode('utf-8')
    return base64.b64decode(payload).decode('utf-8')
2023-09-05 08:05:58 +00:00
class DnfQuiet(dnf.Base):
    """
    dnf.Base subclass that adds a helper for listing recently built packages.
    """

    def __init__(self):
        super().__init__()

    def get_recent(self, days=1):
        """
        Return the available packages in the sack whose build time is newer
        than ``days`` days before now.

        When the ``showdupesfromrepos`` config option is false, the query is
        narrowed with ``latest_per_arch=1``. Packages come back grouped by
        build timestamp, in the order each timestamp was first seen.
        """
        cutoff = time.time() - (days * 86400)

        # An empty kwargs dict reproduces the plain ``.filter()`` call.
        filter_args = {} if self.conf.showdupesfromrepos else {'latest_per_arch': 1}
        candidates = self.sack.query().available().filter(**filter_args)
        candidates.run()

        by_buildtime = {}
        for pkg in candidates:
            built = int(pkg.buildtime)
            if built > cutoff:
                by_buildtime.setdefault(built, []).append(pkg)

        # Flatten the groups while keeping first-seen timestamp order.
        return [pkg for group in by_buildtime.values() for pkg in group]
class RepoRSS:
    """
    Builds an RSS 2.0 document for a list of packages and writes it to a file.

    The ``title``, ``link`` and ``description`` attributes hold the channel
    metadata and may be overwritten after construction.
    """

    def __init__(self, filename='repo-rss.xml'):
        """
        Set the default channel metadata and resolve the output path.

        filename: where the feed is written. A relative path is joined onto
                  the current working directory; an absolute path is used
                  as given.
        """
        self.description = 'Repository RSS'
        self.link = 'https://github.com/rpm-software-management/dnf'
        self.title = 'Recent Packages'
        # isabs() does not index into the string, so an empty filename no
        # longer raises IndexError here.
        if os.path.isabs(filename):
            self.filename = filename
        else:
            self.filename = os.path.join(os.getcwd(), filename)

    @staticmethod
    def _add_text_element(builder, tag, text, attrs=None):
        """
        Emit ``<tag attrs>text</tag>`` on the given TreeBuilder.

        The text is passed through raw: the XML serializer escapes it when
        the tree is written out, so callers must not pre-escape it.
        """
        builder.start(tag, attrs or {})
        builder.data(text)
        builder.end(tag)

    @staticmethod
    def _build_description(package, changelog_format):
        """
        Assemble the HTML fragment used as an item's description: package
        name and summary, the package description, and up to three
        changelog entries.
        """
        # package.description is sometimes a NoneType.
        pkg_description = package.description
        if not pkg_description:
            pkg_description = ''

        entries = package.changelogs if package.changelogs is not None else []
        parts = []
        for count, meta in enumerate(entries, start=1):
            # Only the first three entries are listed; an ellipsis marks
            # that more exist.
            if count > 3:
                parts.append('...')
                break
            cl_date = meta['timestamp'].strftime(changelog_format)
            parts.append(f"{cl_date} - {meta['author']}\n{meta['text']}\n\n")
        changelog = ''.join(parts)

        description = '<p><strong>{}</strong> - {}{}</p>\n\n'.format(xmlescape(package.name), xmlescape(package.summary), xmlescape("<br />"))
        description += '<p>%s</p>\n\n<p><strong>Change Log:</strong></p>\n\n' % xmlescape(to_unicode(pkg_description.replace("\n", "<br />\n")))
        description += xmlescape('<pre>{}</pre>'.format(xmlescape(to_unicode(changelog))))
        return description

    def rsspackage(self, packages):
        """
        Render ``packages`` as RSS items and write the feed to
        ``self.filename`` (UTF-8, pretty-printed).

        packages: iterable of package objects providing ``chksum``,
                  ``buildtime``, ``description``, ``changelogs``, ``name``,
                  ``summary`` and ``remote_location()``.
        """
        rfc822_format = "%a, %d %b %Y %X GMT"
        changelog_format = "%a, %d %b %Y GMT"
        now = time.strftime(rfc822_format, time.gmtime())

        etbobj = TreeBuilder()
        etbobj.start('rss', {'version': '2.0'})
        etbobj.start('channel', {})
        self._add_text_element(etbobj, 'title', self.title)
        self._add_text_element(etbobj, 'link', self.link)
        self._add_text_element(etbobj, 'description', self.description)
        self._add_text_element(etbobj, 'pubDate', now)
        self._add_text_element(etbobj, 'generator', 'DNF')

        for package in packages:
            package_hex = binascii.hexlify(package.chksum[1]).decode()
            date = time.gmtime(float(package.buildtime))
            # The description is carried through the XML layer as quoted
            # base64 so the serializer does not escape its HTML; it is
            # decoded again when the file is written below.
            base64_description = to_base64(
                self._build_description(package, changelog_format))

            etbobj.start('item', {})
            # Title and link are NOT pre-escaped: the serializer escapes
            # element text itself, and escaping here as well produced
            # double-escaped output such as "&amp;amp;".
            self._add_text_element(etbobj, 'title', str(package))
            self._add_text_element(etbobj, 'pubDate',
                                   time.strftime(rfc822_format, date))
            self._add_text_element(etbobj, 'guid', package_hex,
                                   {'isPermaLink': 'false'})
            self._add_text_element(etbobj, 'link', package.remote_location())
            self._add_text_element(etbobj, 'description', base64_description)
            etbobj.end('item')

        etbobj.end('channel')
        etbobj.end('rss')
        rss = etbobj.close()

        some_string = tostring(rss, encoding='utf-8')
        xmlstr = minidom.parseString(some_string).toprettyxml(indent="  ")

        # Walk the pretty-printed document line by line. Any line holding
        # "<description>'" carries a quoted base64 payload; decode it and
        # splice the decoded HTML back in place of the quoted value. All
        # other lines are written unchanged.
        base64_regex = re.compile(r"'(.*)'")
        with open(self.filename, 'w', encoding='utf-8') as f:
            for line in xmlstr.splitlines():
                new_line = line
                if "<description>'" in line:
                    result = base64_regex.search(line)
                    if result is not None:
                        quoted = result.group(0)
                        new_line = line.replace(quoted, from_base64(quoted))
                f.write(new_line + '\n')
def make_rss_feed(filename, title, link, description, recent):
    """
    Create a RepoRSS for ``filename``, apply the channel title, link and
    description, and write the feed for the ``recent`` packages.
    """
    feed = RepoRSS(filename)
    channel_fields = (
        ('title', title),
        ('link', link),
        ('description', description),
    )
    for attr, value in channel_fields:
        setattr(feed, attr, value)
    feed.rsspackage(recent)
2023-09-05 08:05:58 +00:00
def main(options):
    """
    Load repository metadata with dnf and write an RSS feed of the packages
    built within the last ``options.days`` days.

    options: parsed command-line namespace. The attributes read here are
             ``days``, ``repoids``, ``config``, ``tempcache``,
             ``module_hotfixes``, ``disable_all_modules``, ``filename``,
             ``title``, ``link`` and ``description``.

    Exits with status 50 if no cache directory is configured, and with
    status 1 if the repositories cannot be read or the sack cannot be filled.
    """
    days = options.days
    repoids = options.repoids
    dnfobj = DnfQuiet()
    if options.config:
        dnfobj.conf.read(filename=options.config)

    if os.geteuid() != 0 or options.tempcache:
        cachedir = dnfobj.conf.cachedir
        if cachedir is None:
            print('Error: Could not make cachedir')
            sys.exit(50)
        dnfobj.conf.cachedir = cachedir

    # "except Exception" rather than a bare except: Ctrl-C and SystemExit
    # must not be reported as a repository failure.
    try:
        dnfobj.read_all_repos()
    except Exception:
        print('Could not read repos', file=sys.stderr)
        sys.exit(1)

    if repoids:
        for repo in dnfobj.repos:
            repoobj = dnfobj.repos[repo]
            if repo not in repoids:
                repoobj.disable()
            else:
                repoobj.enable()
                if options.module_hotfixes:
                    try:
                        repoobj.set_or_append_opt_value('module_hotfixes', '1')
                    except Exception:
                        print('Warning: dnf library is too old to support setting values')
                # Requests the "other" metadata for this repo; presumably
                # this is what makes package changelogs available — confirm
                # against the dnf API documentation.
                repoobj.load_metadata_other = True

    print('Getting repo data for requested repos')
    try:
        dnfobj.fill_sack()
    except Exception:
        print('repo data failure')
        sys.exit(1)

    if options.disable_all_modules:
        modobj = dnf.module.module_base.ModuleBase(dnfobj)
        modobj.disable(['*'])

    recent = dnfobj.get_recent(days=days)
    # De-duplicate, then order newest build first.
    sorted_recents = sorted(set(recent), key=lambda pkg: pkg.buildtime)
    sorted_recents.reverse()
    make_rss_feed(options.filename, options.title, options.link,
                  options.description, sorted_recents)
2023-09-05 08:05:58 +00:00
def build_parser():
    """
    Build the command-line parser for this script.

    Returns an argparse.ArgumentParser that accepts the feed options plus
    one or more positional repository ids.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--filename', type=str, default='repo-rss.xml',
                        help='File path to export to')
    parser.add_argument('--link', type=str, default='https://github.com/rpm-software-management/dnf',
                        help='URL link to repository root')
    parser.add_argument('--title', type=str, default='RSS Repository - Recent Packages',
                        help='Title of the feed')
    parser.add_argument('--description', type=str, default='Most recent packages in Repositories',
                        help='Description of the feed')
    parser.add_argument('--days', type=int, default=7, help='Number of days to look back')
    parser.add_argument('--tempcache', action='store_true',
                        help='Temporary cache location (automatically on if not root)')
    parser.add_argument('--module-hotfixes', action='store_true',
                        help='Only use this to catch all module packages')
    # NOTE(review): --arches is parsed here, but no code visible in this
    # file reads options.arches — confirm whether it is still needed.
    parser.add_argument('--arches', action='append', default=[],
                        help='List of architectures to care about')
    parser.add_argument('--config', type=str, default='',
                        help='A dnf configuration to use if you do not want to use the default')
    parser.add_argument('--disable-all-modules', action='store_true',
                        help='Disables all modules. Useful for getting newer than 8 data.')
    parser.add_argument('repoids', metavar='N', type=str, nargs='+')
    return parser


if __name__ == "__main__":
    results = build_parser().parse_args()
    main(results)