From 870da403e79ab50350803b45f200e0b272334989 Mon Sep 17 00:00:00 2001 From: Daniel Girtler Date: Fri, 27 May 2022 05:48:29 +1000 Subject: Rework user management (#1220) * Rework users * Update user installation * Fix config serialization * Update * Update schemas and documentation * Update * Fix flake8 * Make users mypy compatible * Fix minor copy Co-authored-by: Daniel Girtler Co-authored-by: Anton Hvornum --- archinstall/__init__.py | 11 +- archinstall/lib/configuration.py | 2 +- archinstall/lib/installer.py | 16 +- archinstall/lib/menu/global_menu.py | 53 +++--- archinstall/lib/menu/list_manager.py | 37 ++--- archinstall/lib/models/users.py | 77 +++++++++ archinstall/lib/output.py | 38 ++++- archinstall/lib/user_interaction/__init__.py | 2 +- .../lib/user_interaction/manage_users_conf.py | 177 +++++++-------------- archinstall/lib/user_interaction/network_conf.py | 2 + .../lib/user_interaction/subvolume_config.py | 2 + 11 files changed, 236 insertions(+), 181 deletions(-) create mode 100644 archinstall/lib/models/users.py (limited to 'archinstall') diff --git a/archinstall/__init__.py b/archinstall/__init__.py index fcb741e6..1e72a60e 100644 --- a/archinstall/__init__.py +++ b/archinstall/__init__.py @@ -1,7 +1,4 @@ """Arch Linux installer - guided, templates etc.""" -import urllib.error -import urllib.parse -import urllib.request from argparse import ArgumentParser from .lib.disk import * @@ -13,6 +10,7 @@ from .lib.locale_helpers import * from .lib.luks import * from .lib.mirrors import * from .lib.models.network_configuration import NetworkConfigurationHandler +from .lib.models.users import User from .lib.networking import * from .lib.output import * from .lib.models.dataclasses import ( @@ -160,7 +158,7 @@ def get_arguments() -> Dict[str, Any]: if args.creds is not None: if not json_stream_to_structure('--creds', args.creds, config): exit(1) - + # load the parameters. first the known, then the unknowns config.update(vars(args)) config.update(parse_unspecified_argument_list(unknowns)) @@ -211,6 +209,11 @@ def load_config(): handler = NetworkConfigurationHandler() handler.parse_arguments(arguments.get('nic')) arguments['nic'] = handler.configuration + if arguments.get('!users', None) is not None or arguments.get('!superusers', None) is not None: + users = arguments.get('!users', None) + superusers = arguments.get('!superusers', None) + arguments['!users'] = User.parse_arguments(users, superusers) + def post_process_arguments(arguments): storage['arguments'] = arguments diff --git a/archinstall/lib/configuration.py b/archinstall/lib/configuration.py index f3fe1e1c..510f7103 100644 --- a/archinstall/lib/configuration.py +++ b/archinstall/lib/configuration.py @@ -37,7 +37,7 @@ class ConfigurationOutput: self._user_creds_file = "user_credentials.json" self._disk_layout_file = "user_disk_layout.json" - self._sensitive = ['!users', '!superusers', '!encryption-password'] + self._sensitive = ['!users', '!encryption-password'] self._ignore = ['abort', 'install', 'config', 'creds', 'dry_run'] self._process_config() diff --git a/archinstall/lib/installer.py b/archinstall/lib/installer.py index 5cedc9d6..903d33af 100644 --- a/archinstall/lib/installer.py +++ b/archinstall/lib/installer.py @@ -23,6 +23,7 @@ from .profiles import Profile from .disk.partition import get_mount_fs_type from .exceptions import DiskError, ServiceException, RequirementError, HardwareIncompatibilityError, SysCallError from .hsm import fido2_enroll +from .models.users import User if TYPE_CHECKING: _: Any @@ -251,7 +252,7 @@ class Installer: loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['mountpoint']).name}loop" else: loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['device_instance'].path).name}" - + # note that we DON'T auto_unmount (i.e. close the encrypted device so it can be used with (luks_handle := luks2(partition['device_instance'], loopdev, password, auto_unmount=False)) as unlocked_device: if partition.get('generate-encryption-key-file',False) and not self._has_root(partition): @@ -426,7 +427,7 @@ class Installer: if storage['arguments'].get('silent', False) is False: if input('Would you like to re-try this download? (Y/n): ').lower().strip() in ('', 'y'): return self.pacstrap(*packages, **kwargs) - + raise RequirementError("Pacstrap failed. See /var/log/archinstall/install.log or above message for error details.") def set_mirrors(self, mirrors :Mapping[str, Iterator[str]]) -> None: @@ -1062,7 +1063,7 @@ class Installer: self.log(f'Installing archinstall profile {profile}', level=logging.INFO) return profile.install() - def enable_sudo(self, entity: str, group :bool = False) -> bool: + def enable_sudo(self, entity: str, group :bool = False): self.log(f'Enabling sudo permissions for {entity}.', level=logging.INFO) sudoers_dir = f"{self.target}/etc/sudoers.d" @@ -1092,9 +1093,14 @@ class Installer: # Guarantees sudoer conf file recommended perms os.chmod(pathlib.Path(rule_file_name), 0o440) - return True + def create_users(self, users: Union[User, List[User]]): + if not isinstance(users, list): + users = [users] + + for user in users: + self.user_create(user.username, user.password, user.groups, user.sudo) - def user_create(self, user :str, password :Optional[str] = None, groups :Optional[str] = None, sudo :bool = False) -> None: + def user_create(self, user :str, password :Optional[str] = None, groups :Optional[List[str]] = None, sudo :bool = False) -> None: if groups is None: groups = [] diff --git a/archinstall/lib/menu/global_menu.py b/archinstall/lib/menu/global_menu.py index 53e0941d..5cb27cab 100644 --- a/archinstall/lib/menu/global_menu.py +++ b/archinstall/lib/menu/global_menu.py @@ -21,7 +21,6 @@ from ..user_interaction import ask_hostname from ..user_interaction import ask_for_audio_selection from ..user_interaction import ask_additional_packages_to_install from ..user_interaction import ask_to_configure_network -from ..user_interaction import ask_for_superuser_account from ..user_interaction import ask_for_additional_users from ..user_interaction import select_language from ..user_interaction import select_mirror_regions @@ -33,7 +32,9 @@ from ..user_interaction import select_encrypted_partitions from ..user_interaction import select_harddrives from ..user_interaction import select_profile from ..user_interaction import select_additional_repositories +from ..models.users import User from ..user_interaction.partitioning_conf import current_partition_layout +from ..output import FormattedOutput if TYPE_CHECKING: _: Any @@ -122,21 +123,13 @@ class GlobalMenu(GeneralMenu): _('Root password'), lambda preset:self._set_root_password(), display_func=lambda x: secret(x) if x else 'None') - self._menu_options['!superusers'] = \ - Selector( - _('Superuser account'), - lambda preset: self._create_superuser_account(), - default={}, - exec_func=lambda n,v:self._users_resynch(), - dependencies_not=['!root-password'], - display_func=lambda x: self._display_superusers()) self._menu_options['!users'] = \ Selector( _('User account'), - lambda x: self._create_user_account(), + lambda x: self._create_user_account(x), default={}, - exec_func=lambda n,v:self._users_resynch(), - display_func=lambda x: list(x.keys()) if x else '[]') + display_func=lambda x: f'{len(x)} {_("User(s)")}' if len(x) > 0 else None, + preview_func=self._prev_users) self._menu_options['profile'] = \ Selector( _('Profile'), @@ -273,17 +266,28 @@ class GlobalMenu(GeneralMenu): return text[:-1] # remove last new line return None + def _prev_users(self) -> Optional[str]: + selector = self._menu_options['!users'] + if selector.has_selection(): + users: List[User] = selector.current_selection + return FormattedOutput.as_table(users) + return None + def _missing_configs(self) -> List[str]: def check(s): return self._menu_options.get(s).has_selection() + def has_superuser() -> bool: + users = self._menu_options['!users'].current_selection + return any([u.sudo for u in users]) + missing = [] if not check('bootloader'): missing += ['Bootloader'] if not check('hostname'): missing += ['Hostname'] - if not check('!root-password') and not check('!superusers'): - missing += [str(_('Either root-password or at least 1 superuser must be specified'))] + if not check('!root-password') and not has_superuser(): + missing += [str(_('Either root-password or at least 1 user with sudo privileges must be specified'))] if not check('harddrives'): missing += ['Hard drives'] if check('harddrives'): @@ -380,23 +384,6 @@ class GlobalMenu(GeneralMenu): return ret - def _create_superuser_account(self) -> Optional[Dict[str, Dict[str, str]]]: - superusers = ask_for_superuser_account(str(_('Manage superuser accounts: '))) - return superusers if superusers else None - - def _create_user_account(self) -> Dict[str, Dict[str, str | None]]: - users = ask_for_additional_users(str(_('Manage ordinary user accounts: '))) + def _create_user_account(self, defined_users: List[User]) -> List[User]: + users = ask_for_additional_users(defined_users=defined_users) return users - - def _display_superusers(self): - superusers = self._data_store.get('!superusers', {}) - - if self._menu_options.get('!root-password').has_selection(): - return list(superusers.keys()) if superusers else '[]' - else: - return list(superusers.keys()) if superusers else '' - - def _users_resynch(self) -> bool: - self.synch('!superusers') - self.synch('!users') - return False diff --git a/archinstall/lib/menu/list_manager.py b/archinstall/lib/menu/list_manager.py index 7db3b3a9..cb567093 100644 --- a/archinstall/lib/menu/list_manager.py +++ b/archinstall/lib/menu/list_manager.py @@ -84,13 +84,13 @@ The contents in the base class of this methods serve for a very basic usage, and ``` """ - -from .text_input import TextInput -from .menu import Menu, MenuSelectionType +import copy from os import system -from copy import copy from typing import Union, Any, TYPE_CHECKING, Dict, Optional +from .text_input import TextInput +from .menu import Menu + if TYPE_CHECKING: _: Any @@ -144,14 +144,14 @@ class ListManager: self.bottom_list = [self.confirm_action,self.cancel_action] self.bottom_item = [self.cancel_action] self.base_actions = base_actions if base_actions else [str(_('Add')),str(_('Copy')),str(_('Edit')),str(_('Delete'))] - self.base_data = base_list - self._data = copy(base_list) # as refs, changes are immediate + self._original_data = copy.deepcopy(base_list) + self._data = copy.deepcopy(base_list) # as refs, changes are immediate # default values for the null case self.target: Optional[Any] = None self.action = self._null_action if len(self._data) == 0 and self._null_action: - self.exec_action(self._data) + self._data = self.exec_action(self._data) def run(self): while True: @@ -175,12 +175,10 @@ class ListManager: clear_screen=False, clear_menu_on_exit=False, header=self.header, - skip_empty_entries=True + skip_empty_entries=True, + skip=False ).run() - if target.type_ == MenuSelectionType.Esc: - return self.run() - if not target.value or target.value in self.bottom_list: self.action = target break @@ -188,21 +186,23 @@ class ListManager: if target.value and target.value in self._default_action: self.action = target.value self.target = None - self.exec_action(self._data) + self._data = self.exec_action(self._data) continue if isinstance(self._data,dict): data_key = data_formatted[target.value] key = self._data[data_key] self.target = {data_key: key} + elif isinstance(self._data, list): + self.target = [d for d in self._data if d == data_formatted[target.value]][0] else: self.target = self._data[data_formatted[target.value]] # Possible enhacement. If run_actions returns false a message line indicating the failure self.run_actions(target.value) - if not target or target == self.cancel_action: # TODO dubious - return self.base_data # return the original list + if target.value == self.cancel_action: # TODO dubious + return self._original_data # return the original list else: return self._data @@ -221,10 +221,9 @@ class ListManager: self.action = choice.value - if not self.action or self.action == self.cancel_action: - return False - else: - return self.exec_action(self._data) + if self.action and self.action != self.cancel_action: + self._data = self.exec_action(self._data) + """ The following methods are expected to be overwritten by the user if the needs of the list are beyond the simple case """ @@ -293,3 +292,5 @@ class ListManager: self._data[origkey] = value elif self.action == str(_('Delete')): del self._data[origkey] + + return self._data diff --git a/archinstall/lib/models/users.py b/archinstall/lib/models/users.py new file mode 100644 index 00000000..6052b73a --- /dev/null +++ b/archinstall/lib/models/users.py @@ -0,0 +1,77 @@ +from dataclasses import dataclass +from typing import Dict, List, Union, Any, TYPE_CHECKING + +if TYPE_CHECKING: + _: Any + + +@dataclass +class User: + username: str + password: str + sudo: bool + + @property + def groups(self) -> List[str]: + # this property should be transferred into a class attr instead + # if it's every going to be used + return [] + + def json(self) -> Dict[str, Any]: + return { + 'username': self.username, + '!password': self.password, + 'sudo': self.sudo + } + + def display(self) -> str: + password = '*' * len(self.password) + return f'{_("Username")}: {self.username:16} {_("Password")}: {password:16} sudo: {str(self.sudo)}' + + @classmethod + def _parse(cls, config_users: List[Dict[str, Any]]) -> List['User']: + users = [] + + for entry in config_users: + username = entry.get('username', None) + password = entry.get('!password', '') + sudo = entry.get('sudo', False) + + if username is None: + continue + + user = User(username, password, sudo) + users.append(user) + + return users + + @classmethod + def _parse_backwards_compatible(cls, config_users: Dict, sudo: bool) -> List['User']: + if len(config_users.keys()) > 0: + username = list(config_users.keys())[0] + password = config_users[username]['!password'] + + if password: + return [User(username, password, sudo)] + + return [] + + @classmethod + def parse_arguments( + cls, + config_users: Union[List[Dict[str, str]], Dict[str, str]], + config_superusers: Union[List[Dict[str, str]], Dict[str, str]] + ) -> List['User']: + users = [] + + # backwards compability + if isinstance(config_users, dict): + users += cls._parse_backwards_compatible(config_users, False) + else: + users += cls._parse(config_users) + + # backwards compability + if isinstance(config_superusers, dict): + users += cls._parse_backwards_compatible(config_superusers, True) + + return users diff --git a/archinstall/lib/output.py b/archinstall/lib/output.py index 07747091..29b73bc4 100644 --- a/archinstall/lib/output.py +++ b/archinstall/lib/output.py @@ -2,11 +2,47 @@ import logging import os import sys from pathlib import Path -from typing import Dict, Union +from typing import Dict, Union, List, Any from .storage import storage +class FormattedOutput: + + @classmethod + def values(cls, o: Any) -> Dict[str, Any]: + if hasattr(o, 'json'): + return o.json() + else: + return o.__dict__ + + @classmethod + def as_table(cls, obj: List[Any]) -> str: + column_width: Dict[str, int] = {} + for o in obj: + for k, v in cls.values(o).items(): + column_width.setdefault(k, 0) + column_width[k] = max([column_width[k], len(str(v)), len(k)]) + + output = '' + for key, width in column_width.items(): + key = key.replace('!', '') + output += key.ljust(width) + ' | ' + + output = output[:-3] + '\n' + output += '-' * len(output) + '\n' + + for o in obj: + for k, v in cls.values(o).items(): + if '!' in k: + v = '*' * len(str(v)) + output += str(v).ljust(column_width[k]) + ' | ' + output = output[:-3] + output += '\n' + + return output + + class Journald: @staticmethod def log(message :str, level :int = logging.DEBUG) -> None: diff --git a/archinstall/lib/user_interaction/__init__.py b/archinstall/lib/user_interaction/__init__.py index b0174d94..8aba4b4d 100644 --- a/archinstall/lib/user_interaction/__init__.py +++ b/archinstall/lib/user_interaction/__init__.py @@ -1,5 +1,5 @@ from .save_conf import save_config -from .manage_users_conf import ask_for_superuser_account, ask_for_additional_users +from .manage_users_conf import ask_for_additional_users from .backwards_compatible_conf import generic_select, generic_multi_select from .locale_conf import select_locale_lang, select_locale_enc from .system_conf import select_kernel, select_harddrives, select_driver, ask_for_bootloader, ask_for_swap diff --git a/archinstall/lib/user_interaction/manage_users_conf.py b/archinstall/lib/user_interaction/manage_users_conf.py index ea909f35..567a2964 100644 --- a/archinstall/lib/user_interaction/manage_users_conf.py +++ b/archinstall/lib/user_interaction/manage_users_conf.py @@ -1,14 +1,12 @@ from __future__ import annotations -import logging import re -from typing import Any, Dict, TYPE_CHECKING, List +from typing import Any, Dict, TYPE_CHECKING, List, Optional +from .utils import get_password from ..menu import Menu from ..menu.list_manager import ListManager -from ..output import log -from ..storage import storage -from .utils import get_password +from ..models.users import User if TYPE_CHECKING: _: Any @@ -19,7 +17,7 @@ class UserList(ListManager): subclass of ListManager for the managing of user accounts """ - def __init__(self, prompt: str, lusers: dict, sudo: bool = None): + def __init__(self, prompt: str, lusers: List[User]): """ param: prompt type: str @@ -27,140 +25,83 @@ class UserList(ListManager): type: Dict param: sudo. boolean to determine if we handle superusers or users. If None handles both types """ - self.sudo = sudo - self.actions = [ + self._actions = [ str(_('Add a user')), str(_('Change password')), str(_('Promote/Demote user')), str(_('Delete User')) ] - super().__init__(prompt, lusers, self.actions, self.actions[0]) - - def reformat(self, data: List) -> Dict: - def format_element(elem :str): - # secret gives away the length of the password - if data[elem].get('!password'): - pwd = '*' * 16 - else: - pwd = '' - if data[elem].get('sudoer'): - super_user = 'Superuser' - else: - super_user = ' ' - return f"{elem:16}: password {pwd:16} {super_user}" + super().__init__(prompt, lusers, self._actions, self._actions[0]) - return {format_element(e): e for e in data} + def reformat(self, data: List[User]) -> Dict[str, User]: + return {e.display(): e for e in data} def action_list(self): - if self.target: - active_user = list(self.target.keys())[0] - else: - active_user = None - sudoer = self.target[active_user].get('sudoer', False) - if self.sudo is None: - return self.actions - if self.sudo and sudoer: - return self.actions - elif self.sudo and not sudoer: - return [self.actions[2]] - elif not self.sudo and sudoer: - return [self.actions[2]] + active_user = self.target if self.target else None + + if active_user is None: + return [self._actions[0]] else: - return self.actions + return self._actions[1:] - def exec_action(self, data: Any): + def exec_action(self, data: List[User]) -> List[User]: if self.target: - active_user = list(self.target.keys())[0] + active_user = self.target else: active_user = None - if self.action == self.actions[0]: # add - new_user = self.add_user() - # no unicity check, if exists will be replaced - data.update(new_user) - elif self.action == self.actions[1]: # change password - data[active_user]['!password'] = get_password( - prompt=str(_('Password for user "{}": ').format(active_user))) - elif self.action == self.actions[2]: # promote/demote - data[active_user]['sudoer'] = not data[active_user]['sudoer'] - elif self.action == self.actions[3]: # delete - del data[active_user] + if self.action == self._actions[0]: # add + new_user = self._add_user() + if new_user is not None: + # in case a user with the same username as an existing user + # was created we'll replace the existing one + data = [d for d in data if d.username != new_user.username] + data += [new_user] + elif self.action == self._actions[1]: # change password + prompt = str(_('Password for user "{}": ').format(active_user.username)) + new_password = get_password(prompt=prompt) + if new_password: + user = next(filter(lambda x: x == active_user, data), 1) + user.password = new_password + elif self.action == self._actions[2]: # promote/demote + user = next(filter(lambda x: x == active_user, data), 1) + user.sudo = False if user.sudo else True + elif self.action == self._actions[3]: # delete + data = [d for d in data if d != active_user] + + return data def _check_for_correct_username(self, username: str) -> bool: if re.match(r'^[a-z_][a-z0-9_-]*\$?$', username) and len(username) <= 32: return True - log("The username you entered is invalid. Try again", level=logging.WARNING, fg='red') return False - def add_user(self): + def _add_user(self) -> Optional[User]: print(_('\nDefine a new user\n')) - prompt = str(_("User Name : ")) + prompt = str(_('Enter username (leave blank to skip): ')) + while True: - userid = input(prompt).strip(' ') - if not userid: - return {} # end - if not self._check_for_correct_username(userid): - pass + username = input(prompt).strip(' ') + if not username: + return None + if not self._check_for_correct_username(username): + prompt = str(_("The username you entered is invalid. Try again")) + '\n' + prompt else: break - if self.sudo: - sudoer = True - elif self.sudo is not None and not self.sudo: - sudoer = False - else: - sudoer = False - sudo_choice = Menu(str(_('Should {} be a superuser (sudoer)?')).format(userid), Menu.yes_no(), - skip=False, - preset_values=Menu.yes() if sudoer else Menu.no(), - default_option=Menu.no()).run() - sudoer = True if sudo_choice == Menu.yes() else False - - password = get_password(prompt=str(_('Password for user "{}": ').format(userid))) - - return {userid: {"!password": password, "sudoer": sudoer}} - - -def manage_users(prompt: str, sudo: bool) -> tuple[dict, dict]: - # TODO Filtering and some kind of simpler code - lusers = {} - if storage['arguments'].get('!superusers', {}): - lusers.update({ - uid: { - '!password': storage['arguments']['!superusers'][uid].get('!password'), - 'sudoer': True - } - for uid in storage['arguments'].get('!superusers', {}) - }) - if storage['arguments'].get('!users', {}): - lusers.update({ - uid: { - '!password': storage['arguments']['!users'][uid].get('!password'), - 'sudoer': False - } - for uid in storage['arguments'].get('!users', {}) - }) - # processing - lusers = UserList(prompt, lusers, sudo).run() - # return data - superusers = { - uid: { - '!password': lusers[uid].get('!password') - } - for uid in lusers if lusers[uid].get('sudoer', False) - } - users = {uid: {'!password': lusers[uid].get('!password')} for uid in lusers if not lusers[uid].get('sudoer', False)} - storage['arguments']['!superusers'] = superusers - storage['arguments']['!users'] = users - return superusers, users - - -def ask_for_superuser_account(prompt: str) -> Dict[str, Dict[str, str]]: - prompt = prompt if prompt else str(_('Define users with sudo privilege, by username: ')) - superusers, dummy = manage_users(prompt, sudo=True) - return superusers - - -def ask_for_additional_users(prompt: str = '') -> Dict[str, Dict[str, str | None]]: - prompt = prompt if prompt else _('Any additional users to install (leave blank for no users): ') - dummy, users = manage_users(prompt, sudo=False) + + password = get_password(prompt=str(_('Password for user "{}": ').format(username))) + + choice = Menu( + str(_('Should "{}" be a superuser (sudo)?')).format(username), Menu.yes_no(), + skip=False, + default_option=Menu.no() + ).run() + + sudo = True if choice.value == Menu.yes() else False + return User(username, password, sudo) + + +def ask_for_additional_users(prompt: str = '', defined_users: List[User] = []) -> List[User]: + prompt = prompt if prompt else _('Enter username (leave blank to skip): ') + users = UserList(prompt, defined_users).run() return users diff --git a/archinstall/lib/user_interaction/network_conf.py b/archinstall/lib/user_interaction/network_conf.py index 25e9d4c6..5154d8b1 100644 --- a/archinstall/lib/user_interaction/network_conf.py +++ b/archinstall/lib/user_interaction/network_conf.py @@ -64,6 +64,8 @@ class ManualNetworkConfig(ListManager): elif self.action == self._action_delete: del data[iface_name] + return data + def _select_iface(self, existing_ifaces: List[str]) -> Optional[Any]: all_ifaces = list_interfaces().values() available = set(all_ifaces) - set(existing_ifaces) diff --git a/archinstall/lib/user_interaction/subvolume_config.py b/archinstall/lib/user_interaction/subvolume_config.py index 7383b13d..af783639 100644 --- a/archinstall/lib/user_interaction/subvolume_config.py +++ b/archinstall/lib/user_interaction/subvolume_config.py @@ -57,6 +57,8 @@ class SubvolumeList(ListManager): data.update(self.target) + return data + class SubvolumeMenu(GeneralMenu): def __init__(self,parameters,action=None): -- cgit v1.2.3-54-g00ecf