move to use elementtree

This commit is contained in:
Louis Abel 2023-09-07 02:12:08 -07:00
parent 98a1737764
commit 21f8ced3dc
Signed by: label
GPG Key ID: B37E62D143879B36

View File

@ -8,10 +8,15 @@ import os
import argparse import argparse
import time import time
import binascii import binascii
# The old yum-utils repo-rss used string manipulation. We're instead going to
# use the XML python library to do the work for us. This is cleaner, imo.
from xml.sax.saxutils import escape as xmlescape
from xml.etree.ElementTree import ElementTree, TreeBuilder, tostring
from xml.dom import minidom
import dnf import dnf
import dnf.exceptions import dnf.exceptions
#from dnf.comps import Comps #from dnf.comps import Comps
import libxml2 #import libxml2
def to_unicode(string: str) -> str: def to_unicode(string: str) -> str:
""" """
@ -43,6 +48,8 @@ class DnfQuiet(dnf.Base):
else: else:
available = self.sack.query().available().filter(latest_per_arch=1) available = self.sack.query().available().filter(latest_per_arch=1)
available.run()
for package in available: for package in available:
ftime = int(package.buildtime) ftime = int(package.buildtime)
if ftime > recentlimit: if ftime > recentlimit:
@ -60,45 +67,58 @@ class DnfQuiet(dnf.Base):
class RepoRSS: class RepoRSS:
def __init__(self, filename='repo-rss.xml'): def __init__(self, filename='repo-rss.xml'):
self.description = 'Repository RSS' self.description = 'Repository RSS'
self.link = 'http://dnf.baseurl.org' self.link = 'https://github.com/rpm-software-management/dnf'
self.title = 'Recent Packages' self.title = 'Recent Packages'
self.do_file(filename)
self.do_doc()
def do_doc(self):
self.doc = libxml2.newDoc('1.0')
self.xmlescape = self.doc.encodeEntitiesReentrant
rss = self.doc.newChild(None, 'rss', None)
rss.setProp('version', '2.0')
self.rssnode = rss.newChild(None, 'channel', None)
def do_file(self, filename):
if filename[0] != '/': if filename[0] != '/':
cwd = os.getcwd() cwd = os.getcwd()
self.filename = os.path.join(cwd, filename) self.filename = os.path.join(cwd, filename)
else: else:
self.filename = filename self.filename = filename
try: def rsspackage(self, packages):
self.file_open = open(self.filename, 'w+') file = self.filename
except IOError as exc: rfc822_format = "%a, %d %b %Y %X GMT"
print(f'Error opening file {self.filename}: {exc}', file=sys.stderr) now = time.strftime(rfc822_format, time.gmtime())
sys.exit(1) etbobj = TreeBuilder()
# start rss
etbobj.start('rss', {'version': '2.0'})
# start channel
etbobj.start('channel', {})
# start title
etbobj.start('title', {})
etbobj.data(self.title)
etbobj.end('title')
# end title
# start link
etbobj.start('link', {})
etbobj.data(self.link)
etbobj.end('link')
# end link
# start description
etbobj.start('description', {})
etbobj.data(self.description)
etbobj.end('description')
# end description
# start pubDate
etbobj.start('pubDate', {})
etbobj.data(now)
etbobj.end('pubDate')
# end pubDate
# start generator
etbobj.start('generator', {})
etbobj.data('DNF')
etbobj.end('generator')
# end generator
def rsspackage(self, package):
rfc822_format = "%a, %d %b %Y %X GMT" rfc822_format = "%a, %d %b %Y %X GMT"
changelog_format = "%a, %d %b %Y GMT" changelog_format = "%a, %d %b %Y GMT"
for package in packages:
package_hex = binascii.hexlify(package.chksum[1]).decode() package_hex = binascii.hexlify(package.chksum[1]).decode()
item = self.rssnode.newChild(None, 'item', None) title = xmlescape(str(package))
title = self.xmlescape(str(package))
description = package.description
item.newChild(None, 'title', title)
date = time.gmtime(float(package.buildtime)) date = time.gmtime(float(package.buildtime))
item.newChild(None, 'pubDate', time.strftime(rfc822_format, date)) description = package.description
# pylint: disable=line-too-long
item.newChild(None, 'guid', package_hex).setProp("isPermaLink", "false")
link = package.remote_location() link = package.remote_location()
item.newChild(None, 'link', self.xmlescape(link)) # form description
changelog = '' changelog = ''
count = 0 count = 0
if package.changelogs is not None: if package.changelogs is not None:
@ -114,54 +134,59 @@ class RepoRSS:
author = meta['author'] author = meta['author']
desc = meta['text'] desc = meta['text']
changelog += f'{date} - {author}\n{desc}\n\n' changelog += f'{date} - {author}\n{desc}\n\n'
# pylint: disable=line-too-long,consider-using-f-string description = f'<p><strong>{package.name}</strong> - {package.summary}</p>\n\n'
description = '<p><strong>{}</strong> - {}</p>\n\n'.format(self.xmlescape(package.name), self.xmlescape(package.summary)) description += '<p>%s</p>\n\n<p><strong>Change Log:</strong></p>\n\n' % description.replace("\n", "<br />\n")
description += '<p>%s</p>\n\n<p><strong>Change Log:</strong></p>\n\n' % self.xmlescape(description.replace("\n", "<br />\n")) description += f'<pre>{changelog}</pre>'
description += self.xmlescape('<pre>%s</pre>' % self.xmlescape(changelog))
item.newChild(None, 'description', description)
return item
def start_rss(self): # start item
"""return string representation of rss preamble""" etbobj.start('item', {})
rfc822_format = "%a, %d %b %Y %X GMT" # start title
now = time.strftime(rfc822_format, time.gmtime()) etbobj.start('title', {})
rssheader = f"""<?xml version="1.0" encoding="utf-8"?> etbobj.data(title)
<rss version="2.0"> etbobj.end('title')
<channel> # end title
<title>{self.title}</title> # start pubDate
<link>{self.link}</link> etbobj.start('pubDate', {})
<description>{self.description}</description> etbobj.data(date)
<pubDate>{now}</pubDate> etbobj.end('pubDate')
<generator>DNF</generator> # end pubDate
""" # start guid
etbobj.start('guid', {'isPermaLink': 'false'})
etbobj.data(package_hex)
etbobj.end('guid',)
# end guid
# start link
etbobj.start('link', {})
etbobj.data(link)
etbobj.end('link')
# end link
# start description
etbobj.start('description', {})
etbobj.data(xmlescape(description))
etbobj.end('description')
# end description
etbobj.end('item')
# end item
self.file_open.write(rssheader) etbobj.end('channel')
# end channel
etbobj.end('rss')
# end rss
rss = etbobj.close()
etree = ElementTree(rss)
some_string = tostring(etree.getroot(), encoding='utf-8')
xmlstr = minidom.parseString(some_string).toprettyxml(indent=" ")
#etree.write(file, encoding='utf-8')
with open(file, 'w+', encoding='utf-8') as f:
f.write(xmlstr)
f.close()
def do_package(self, package): def make_rss_feed(filename, title, link, description, recent):
item = self.rsspackage(package)
self.file_open.write(item.serialize("utf-8", 1))
item.unlinkNode()
item.freeNode()
del item
def close_rss(self):
end="\n </channel>\n</rss>\n"
self.file_open.write(end)
self.file_open.close()
del self.file_open
self.doc.freeDoc()
del self.doc
def make_rss_feed(filename, title, link, description, recent, dnfobj):
rssobj = RepoRSS(filename) rssobj = RepoRSS(filename)
rssobj.title = title rssobj.title = title
rssobj.link = link rssobj.link = link
rssobj.description = description rssobj.description = description
rssobj.start_rss() rssobj.rsspackage(recent)
if len(recent) > 0:
for package in recent:
rssobj.do_package(package)
rssobj.close_rss()
def main(options): def main(options):
days = options.days days = options.days
@ -198,7 +223,7 @@ def main(options):
repoobj.load_metadata_other = True repoobj.load_metadata_other = True
print('Getting repo data') print('Getting repo data for requested repos')
try: try:
dnfobj.fill_sack() dnfobj.fill_sack()
except: except:
@ -206,10 +231,13 @@ def main(options):
sys.exit(1) sys.exit(1)
sack_query = dnfobj.sack.query().available() sack_query = dnfobj.sack.query().available()
recent = sack_query.filter(latest_per_arch=1) #recent = sack_query.filter(latest_per_arch=1)
sorted_recents = sorted(set(recent.run()), key=lambda pkg: pkg.buildtime) recent = dnfobj.get_recent(days=days)
#sorted_recents = sorted(set(recent.run()), key=lambda pkg: pkg.buildtime)
sorted_recents = sorted(set(recent), key=lambda pkg: pkg.buildtime)
sorted_recents.reverse() sorted_recents.reverse()
make_rss_feed(options.filename, options.title, options.link, options.description, sorted_recents, dnfobj) make_rss_feed(options.filename, options.title, options.link,
options.description, sorted_recents)
if __name__ == "__main__": if __name__ == "__main__":
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()