From ce533823bef919730c21b8cf2ec0531ab84ee1a7 Mon Sep 17 00:00:00 2001 From: Louis Abel Date: Wed, 8 Nov 2023 00:25:28 -0700 Subject: [PATCH] add IPA info tools --- mangle/ipa/ipaaudit | 25 ++ mangle/ipa/ipaauditor.py | 513 +++++++++++++++++++++++++++++++++++++- mangle/ipa/ipainfo | 26 ++ mangle/ipa/ipainfo-legacy | 14 ++ mangle/ipa/ipainfo.py | 26 +- mangle/ipa/ipaquery | 25 ++ 6 files changed, 610 insertions(+), 19 deletions(-) create mode 100755 mangle/ipa/ipaaudit create mode 100755 mangle/ipa/ipainfo create mode 100644 mangle/ipa/ipainfo-legacy create mode 100755 mangle/ipa/ipaquery diff --git a/mangle/ipa/ipaaudit b/mangle/ipa/ipaaudit new file mode 100755 index 0000000..bbb4320 --- /dev/null +++ b/mangle/ipa/ipaaudit @@ -0,0 +1,25 @@ +#!/bin/bash +# Wrapper for ipaauditor.py audit + +case "$ID" in + rocky|centos|rhel) + case "${VERSION_ID:0:1}" in + 5|6|7) + echo "Not supported." + exit 3 + ;; + 8) + PYTHON_EXEC="/usr/libexec/platform-python" + ;; + *) + PYTHON_EXEC="/usr/bin/python3" + ;; + esac ;; + ubuntu|debian) + PYTHON_EXEC="/usr/bin/python3" + ;; + fedora) + PYTHON_EXEC="/usr/bin/python3" +esac + +$PYTHON_EXEC ipaauditor.py audit "$@" diff --git a/mangle/ipa/ipaauditor.py b/mangle/ipa/ipaauditor.py index 376f808..9c2d6ab 100644 --- a/mangle/ipa/ipaauditor.py +++ b/mangle/ipa/ipaauditor.py @@ -1,26 +1,519 @@ #!/usr/bin/env python3 # -*-:python; coding:utf-8; -*- # author: Louis Abel +# pylint: disable=missing-module-docstring # -# This script acts as a auditor for a FreeIPA domain. By default, it will -# communicate with an IPA server of a domain, login, and attempt to get all -# information for HBAC and SUDO. +# This script acts as an auditor for a FreeIPA domain. It's primary uses are +# for providing information of a client system and auditing policies, but can +# also be used as a simple query tool. When in audit mode, it will communicate +# with the IPA domain and attempt to get information such as HBAC and SUDO. +# Note that the auditor functionality may require an account with specialized +# privileges. +# Changelog: +# * Wed Nov 08 2023 +# - Initial version + +import os import sys +import configparser +import socket +import subprocess +import argparse -python_freeipa = True -ipalib = True - +################################################################################ +# This is specifically for audit operations where the client system running the +# audit is not a direct client of the IPA domain. This is either used as a +# fallback if ipalib is not available or if called during "audit". This library +# is installable from EPEL (for RHEL-like machines), or the default Fedora +# repos. try: from python_freeipa import ClientMeta except ImportError: - python_freeipa = None + ClientMeta = None +################################################################################ +# A system with the (free)ipa-client packages will have this library by +# default. If the system is enrolled, this will work without much of an issue. +# This is the default method of interacting with the API. try: - from ipalib import api + from ipalib import api as libapi except ImportError: - ipalib = None + libapi = None -if not ipalib and not python_freeipa: +if not libapi and not ClientMeta: print('No IPA python modules are available') sys.exit(1) + +################################################################################ +# The argument parser should allow us to use sub parsers. +parser = argparse.ArgumentParser( + description='IPA Auditor and Infolets', + epilog='Use this with a wrapper to gather client or general IPA audit information') +subparser = parser.add_subparsers(dest='cmd') +subparser.required = True +info_parser = subparser.add_parser('info', epilog='Use this to get IPA client information.') +query_parser = subparser.add_parser('query', epilog='Use this to perform simple IPA queries.') +audit_parser = subparser.add_parser('audit', epilog='Use this to perform audits of IPA policies') + +audit_parser.add_argument('--library', type=str, default='ipalib', + help='Choose the ipa library to use for the auditor', + choices=('ipalib', 'python_freeipa')) +audit_parser.add_argument('--type', type=str, required=True, + help='Type of audit: hbac, rbac, group, user', + choices=('hbac', 'rbac', 'group', 'user')) +audit_parser.add_argument('--name', type=str, default='', + help='Name of the object you want to audit') +audit_parser.add_argument('--deep', action='store_true', + help='Name of the object you want to audit') + +results = parser.parse_args() +command = parser.parse_args().cmd + +################################################################################ +# Generic classes. These are the classes for very generic operations and +# getting information, usually about the host. +# pylint: disable=too-few-public-methods +class EtcIPADefault: + """ + Reads just the /etc/ipa/default.conf file that is generated + """ + @staticmethod + def read(): + """ + Attempt to read the config file + """ + if not os.path.exists('/etc/ipa/default.conf'): + print('File does not exist (/etc/ipa/default.conf)', sys.stderr) + print('Is this system enrolled to a domain?', sys.stderr) + sys.exit(1) + + __config = configparser.ConfigParser() + __config.read('/etc/ipa/default.conf') + outter_info = {} + outter_info['local_host_name'] = socket.gethostname() + outter_info['ipa_joined_name'] = __config['global']['host'] + outter_info['ipa_domain'] = __config['global']['domain'] + outter_info['ipa_realm'] = __config['global']['realm'] + outter_info['registered_dc'] = __config['global']['server'] + return outter_info + +class SssctlInfo: + """ + Uses sssctl to gather minimum required information. Most current + distributions that support IPA should have this binary available. + """ + @staticmethod + def domain_status(ipa_domain): + """ + Gets the status from sssctl + """ + sssctl_cmd = f'/usr/sbin/sssctl domain-status -o {ipa_domain}' + if not os.path.exists('/usr/sbin/sssctl'): + return 'sssctl command not found' + + if not os.getuid() == 0: + return 'Unknown; root required' + + if sys.version_info[:2] <= (3, 6): + processor = subprocess.run(args=sssctl_cmd, + shell=True, check=False, + universal_newlines=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + else: + processor = subprocess.run(args=f'/usr/sbin/sssctl domain-status -o {ipa_domain}', + check=False, capture_output=True, text=True, shell=True) + + domain_status_out = processor.stdout.strip().split(':')[1].strip() + return domain_status_out + + @staticmethod + def current_dc(ipa_domain): + """ + Gets the current connected DC + """ + sssctl_cmd = f'/usr/sbin/sssctl domain-status -a {ipa_domain} | grep IPA' + if not os.path.exists('/usr/sbin/sssctl'): + return 'sssctl command not found' + + if not os.getuid() == 0: + return 'Unknown; root required' + + if sys.version_info[:2] <= (3, 6): + processor = subprocess.run(args=sssctl_cmd, + shell=True, check=False, + universal_newlines=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + else: + # pylint: disable=line-too-long + processor = subprocess.run(args=f'/usr/sbin/sssctl domain-status -a {ipa_domain} | grep IPA', + check=False, capture_output=True, text=True, shell=True) + + current_dc_out = processor.stdout.strip().split(':')[1].strip() + return current_dc_out + +class IPACalls: + """ + IPA specific API calls. info, query, audit should all call in this. + """ + ################################################################################ + # Kerberos + @staticmethod + def quick_host_kinit(): + """ + brings in the host kerb ticket + """ + etc_ipa_default = EtcIPADefault.read() + # pylint: disable=line-too-long + kinit_cmd = f"/usr/bin/kinit -kt /etc/krb5.keytab host/{etc_ipa_default['local_host_name']}@{etc_ipa_default['ipa_realm']}" + err_code = subprocess.run(args=kinit_cmd, shell=True, check=False) + if err_code.returncode != 0: + print('We could not run kdestroy on the host.', sys.stderr) + sys.exit(1) + + @staticmethod + def quick_host_kdestroy(): + """ + destroys the host kerb ticket + """ + etc_ipa_default = EtcIPADefault.read() + # pylint: disable=line-too-long + kdestroy_cmd = f"/usr/bin/kdestroy -p host/{etc_ipa_default['local_host_name']}@{etc_ipa_default['ipa_realm']}" + err_code = subprocess.run(args=kdestroy_cmd, shell=True, check=False) + if err_code.returncode != 0: + print('We could not run kinit as the host.', sys.stderr) + sys.exit(1) + + # End kerberos + ################################################################################ + + ################################################################################ + # Static IPA calls + @staticmethod + def get_host_groups(api, host): + """ + Gets the list of hostgroups this client is a part of + """ + if api: + api_results = api.host_show(host, all=True)['result']['memberof_hostgroup'] + return api_results + return ['Unknown, no kerb ticket'] + + # End static IPA calls + ################################################################################ + +class IPAInfo: + """ + Get IPA specific information for a client. + """ + @staticmethod + def get_basic_ipa_info(api): + """ + Gets the actual info + """ + etc_ipa_default = EtcIPADefault.read() + domain_status = SssctlInfo.domain_status(etc_ipa_default['ipa_domain']) + current_dc = SssctlInfo.current_dc(etc_ipa_default['ipa_domain']) + current_hostname = etc_ipa_default['local_host_name'] + hostgroups = '\n '.join(IPACalls.get_host_groups(api, current_hostname)) + output_dict = { + 'Local host name:': etc_ipa_default['local_host_name'], + 'Joined to domain:': etc_ipa_default['ipa_domain'], + 'Joined as:': etc_ipa_default['ipa_joined_name'], + 'Registered DC:': etc_ipa_default['registered_dc'], + 'Current DC:': current_dc, + 'Domain status:': domain_status, + 'Host Group(s):': hostgroups, + } + for key, value in output_dict.items(): + print(f'{key: <20}{value}') + + @staticmethod + def basic_ipa_info(api): + """ + Basic IPA info (aka ipainfo). + """ + IPAInfo.get_basic_ipa_info(api) + +################################################################################ +# Specific classes. These are for classes with a very specific use case. Mainly +# for working with the auditor or query system. +class IPAAudit: + """ + This is for getting audit data + + RBAC, HBAC. "deep" option should recurse down groups and users + """ + @staticmethod + def entry(api, control, name, deep): + """ + Gets us started on the audit + """ + if control == 'rbac': + IPAAudit.rbac_pull(api, name, deep) + + @staticmethod + def user_pull(api, name, deep): + """ + Gets requested rbac info + """ + print() + + @staticmethod + def group_pull(api, name, deep): + """ + Gets requested rbac info + """ + print() + + @staticmethod + def hbac_pull(api, name, deep): + """ + Gets requested rbac info + """ + try: + hbac_results = IPAQuery.hbac_data(api, name) + except: + print(f'Could not find {name}', sys.stderr) + sys.exit(1) + + hbac_description = '' if not hbac_results.get('description', None) else hbac_results['description'][0] + hbac_svcs = '' if not hbac_results.get('memberservice_hbacsvc', None) else '\n '.join(hbac_results['memberservice_hbacsvc']) + hbac_svcs_groups = '' if not hbac_results.get('memberservice_hbacsvcgroup', None) else '\n '.join(hbac_results['memberservice_hbacsvcgroup']) + users = '' if not hbac_results.get('memberuser_user', None) else '\n '.join(hbac_results['memberuser_user']) + hosts = '' if not hbac_results.get('memberhost_host', None) else '\n '.join(hbac_results['memberhost_host']) + groups = '' if not hbac_results.get('memberuser_group', None) else '\n '.join(hbac_results['memberuser_group']) + hostgroups = '' if not hbac_results.get('memberhost_hostgroup', None) else '\n '.join(hbac_results['memberhost_hostgroup']) + + @staticmethod + def rbac_pull(api, name, deep): + """ + Gets requested rbac info + """ + try: + rbac_results = IPAQuery.role_data(api, name) + except: + print(f'Could not find {name}', sys.stderr) + sys.exit(1) + + # pylint: disable=line-too-long + rbac_description = '' if not rbac_results.get('description', None) else rbac_results['description'][0] + rbac_privs = '' if not rbac_results.get('memberof_privilege', None) else '\n '.join(rbac_results['memberof_privilege']) + users = '' if not rbac_results.get('member_user', None) else '\n '.join(rbac_results['member_user']) + hosts = '' if not rbac_results.get('member_host', None) else '\n '.join(rbac_results['member_host']) + groups = '' if not rbac_results.get('member_group', None) else '\n '.join(rbac_results['member_group']) + hostgroups = '' if not rbac_results.get('member_hostgroup', None) else '\n '.join(rbac_results['member_hostgroup']) + + starter_rbac = { + 'Role Name:': name, + 'Description:': rbac_description, + 'Privileges:': rbac_privs, + 'Users:': users, + 'Groups:': groups, + 'Hosts:': hosts, + 'Hosts Groups:': hostgroups, + } + print('RBAC Information') + print('----------------------------------------') + for key, value in starter_rbac.items(): + if len(value) > 0: + print(f'{key: <16}{value}') + print('') + + if deep: + user_list = [] if not rbac_results.get('member_user', None) else rbac_results['member_user'] + group_list = [] if not rbac_results.get('member_group', None) else rbac_results['member_group'] + priv_list = [] if not rbac_results.get('memberof_privilege', None) else rbac_results['memberof_privilege'] + IPAAudit.role_deep_list(api, user_list, group_list, priv_list) + + @staticmethod + # pylint: disable=dangerous-default-value + def hbac_deep_list(api, users, groups, privs): + """ + Does recursive digging on the control provided + """ + print() + + @staticmethod + # pylint: disable=dangerous-default-value + def role_deep_list(api, users, groups, privs): + """ + Does recursive digging on the users provided + """ + starting_perms = [] + for priv in privs: + data = IPAQuery.priv_data(api, priv) + description = '' if not data.get('description', None) else data['description'][0] + permlist = '' if not data.get('memberof_permission', None) else data['memberof_permission'] + if len(permlist) > 0: + for perm in permlist: + if perm not in starting_perms: + starting_perms.append(perm) + + print(f'Permissions Applied to this Role') + print('----------------------------------------') + for item in starting_perms: + print(item) + print('') + + starting_user_list = users + for group in groups: + data = IPAQuery.group_data(api, group) + description = '' if not data.get('description', None) else data['description'][0] + userlist = '' if not data.get('member_user', None) else data['member_user'] + ind_userlist = '' if not data.get('memberindirect_user', None) else data['memberindirect_user'] + grouplist = '' if not data.get('member_group', None) else '\n '.join(data['member_group']) + + user_list = [] + ind_user_list = [] + if len(userlist) > 0: + for user in userlist: + user_list.append(f'{user}') + if user not in starting_user_list: + starting_user_list.append(user) + if len(ind_userlist) > 0: + for user in ind_userlist: + ind_user_list.append(f'{user}') + if user not in starting_user_list: + starting_user_list.append(user) + + user_list_join = '\n '.join(user_list) + ind_user_list_join = '\n '.join(ind_user_list) + + group_dict = { + 'Description:': description, + 'Users:': user_list_join, + 'Indirect Users:': ind_user_list_join, + 'Groups:': grouplist, + } + print(f'Group: {group}') + print('----------------------------------------') + for key, value in group_dict.items(): + if len(value) > 0: + print(f'{key: <16}{value}') + print('') + + final_user_list = {} + for user in starting_user_list: + data = IPAQuery.user_data(api, user) + this_user = IPAQuery.user_data(api, user) + fullname = f"{this_user['givenname'][0]} {this_user['sn'][0]}" + final_user_list[user] = fullname + + if len(starting_user_list) > 0: + print('Full List of Users for this role') + print('----------------------------------------') + for key, value in final_user_list.items(): + if len(value) > 0: + print(f'{key: <24}{value}') + + @staticmethod + def user_deep_list(api, user): + """ + Does a recursive dig on a user + """ + + @staticmethod + def group_deep_list(api, group): + """ + Does a recursive dig on a group + """ + +class IPAQuery: + """ + This is for getting query data + """ + @staticmethod + def user_data(api, user): + """ + Returns all user data + """ + return api.user_show(user)['result'] + + @staticmethod + def group_data(api, group): + """ + Returns all group data + """ + return api.group_show(group)['result'] + + @staticmethod + def hostgroup_data(api, group): + """ + Returns all group data + """ + return api.hostgroup_show(group)['result'] + + @staticmethod + def role_data(api, role): + """ + Returns all role data + """ + return api.role_show(role)['result'] + + @staticmethod + def priv_data(api, priv): + """ + Returns all role data + """ + return api.privilege_show(priv)['result'] + + @staticmethod + def hbac_data(api, hbac): + """ + Returns all hbac data + """ + return api.hbacrule_show(hbac)['result'] + + @staticmethod + def hbacsvcgroup_data(api, hbacsvcgroup): + """ + Returns all hbac service group data + """ + return api.hbacsvcgroup_show(hbacsvcgroup)['result'] + +# start main +def get_api(ipa_library='ipalib'): + """ + Gets and returns the right API entrypoint + """ + # This is unfortunately hacky. + if ipa_library == 'ipalib': + # When root, use the hostkeytab + if os.getuid() == 0: + IPACalls.quick_host_kinit() + api = libapi + try: + api.bootstrap(context="custom") + api.finalize() + api.Backend.rpcclient.connect() + command_api = api.Command + except: + print('WARNING: No kerberos credentials\n') + command_api = None + elif ipa_library == 'python_freeipa': + print() + else: + print('Unsupported ipa library', sys.stderr) + sys.exit(1) + + return command_api + +def main(): + """ + Main function entrypoint + """ + command_api = get_api() + if command == 'audit': + IPAAudit.entry(command_api, results.type, results.name, results.deep) + elif command == 'info': + IPAInfo.basic_ipa_info(command_api) + + # When root, kdestroy the host keytab + if os.getuid() == 0: + IPACalls.quick_host_kdestroy() + +if __name__ == '__main__': + main() diff --git a/mangle/ipa/ipainfo b/mangle/ipa/ipainfo new file mode 100755 index 0000000..3d85e7e --- /dev/null +++ b/mangle/ipa/ipainfo @@ -0,0 +1,26 @@ +#!/bin/bash +# Wrapper for ipaauditor.py info + +source /etc/os-release +case "$ID" in + rocky|centos|rhel) + case "${VERSION_ID:0:1}" in + 5|6|7) + echo "Not supported." + exit 3 + ;; + 8) + PYTHON_EXEC="/usr/libexec/platform-python" + ;; + *) + PYTHON_EXEC="/usr/bin/python3" + ;; + esac ;; + ubuntu|debian) + PYTHON_EXEC="/usr/bin/python3" + ;; + fedora) + PYTHON_EXEC="/usr/bin/python3" +esac + +$PYTHON_EXEC ipaauditor.py info "$@" diff --git a/mangle/ipa/ipainfo-legacy b/mangle/ipa/ipainfo-legacy new file mode 100644 index 0000000..c976257 --- /dev/null +++ b/mangle/ipa/ipainfo-legacy @@ -0,0 +1,14 @@ +#!/bin/bash +# Wrapper for ipaauditor.py info + +__classic_output=" +Note: You are running on an older system that does not support python3 with +FreeIPA, such as RHEL/CentOS 6 or 7. Information provided will be limited. + +Local host name: $(hostname -f) +Joined to domain: $(awk '/domain/ {print $3}' /etc/ipa/default.conf) +Joined as: $(awk '/host/ {print $3}' /etc/ipa/default.conf) +Registered DC: $(awk '/server/ {print $3}' /etc/ipa/default.conf) +" + +echo "$__classic_output" diff --git a/mangle/ipa/ipainfo.py b/mangle/ipa/ipainfo.py index 96810ac..5adc381 100644 --- a/mangle/ipa/ipainfo.py +++ b/mangle/ipa/ipainfo.py @@ -110,15 +110,19 @@ class IPAInfo: return results return ['Unknown'] -etc_ipa_default = EtcIPADefault.read() -domain_status = SssctlInfo.domain_status(etc_ipa_default['ipa_domain']) -current_dc = SssctlInfo.current_dc(etc_ipa_default['ipa_domain']) -current_hostname = etc_ipa_default['local_host_name'] -current_domain = etc_ipa_default['ipa_domain'] -hostgroups = '\n '.join(IPAInfo.get_host_groups(current_hostname)) + @staticmethod + def get_ipa_info(): + """ + Gets the actual info + """ -def main(): - output = f''' + etc_ipa_default = EtcIPADefault.read() + domain_status = SssctlInfo.domain_status(etc_ipa_default['ipa_domain']) + current_dc = SssctlInfo.current_dc(etc_ipa_default['ipa_domain']) + current_hostname = etc_ipa_default['local_host_name'] + current_domain = etc_ipa_default['ipa_domain'] + hostgroups = '\n '.join(IPAInfo.get_host_groups(current_hostname)) + output = f''' Local host name: {etc_ipa_default['local_host_name']} Joined to domain: {etc_ipa_default['ipa_domain']} Joined as: {etc_ipa_default['ipa_joined_name']} @@ -127,7 +131,11 @@ Current DC: {current_dc} Domain Status: {domain_status} Host Group(s): {hostgroups} ''' - print(output) + print(output) + + +def main(): + IPAInfo.get_ipa_info() if __name__ == '__main__': main() diff --git a/mangle/ipa/ipaquery b/mangle/ipa/ipaquery new file mode 100755 index 0000000..cc538c0 --- /dev/null +++ b/mangle/ipa/ipaquery @@ -0,0 +1,25 @@ +#!/bin/bash +# Wrapper for ipaauditor.py query + +case "$ID" in + rocky|centos|rhel) + case "${VERSION_ID:0:1}" in + 5|6|7) + echo "Not supported." + exit 3 + ;; + 8) + PYTHON_EXEC="/usr/libexec/platform-python" + ;; + *) + PYTHON_EXEC="/usr/bin/python3" + ;; + esac ;; + ubuntu|debian) + PYTHON_EXEC="/usr/bin/python3" + ;; + fedora) + PYTHON_EXEC="/usr/bin/python3" +esac + +$PYTHON_EXEC ipaquery.py query "$@"