toolkit/mangle/ipa/ipaauditor.py

520 lines
19 KiB
Python
Raw Normal View History

2023-09-03 21:43:13 +00:00
#!/usr/bin/env python3
# -*-:python; coding:utf-8; -*-
# author: Louis Abel <label@rockylinux.org>
2023-11-08 07:25:28 +00:00
# pylint: disable=missing-module-docstring
2023-09-03 21:43:13 +00:00
#
2023-11-08 07:25:28 +00:00
# This script acts as an auditor for a FreeIPA domain. It's primary uses are
# for providing information of a client system and auditing policies, but can
# also be used as a simple query tool. When in audit mode, it will communicate
# with the IPA domain and attempt to get information such as HBAC and SUDO.
# Note that the auditor functionality may require an account with specialized
# privileges.
2023-09-03 21:43:13 +00:00
2023-11-08 07:25:28 +00:00
# Changelog:
# * Wed Nov 08 2023
# - Initial version
2023-09-03 21:43:13 +00:00
2023-11-08 07:25:28 +00:00
import os
import sys
import configparser
import socket
import subprocess
import argparse
2023-09-03 21:43:13 +00:00
2023-11-08 07:25:28 +00:00
################################################################################
# This is specifically for audit operations where the client system running the
# audit is not a direct client of the IPA domain. This is either used as a
# fallback if ipalib is not available or if called during "audit". This library
# is installable from EPEL (for RHEL-like machines), or the default Fedora
# repos.
2023-09-03 21:43:13 +00:00
try:
from python_freeipa import ClientMeta
except ImportError:
2023-11-08 07:25:28 +00:00
ClientMeta = None
2023-09-03 21:43:13 +00:00
2023-11-08 07:25:28 +00:00
################################################################################
# A system with the (free)ipa-client packages will have this library by
# default. If the system is enrolled, this will work without much of an issue.
# This is the default method of interacting with the API.
2023-09-03 21:43:13 +00:00
try:
2023-11-08 07:25:28 +00:00
from ipalib import api as libapi
2023-09-03 21:43:13 +00:00
except ImportError:
2023-11-08 07:25:28 +00:00
libapi = None
2023-09-03 21:43:13 +00:00
2023-11-08 07:25:28 +00:00
if not libapi and not ClientMeta:
2023-09-03 21:43:13 +00:00
print('No IPA python modules are available')
sys.exit(1)
2023-11-08 07:25:28 +00:00
################################################################################
# The argument parser should allow us to use sub parsers.
parser = argparse.ArgumentParser(
description='IPA Auditor and Infolets',
epilog='Use this with a wrapper to gather client or general IPA audit information')
subparser = parser.add_subparsers(dest='cmd')
subparser.required = True
info_parser = subparser.add_parser('info', epilog='Use this to get IPA client information.')
query_parser = subparser.add_parser('query', epilog='Use this to perform simple IPA queries.')
audit_parser = subparser.add_parser('audit', epilog='Use this to perform audits of IPA policies')
audit_parser.add_argument('--library', type=str, default='ipalib',
help='Choose the ipa library to use for the auditor',
choices=('ipalib', 'python_freeipa'))
audit_parser.add_argument('--type', type=str, required=True,
help='Type of audit: hbac, rbac, group, user',
choices=('hbac', 'rbac', 'group', 'user'))
audit_parser.add_argument('--name', type=str, default='',
help='Name of the object you want to audit')
audit_parser.add_argument('--deep', action='store_true',
help='Name of the object you want to audit')
results = parser.parse_args()
command = parser.parse_args().cmd
################################################################################
# Generic classes. These are the classes for very generic operations and
# getting information, usually about the host.
# pylint: disable=too-few-public-methods
class EtcIPADefault:
"""
Reads just the /etc/ipa/default.conf file that is generated
"""
@staticmethod
def read():
"""
Attempt to read the config file
"""
if not os.path.exists('/etc/ipa/default.conf'):
print('File does not exist (/etc/ipa/default.conf)', sys.stderr)
print('Is this system enrolled to a domain?', sys.stderr)
sys.exit(1)
__config = configparser.ConfigParser()
__config.read('/etc/ipa/default.conf')
outter_info = {}
outter_info['local_host_name'] = socket.gethostname()
outter_info['ipa_joined_name'] = __config['global']['host']
outter_info['ipa_domain'] = __config['global']['domain']
outter_info['ipa_realm'] = __config['global']['realm']
outter_info['registered_dc'] = __config['global']['server']
return outter_info
class SssctlInfo:
"""
Uses sssctl to gather minimum required information. Most current
distributions that support IPA should have this binary available.
"""
@staticmethod
def domain_status(ipa_domain):
"""
Gets the status from sssctl
"""
sssctl_cmd = f'/usr/sbin/sssctl domain-status -o {ipa_domain}'
if not os.path.exists('/usr/sbin/sssctl'):
return 'sssctl command not found'
if not os.getuid() == 0:
return 'Unknown; root required'
if sys.version_info[:2] <= (3, 6):
processor = subprocess.run(args=sssctl_cmd,
shell=True, check=False,
universal_newlines=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
else:
processor = subprocess.run(args=f'/usr/sbin/sssctl domain-status -o {ipa_domain}',
check=False, capture_output=True, text=True, shell=True)
domain_status_out = processor.stdout.strip().split(':')[1].strip()
return domain_status_out
@staticmethod
def current_dc(ipa_domain):
"""
Gets the current connected DC
"""
sssctl_cmd = f'/usr/sbin/sssctl domain-status -a {ipa_domain} | grep IPA'
if not os.path.exists('/usr/sbin/sssctl'):
return 'sssctl command not found'
if not os.getuid() == 0:
return 'Unknown; root required'
if sys.version_info[:2] <= (3, 6):
processor = subprocess.run(args=sssctl_cmd,
shell=True, check=False,
universal_newlines=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
else:
# pylint: disable=line-too-long
processor = subprocess.run(args=f'/usr/sbin/sssctl domain-status -a {ipa_domain} | grep IPA',
check=False, capture_output=True, text=True, shell=True)
current_dc_out = processor.stdout.strip().split(':')[1].strip()
return current_dc_out
class IPACalls:
"""
IPA specific API calls. info, query, audit should all call in this.
"""
################################################################################
# Kerberos
@staticmethod
def quick_host_kinit():
"""
brings in the host kerb ticket
"""
etc_ipa_default = EtcIPADefault.read()
# pylint: disable=line-too-long
kinit_cmd = f"/usr/bin/kinit -kt /etc/krb5.keytab host/{etc_ipa_default['local_host_name']}@{etc_ipa_default['ipa_realm']}"
err_code = subprocess.run(args=kinit_cmd, shell=True, check=False)
if err_code.returncode != 0:
print('We could not run kdestroy on the host.', sys.stderr)
sys.exit(1)
@staticmethod
def quick_host_kdestroy():
"""
destroys the host kerb ticket
"""
etc_ipa_default = EtcIPADefault.read()
# pylint: disable=line-too-long
kdestroy_cmd = f"/usr/bin/kdestroy -p host/{etc_ipa_default['local_host_name']}@{etc_ipa_default['ipa_realm']}"
err_code = subprocess.run(args=kdestroy_cmd, shell=True, check=False)
if err_code.returncode != 0:
print('We could not run kinit as the host.', sys.stderr)
sys.exit(1)
# End kerberos
################################################################################
################################################################################
# Static IPA calls
@staticmethod
def get_host_groups(api, host):
"""
Gets the list of hostgroups this client is a part of
"""
if api:
api_results = api.host_show(host, all=True)['result']['memberof_hostgroup']
return api_results
return ['Unknown, no kerb ticket']
# End static IPA calls
################################################################################
class IPAInfo:
"""
Get IPA specific information for a client.
"""
@staticmethod
def get_basic_ipa_info(api):
"""
Gets the actual info
"""
etc_ipa_default = EtcIPADefault.read()
domain_status = SssctlInfo.domain_status(etc_ipa_default['ipa_domain'])
current_dc = SssctlInfo.current_dc(etc_ipa_default['ipa_domain'])
current_hostname = etc_ipa_default['local_host_name']
hostgroups = '\n '.join(IPACalls.get_host_groups(api, current_hostname))
output_dict = {
'Local host name:': etc_ipa_default['local_host_name'],
'Joined to domain:': etc_ipa_default['ipa_domain'],
'Joined as:': etc_ipa_default['ipa_joined_name'],
'Registered DC:': etc_ipa_default['registered_dc'],
'Current DC:': current_dc,
'Domain status:': domain_status,
'Host Group(s):': hostgroups,
}
for key, value in output_dict.items():
print(f'{key: <20}{value}')
@staticmethod
def basic_ipa_info(api):
"""
Basic IPA info (aka ipainfo).
"""
IPAInfo.get_basic_ipa_info(api)
################################################################################
# Specific classes. These are for classes with a very specific use case. Mainly
# for working with the auditor or query system.
class IPAAudit:
"""
This is for getting audit data
RBAC, HBAC. "deep" option should recurse down groups and users
"""
@staticmethod
def entry(api, control, name, deep):
"""
Gets us started on the audit
"""
if control == 'rbac':
IPAAudit.rbac_pull(api, name, deep)
@staticmethod
def user_pull(api, name, deep):
"""
Gets requested rbac info
"""
print()
@staticmethod
def group_pull(api, name, deep):
"""
Gets requested rbac info
"""
print()
@staticmethod
def hbac_pull(api, name, deep):
"""
Gets requested rbac info
"""
try:
hbac_results = IPAQuery.hbac_data(api, name)
except:
print(f'Could not find {name}', sys.stderr)
sys.exit(1)
hbac_description = '' if not hbac_results.get('description', None) else hbac_results['description'][0]
hbac_svcs = '' if not hbac_results.get('memberservice_hbacsvc', None) else '\n '.join(hbac_results['memberservice_hbacsvc'])
hbac_svcs_groups = '' if not hbac_results.get('memberservice_hbacsvcgroup', None) else '\n '.join(hbac_results['memberservice_hbacsvcgroup'])
users = '' if not hbac_results.get('memberuser_user', None) else '\n '.join(hbac_results['memberuser_user'])
hosts = '' if not hbac_results.get('memberhost_host', None) else '\n '.join(hbac_results['memberhost_host'])
groups = '' if not hbac_results.get('memberuser_group', None) else '\n '.join(hbac_results['memberuser_group'])
hostgroups = '' if not hbac_results.get('memberhost_hostgroup', None) else '\n '.join(hbac_results['memberhost_hostgroup'])
@staticmethod
def rbac_pull(api, name, deep):
"""
Gets requested rbac info
"""
try:
rbac_results = IPAQuery.role_data(api, name)
except:
print(f'Could not find {name}', sys.stderr)
sys.exit(1)
# pylint: disable=line-too-long
rbac_description = '' if not rbac_results.get('description', None) else rbac_results['description'][0]
rbac_privs = '' if not rbac_results.get('memberof_privilege', None) else '\n '.join(rbac_results['memberof_privilege'])
users = '' if not rbac_results.get('member_user', None) else '\n '.join(rbac_results['member_user'])
hosts = '' if not rbac_results.get('member_host', None) else '\n '.join(rbac_results['member_host'])
groups = '' if not rbac_results.get('member_group', None) else '\n '.join(rbac_results['member_group'])
hostgroups = '' if not rbac_results.get('member_hostgroup', None) else '\n '.join(rbac_results['member_hostgroup'])
starter_rbac = {
'Role Name:': name,
'Description:': rbac_description,
'Privileges:': rbac_privs,
'Users:': users,
'Groups:': groups,
'Hosts:': hosts,
'Hosts Groups:': hostgroups,
}
print('RBAC Information')
print('----------------------------------------')
for key, value in starter_rbac.items():
if len(value) > 0:
print(f'{key: <16}{value}')
print('')
if deep:
user_list = [] if not rbac_results.get('member_user', None) else rbac_results['member_user']
group_list = [] if not rbac_results.get('member_group', None) else rbac_results['member_group']
priv_list = [] if not rbac_results.get('memberof_privilege', None) else rbac_results['memberof_privilege']
IPAAudit.role_deep_list(api, user_list, group_list, priv_list)
@staticmethod
# pylint: disable=dangerous-default-value
def hbac_deep_list(api, users, groups, privs):
"""
Does recursive digging on the control provided
"""
print()
@staticmethod
# pylint: disable=dangerous-default-value
def role_deep_list(api, users, groups, privs):
"""
Does recursive digging on the users provided
"""
starting_perms = []
for priv in privs:
data = IPAQuery.priv_data(api, priv)
description = '' if not data.get('description', None) else data['description'][0]
permlist = '' if not data.get('memberof_permission', None) else data['memberof_permission']
if len(permlist) > 0:
for perm in permlist:
if perm not in starting_perms:
starting_perms.append(perm)
print(f'Permissions Applied to this Role')
print('----------------------------------------')
for item in starting_perms:
print(item)
print('')
starting_user_list = users
for group in groups:
data = IPAQuery.group_data(api, group)
description = '' if not data.get('description', None) else data['description'][0]
userlist = '' if not data.get('member_user', None) else data['member_user']
ind_userlist = '' if not data.get('memberindirect_user', None) else data['memberindirect_user']
grouplist = '' if not data.get('member_group', None) else '\n '.join(data['member_group'])
user_list = []
ind_user_list = []
if len(userlist) > 0:
for user in userlist:
user_list.append(f'{user}')
if user not in starting_user_list:
starting_user_list.append(user)
if len(ind_userlist) > 0:
for user in ind_userlist:
ind_user_list.append(f'{user}')
if user not in starting_user_list:
starting_user_list.append(user)
user_list_join = '\n '.join(user_list)
ind_user_list_join = '\n '.join(ind_user_list)
group_dict = {
'Description:': description,
'Users:': user_list_join,
'Indirect Users:': ind_user_list_join,
'Groups:': grouplist,
}
print(f'Group: {group}')
print('----------------------------------------')
for key, value in group_dict.items():
if len(value) > 0:
print(f'{key: <16}{value}')
print('')
final_user_list = {}
for user in starting_user_list:
data = IPAQuery.user_data(api, user)
this_user = IPAQuery.user_data(api, user)
fullname = f"{this_user['givenname'][0]} {this_user['sn'][0]}"
final_user_list[user] = fullname
if len(starting_user_list) > 0:
print('Full List of Users for this role')
print('----------------------------------------')
for key, value in final_user_list.items():
if len(value) > 0:
print(f'{key: <24}{value}')
@staticmethod
def user_deep_list(api, user):
"""
Does a recursive dig on a user
"""
@staticmethod
def group_deep_list(api, group):
"""
Does a recursive dig on a group
"""
class IPAQuery:
"""
This is for getting query data
"""
@staticmethod
def user_data(api, user):
"""
Returns all user data
"""
return api.user_show(user)['result']
@staticmethod
def group_data(api, group):
"""
Returns all group data
"""
return api.group_show(group)['result']
@staticmethod
def hostgroup_data(api, group):
"""
Returns all group data
"""
return api.hostgroup_show(group)['result']
@staticmethod
def role_data(api, role):
"""
Returns all role data
"""
return api.role_show(role)['result']
@staticmethod
def priv_data(api, priv):
"""
Returns all role data
"""
return api.privilege_show(priv)['result']
@staticmethod
def hbac_data(api, hbac):
"""
Returns all hbac data
"""
return api.hbacrule_show(hbac)['result']
@staticmethod
def hbacsvcgroup_data(api, hbacsvcgroup):
"""
Returns all hbac service group data
"""
return api.hbacsvcgroup_show(hbacsvcgroup)['result']
# start main
def get_api(ipa_library='ipalib'):
"""
Gets and returns the right API entrypoint
"""
# This is unfortunately hacky.
if ipa_library == 'ipalib':
# When root, use the hostkeytab
if os.getuid() == 0:
IPACalls.quick_host_kinit()
api = libapi
try:
api.bootstrap(context="custom")
api.finalize()
api.Backend.rpcclient.connect()
command_api = api.Command
except:
print('WARNING: No kerberos credentials\n')
command_api = None
elif ipa_library == 'python_freeipa':
print()
else:
print('Unsupported ipa library', sys.stderr)
sys.exit(1)
return command_api
def main():
"""
Main function entrypoint
"""
command_api = get_api()
if command == 'audit':
IPAAudit.entry(command_api, results.type, results.name, results.deep)
elif command == 'info':
IPAInfo.basic_ipa_info(command_api)
# When root, kdestroy the host keytab
if os.getuid() == 0:
IPACalls.quick_host_kdestroy()
if __name__ == '__main__':
main()