From 3dc0d957e838c34b48a0782d2540341e33b24070 Mon Sep 17 00:00:00 2001 From: Daniel Date: Mon, 28 Mar 2022 22:49:05 +1100 Subject: Deflate user interactions (#1019) * Deflate the user interactions file * Fix flake8 Co-authored-by: Daniel Girtler --- archinstall/lib/user_interaction/__init__.py | 12 + .../user_interaction/backwards_compatible_conf.py | 95 +++++++ archinstall/lib/user_interaction/disk_conf.py | 81 ++++++ archinstall/lib/user_interaction/general_conf.py | 184 +++++++++++++ archinstall/lib/user_interaction/locale_conf.py | 35 +++ .../lib/user_interaction/manage_users_conf.py | 169 ++++++++++++ archinstall/lib/user_interaction/network_conf.py | 111 ++++++++ .../lib/user_interaction/partitioning_conf.py | 295 +++++++++++++++++++++ archinstall/lib/user_interaction/save_conf.py | 74 ++++++ archinstall/lib/user_interaction/system_conf.py | 144 ++++++++++ archinstall/lib/user_interaction/utils.py | 99 +++++++ 11 files changed, 1299 insertions(+) create mode 100644 archinstall/lib/user_interaction/__init__.py create mode 100644 archinstall/lib/user_interaction/backwards_compatible_conf.py create mode 100644 archinstall/lib/user_interaction/disk_conf.py create mode 100644 archinstall/lib/user_interaction/general_conf.py create mode 100644 archinstall/lib/user_interaction/locale_conf.py create mode 100644 archinstall/lib/user_interaction/manage_users_conf.py create mode 100644 archinstall/lib/user_interaction/network_conf.py create mode 100644 archinstall/lib/user_interaction/partitioning_conf.py create mode 100644 archinstall/lib/user_interaction/save_conf.py create mode 100644 archinstall/lib/user_interaction/system_conf.py create mode 100644 archinstall/lib/user_interaction/utils.py (limited to 'archinstall/lib/user_interaction') diff --git a/archinstall/lib/user_interaction/__init__.py b/archinstall/lib/user_interaction/__init__.py new file mode 100644 index 00000000..b0174d94 --- /dev/null +++ b/archinstall/lib/user_interaction/__init__.py @@ -0,0 +1,12 @@ +from .save_conf import save_config +from .manage_users_conf import ask_for_superuser_account, ask_for_additional_users +from .backwards_compatible_conf import generic_select, generic_multi_select +from .locale_conf import select_locale_lang, select_locale_enc +from .system_conf import select_kernel, select_harddrives, select_driver, ask_for_bootloader, ask_for_swap +from .network_conf import ask_to_configure_network +from .partitioning_conf import select_partition, select_encrypted_partitions +from .general_conf import (ask_ntp, ask_for_a_timezone, ask_for_audio_selection, select_language, select_mirror_regions, + select_profile, select_archinstall_language, ask_additional_packages_to_install, + select_additional_repositories, ask_hostname) +from .disk_conf import ask_for_main_filesystem_format, select_individual_blockdevice_usage, select_disk_layout, select_disk +from .utils import get_password, do_countdown diff --git a/archinstall/lib/user_interaction/backwards_compatible_conf.py b/archinstall/lib/user_interaction/backwards_compatible_conf.py new file mode 100644 index 00000000..d91690eb --- /dev/null +++ b/archinstall/lib/user_interaction/backwards_compatible_conf.py @@ -0,0 +1,95 @@ +from __future__ import annotations + +import logging +import sys +from collections.abc import Iterable +from typing import Any, Union, TYPE_CHECKING + +from ..exceptions import RequirementError +from ..menu import Menu +from ..output import log + +if TYPE_CHECKING: + _: Any + + +def generic_select( + p_options: Union[list, dict], + input_text: str = '', + allow_empty_input: bool = True, + options_output: bool = True, # function not available + sort: bool = False, + multi: bool = False, + default: Any = None) -> Any: + """ + A generic select function that does not output anything + other than the options and their indexes. As an example: + + generic_select(["first", "second", "third option"]) + > first + second + third option + When the user has entered the option correctly, + this function returns an item from list, a string, or None + + Options can be any iterable. + Duplicate entries are not checked, but the results with them are unreliable. Which element to choose from the duplicates depends on the return of the index() + Default value if not on the list of options will be added as the first element + sort will be handled by Menu() + """ + # We check that the options are iterable. If not we abort. Else we copy them to lists + # it options is a dictionary we use the values as entries of the list + # if options is a string object, each character becomes an entry + # if options is a list, we implictily build a copy to mantain immutability + if not isinstance(p_options, Iterable): + log(f"Objects of type {type(p_options)} is not iterable, and are not supported at generic_select", fg="red") + log(f"invalid parameter at Menu() call was at <{sys._getframe(1).f_code.co_name}>", level=logging.WARNING) + raise RequirementError("generic_select() requires an iterable as option.") + + input_text = input_text if input_text else _('Select one of the values shown below: ') + + if isinstance(p_options, dict): + options = list(p_options.values()) + else: + options = list(p_options) + # check that the default value is in the list. If not it will become the first entry + if default and default not in options: + options.insert(0, default) + + # one of the drawbacks of the new interface is that in only allows string like options, so we do a conversion + # also for the default value if it exists + soptions = list(map(str, options)) + default_value = options[options.index(default)] if default else None + + selected_option = Menu(input_text, + soptions, + skip=allow_empty_input, + multi=multi, + default_option=default_value, + sort=sort).run() + # we return the original objects, not the strings. + # options is the list with the original objects and soptions the list with the string values + # thru the map, we get from the value selected in soptions it index, and thu it the original object + if not selected_option: + return selected_option + elif isinstance(selected_option, list): # for multi True + selected_option = list(map(lambda x: options[soptions.index(x)], selected_option)) + else: # for multi False + selected_option = options[soptions.index(selected_option)] + return selected_option + + +def generic_multi_select(p_options: Union[list, dict], + text: str = '', + sort: bool = False, + default: Any = None, + allow_empty: bool = False) -> Any: + + text = text if text else _("Select one or more of the options below: ") + + return generic_select(p_options, + input_text=text, + allow_empty_input=allow_empty, + sort=sort, + multi=True, + default=default) diff --git a/archinstall/lib/user_interaction/disk_conf.py b/archinstall/lib/user_interaction/disk_conf.py new file mode 100644 index 00000000..9238a766 --- /dev/null +++ b/archinstall/lib/user_interaction/disk_conf.py @@ -0,0 +1,81 @@ +from __future__ import annotations + +from typing import Any, Dict, TYPE_CHECKING + +from .partitioning_conf import manage_new_and_existing_partitions, get_default_partition_layout +from ..disk import BlockDevice +from ..exceptions import DiskError +from ..menu import Menu +from ..output import log + +if TYPE_CHECKING: + _: Any + + +def ask_for_main_filesystem_format(advanced_options=False): + options = {'btrfs': 'btrfs', 'ext4': 'ext4', 'xfs': 'xfs', 'f2fs': 'f2fs'} + + advanced = {'ntfs': 'ntfs'} + + if advanced_options: + options.update(advanced) + + prompt = _('Select which filesystem your main partition should use') + choice = Menu(prompt, options, skip=False).run() + return choice + + +def select_individual_blockdevice_usage(block_devices: list) -> Dict[str, Any]: + result = {} + + for device in block_devices: + layout = manage_new_and_existing_partitions(device) + + result[device.path] = layout + + return result + + +def select_disk_layout(block_devices: list, advanced_options=False) -> Dict[str, Any]: + wipe_mode = str(_('Wipe all selected drives and use a best-effort default partition layout')) + custome_mode = str(_('Select what to do with each individual drive (followed by partition usage)')) + modes = [wipe_mode, custome_mode] + + print(modes) + mode = Menu(_('Select what you wish to do with the selected block devices'), modes, skip=False).run() + + if mode == wipe_mode: + return get_default_partition_layout(block_devices, advanced_options) + else: + return select_individual_blockdevice_usage(block_devices) + + +def select_disk(dict_o_disks: Dict[str, BlockDevice]) -> BlockDevice: + """ + Asks the user to select a harddrive from the `dict_o_disks` selection. + Usually this is combined with :ref:`archinstall.list_drives`. + + :param dict_o_disks: A `dict` where keys are the drive-name, value should be a dict containing drive information. + :type dict_o_disks: dict + + :return: The name/path (the dictionary key) of the selected drive + :rtype: str + """ + drives = sorted(list(dict_o_disks.keys())) + if len(drives) >= 1: + for index, drive in enumerate(drives): + print( + f"{index}: {drive} ({dict_o_disks[drive]['size'], dict_o_disks[drive].device, dict_o_disks[drive]['label']})" + ) + + log("You can skip selecting a drive and partitioning and use whatever drive-setup is mounted at /mnt (experimental)", + fg="yellow") + + drive = Menu('Select one of the disks or skip and use "/mnt" as default"', drives).run() + if not drive: + return drive + + drive = dict_o_disks[drive] + return drive + + raise DiskError('select_disk() requires a non-empty dictionary of disks to select from.') diff --git a/archinstall/lib/user_interaction/general_conf.py b/archinstall/lib/user_interaction/general_conf.py new file mode 100644 index 00000000..c42e9e27 --- /dev/null +++ b/archinstall/lib/user_interaction/general_conf.py @@ -0,0 +1,184 @@ +from __future__ import annotations + +import logging +from typing import List, Any, Optional, Dict, TYPE_CHECKING + +from ..menu.text_input import TextInput + +from ..locale_helpers import list_keyboard_languages, list_timezones +from ..menu import Menu +from ..output import log +from ..profiles import Profile, list_profiles +from ..mirrors import list_mirrors + +from ..translation import Translation +from ..packages.packages import validate_package_list + +if TYPE_CHECKING: + _: Any + + +def ask_ntp(preset: bool = True) -> bool: + prompt = str(_('Would you like to use automatic time synchronization (NTP) with the default time servers?\n')) + prompt += str(_('Hardware time and other post-configuration steps might be required in order for NTP to work.\nFor more information, please check the Arch wiki')) + if preset: + preset_val = 'yes' + else: + preset_val = 'no' + choice = Menu(prompt, ['yes', 'no'], skip=False, preset_values=preset_val, default_option='yes').run() + return False if choice == 'no' else True + + +def ask_hostname(preset: str = None) -> str: + hostname = TextInput(_('Desired hostname for the installation: '), preset).run().strip(' ') + return hostname + + +def ask_for_a_timezone(preset: str = None) -> str: + timezones = list_timezones() + default = 'UTC' + + selected_tz = Menu(_('Select a timezone'), + list(timezones), + skip=False, + preset_values=preset, + default_option=default).run() + + return selected_tz + + +def ask_for_audio_selection(desktop: bool = True, preset: str = None) -> str: + audio = 'pipewire' if desktop else 'none' + choices = ['pipewire', 'pulseaudio'] if desktop else ['pipewire', 'pulseaudio', 'none'] + selected_audio = Menu(_('Choose an audio server'), choices, preset_values=preset, default_option=audio, skip=False).run() + return selected_audio + + +def select_language(default_value: str, preset_value: str = None) -> str: + """ + Asks the user to select a language + Usually this is combined with :ref:`archinstall.list_keyboard_languages`. + + :return: The language/dictionary key of the selected language + :rtype: str + """ + kb_lang = list_keyboard_languages() + # sort alphabetically and then by length + # it's fine if the list is big because the Menu + # allows for searching anyways + sorted_kb_lang = sorted(sorted(list(kb_lang)), key=len) + + selected_lang = Menu(_('Select Keyboard layout'), + sorted_kb_lang, + default_option=default_value, + preset_values=preset_value, + sort=False).run() + return selected_lang + + +def select_mirror_regions(preset_values: Dict[str, Any] = {}) -> Dict[str, Any]: + """ + Asks the user to select a mirror or region + Usually this is combined with :ref:`archinstall.list_mirrors`. + + :return: The dictionary information about a mirror/region. + :rtype: dict + """ + if preset_values is None: + preselected = None + else: + preselected = list(preset_values.keys()) + mirrors = list_mirrors() + selected_mirror = Menu(_('Select one of the regions to download packages from'), + list(mirrors.keys()), + preset_values=preselected, + multi=True).run() + + if selected_mirror is not None: + return {selected: mirrors[selected] for selected in selected_mirror} + + return {} + + +def select_archinstall_language(default='English'): + languages = Translation.get_all_names() + language = Menu(_('Select Archinstall language'), languages, default_option=default).run() + return language + + +def select_profile() -> Optional[Profile]: + """ + # Asks the user to select a profile from the available profiles. + # + # :return: The name/dictionary key of the selected profile + # :rtype: str + # """ + top_level_profiles = sorted(list(list_profiles(filter_top_level_profiles=True))) + options = {} + + for profile in top_level_profiles: + profile = Profile(None, profile) + description = profile.get_profile_description() + + option = f'{profile.profile}: {description}' + options[option] = profile + + title = _('This is a list of pre-programmed profiles, they might make it easier to install things like desktop environments') + + selection = Menu(title=title, p_options=list(options.keys())).run() + + if selection is not None: + return options[selection] + + return None + + +def ask_additional_packages_to_install(pre_set_packages: List[str] = []) -> List[str]: + # Additional packages (with some light weight error handling for invalid package names) + print(_('Only packages such as base, base-devel, linux, linux-firmware, efibootmgr and optional profile packages are installed.')) + print(_('If you desire a web browser, such as firefox or chromium, you may specify it in the following prompt.')) + + def read_packages(already_defined: list = []) -> list: + display = ' '.join(already_defined) + input_packages = TextInput(_('Write additional packages to install (space separated, leave blank to skip): '), display).run() + return input_packages.split(' ') if input_packages else [] + + pre_set_packages = pre_set_packages if pre_set_packages else [] + packages = read_packages(pre_set_packages) + + while True: + if len(packages): + # Verify packages that were given + print(_("Verifying that additional packages exist (this might take a few seconds)")) + valid, invalid = validate_package_list(packages) + + if invalid: + log(f"Some packages could not be found in the repository: {invalid}", level=logging.WARNING, fg='red') + packages = read_packages(valid) + continue + break + + return packages + + +def select_additional_repositories(preset: List[str]) -> List[str]: + """ + Allows the user to select additional repositories (multilib, and testing) if desired. + + :return: The string as a selected repository + :rtype: string + """ + + repositories = ["multilib", "testing"] + + additional_repositories = Menu(_('Choose which optional additional repositories to enable'), + repositories, + sort=False, + multi=True, + preset_values=preset, + default_option=[]).run() + + if additional_repositories is not None: + return additional_repositories + + return [] diff --git a/archinstall/lib/user_interaction/locale_conf.py b/archinstall/lib/user_interaction/locale_conf.py new file mode 100644 index 00000000..d48018cf --- /dev/null +++ b/archinstall/lib/user_interaction/locale_conf.py @@ -0,0 +1,35 @@ +from __future__ import annotations + +from typing import Any, TYPE_CHECKING + +from ..locale_helpers import list_locales +from ..menu import Menu + +if TYPE_CHECKING: + _: Any + + +def select_locale_lang(default: str, preset: str = None) -> str: + locales = list_locales() + locale_lang = set([locale.split()[0] for locale in locales]) + + selected_locale = Menu(_('Choose which locale language to use'), + locale_lang, + sort=True, + preset_values=preset, + default_option=default).run() + + return selected_locale + + +def select_locale_enc(default: str, preset: str = None) -> str: + locales = list_locales() + locale_enc = set([locale.split()[1] for locale in locales]) + + selected_locale = Menu(_('Choose which locale encoding to use'), + locale_enc, + sort=True, + preset_values=preset, + default_option=default).run() + + return selected_locale diff --git a/archinstall/lib/user_interaction/manage_users_conf.py b/archinstall/lib/user_interaction/manage_users_conf.py new file mode 100644 index 00000000..0af0d776 --- /dev/null +++ b/archinstall/lib/user_interaction/manage_users_conf.py @@ -0,0 +1,169 @@ +from __future__ import annotations + +import logging +import re +from typing import Any, Dict, TYPE_CHECKING + +from ..menu import Menu +from ..menu.list_manager import ListManager +from ..output import log +from ..storage import storage +from .utils import get_password + +if TYPE_CHECKING: + _: Any + + +class UserList(ListManager): + """ + subclass of ListManager for the managing of user accounts + """ + + def __init__(self, prompt: str, lusers: dict, sudo: bool = None): + """ + param: prompt + type: str + param: lusers dict with the users already defined for the system + type: Dict + param: sudo. boolean to determine if we handle superusers or users. If None handles both types + """ + self.sudo = sudo + self.actions = [ + str(_('Add an user')), + str(_('Change password')), + str(_('Promote/Demote user')), + str(_('Delete User')) + ] + self.default_action = self.actions[0] + super().__init__(prompt, lusers, self.actions, self.default_action) + + def reformat(self): + + def format_element(elem): + # secret gives away the length of the password + if self.data[elem].get('!password'): + pwd = '*' * 16 + # pwd = archinstall.secret(self.data[elem]['!password']) + else: + pwd = '' + if self.data[elem].get('sudoer'): + super = 'Superuser' + else: + super = ' ' + return f"{elem:16}: password {pwd:16} {super}" + + return list(map(lambda x: format_element(x), self.data)) + + def action_list(self): + if self.target: + active_user = list(self.target.keys())[0] + else: + active_user = None + sudoer = self.target[active_user].get('sudoer', False) + if self.sudo is None: + return self.actions + if self.sudo and sudoer: + return self.actions + elif self.sudo and not sudoer: + return [self.actions[2]] + elif not self.sudo and sudoer: + return [self.actions[2]] + else: + return self.actions + + def exec_action(self): + if self.target: + active_user = list(self.target.keys())[0] + else: + active_user = None + + if self.action == self.actions[0]: # add + new_user = self.add_user() + # no unicity check, if exists will be replaced + self.data.update(new_user) + elif self.action == self.actions[1]: # change password + self.data[active_user]['!password'] = get_password( + prompt=str(_('Password for user "{}": ').format(active_user))) + elif self.action == self.actions[2]: # promote/demote + self.data[active_user]['sudoer'] = not self.data[active_user]['sudoer'] + elif self.action == self.actions[3]: # delete + del self.data[active_user] + + def _check_for_correct_username(username: str) -> bool: + if re.match(r'^[a-z_][a-z0-9_-]*\$?$', username) and len(username) <= 32: + return True + log("The username you entered is invalid. Try again", level=logging.WARNING, fg='red') + return False + + def add_user(self): + print(_('\nDefine a new user\n')) + prompt = str(_("User Name : ")) + while True: + userid = input(prompt).strip(' ') + if not userid: + return {} # end + if not self._check_for_correct_username(userid): + pass + else: + break + if self.sudo: + sudoer = True + elif self.sudo is not None and not self.sudo: + sudoer = False + else: + sudoer = False + sudo_choice = Menu(str(_('Should {} be a superuser (sudoer)?')).format(userid), ['yes', 'no'], + skip=False, + preset_values='yes' if sudoer else 'no', + default_option='no').run() + sudoer = True if sudo_choice == 'yes' else False + + password = get_password(prompt=str(_('Password for user "{}": ').format(userid))) + + return {userid: {"!password": password, "sudoer": sudoer}} + + +def manage_users(prompt: str, sudo: bool) -> tuple[dict, dict]: + # TODO Filtering and some kind of simpler code + lusers = {} + if storage['arguments'].get('!superusers', {}): + lusers.update({ + uid: { + '!password': storage['arguments']['!superusers'][uid].get('!password'), + 'sudoer': True + } + for uid in storage['arguments'].get('!superusers', {}) + }) + if storage['arguments'].get('!users', {}): + lusers.update({ + uid: { + '!password': storage['arguments']['!users'][uid].get('!password'), + 'sudoer': False + } + for uid in storage['arguments'].get('!users', {}) + }) + # processing + lusers = UserList(prompt, lusers, sudo).run() + # return data + superusers = { + uid: { + '!password': lusers[uid].get('!password') + } + for uid in lusers if lusers[uid].get('sudoer', False) + } + users = {uid: {'!password': lusers[uid].get('!password')} for uid in lusers if not lusers[uid].get('sudoer', False)} + storage['arguments']['!superusers'] = superusers + storage['arguments']['!users'] = users + return superusers, users + + +def ask_for_superuser_account(prompt: str) -> Dict[str, Dict[str, str]]: + prompt = prompt if prompt else str(_('Define users with sudo privilege: ')) + superusers, dummy = manage_users(prompt, sudo=True) + return superusers + + +def ask_for_additional_users(prompt: str = '') -> Dict[str, Dict[str, str | None]]: + prompt = prompt if prompt else _('Any additional users to install (leave blank for no users): ') + dummy, users = manage_users(prompt, sudo=False) + return users diff --git a/archinstall/lib/user_interaction/network_conf.py b/archinstall/lib/user_interaction/network_conf.py new file mode 100644 index 00000000..f90a2af8 --- /dev/null +++ b/archinstall/lib/user_interaction/network_conf.py @@ -0,0 +1,111 @@ +from __future__ import annotations + +import ipaddress +import logging +from copy import copy +from typing import Any, Optional, Dict, TYPE_CHECKING + +from ..menu.text_input import TextInput +from ..models.network_configuration import NetworkConfiguration, NicType + +from ..networking import list_interfaces +from ..menu import Menu +from ..output import log + +if TYPE_CHECKING: + _: Any + + +def ask_to_configure_network(preset: Dict[str, Any] = {}) -> Optional[NetworkConfiguration]: + """ + Configure the network on the newly installed system + """ + interfaces = { + 'none': str(_('No network configuration')), + 'iso_config': str(_('Copy ISO network configuration to installation')), + 'network_manager': str(_('Use NetworkManager (necessary to configure internet graphically in GNOME and KDE)')), + **list_interfaces() + } + # for this routine it's easier to set the cursor position rather than a preset value + cursor_idx = None + if preset: + if preset['type'] == 'iso_config': + cursor_idx = 0 + elif preset['type'] == 'network_manager': + cursor_idx = 1 + else: + try: + # let's hope order in dictionaries stay + cursor_idx = list(interfaces.values()).index(preset.get('type')) + except ValueError: + pass + + nic = Menu(_('Select one network interface to configure'), interfaces.values(), cursor_index=cursor_idx, + sort=False).run() + + if not nic: + return None + + if nic == interfaces['none']: + return None + elif nic == interfaces['iso_config']: + return NetworkConfiguration(NicType.ISO) + elif nic == interfaces['network_manager']: + return NetworkConfiguration(NicType.NM) + else: + # Current workaround: + # For selecting modes without entering text within brackets, + # printing out this part separate from options, passed in + # `generic_select` + # we only keep data if it is the same nic as before + if preset.get('type') != nic: + preset_d = {'type': nic, 'dhcp': True, 'ip': None, 'gateway': None, 'dns': []} + else: + preset_d = copy(preset) + + modes = ['DHCP (auto detect)', 'IP (static)'] + default_mode = 'DHCP (auto detect)' + cursor_idx = 0 if preset_d.get('dhcp', True) else 1 + + prompt = _('Select which mode to configure for "{}" or skip to use default mode "{}"').format(nic, default_mode) + mode = Menu(prompt, modes, default_option=default_mode, cursor_index=cursor_idx).run() + # TODO preset values for ip and gateway + if mode == 'IP (static)': + while 1: + prompt = _('Enter the IP and subnet for {} (example: 192.168.0.5/24): ').format(nic) + ip = TextInput(prompt, preset_d.get('ip')).run().strip() + # Implemented new check for correct IP/subnet input + try: + ipaddress.ip_interface(ip) + break + except ValueError: + log("You need to enter a valid IP in IP-config mode.", level=logging.WARNING, fg='red') + + # Implemented new check for correct gateway IP address + while 1: + gateway = TextInput(_('Enter your gateway (router) IP address or leave blank for none: '), + preset_d.get('gateway')).run().strip() + try: + if len(gateway) == 0: + gateway = None + else: + ipaddress.ip_address(gateway) + break + except ValueError: + log("You need to enter a valid gateway (router) IP address.", level=logging.WARNING, fg='red') + + dns = None + if preset_d.get('dns'): + preset_d['dns'] = ' '.join(preset_d['dns']) + else: + preset_d['dns'] = None + dns_input = TextInput(_('Enter your DNS servers (space separated, blank for none): '), + preset_d['dns']).run().strip() + + if len(dns_input): + dns = dns_input.split(' ') + + return NetworkConfiguration(NicType.MANUAL, iface=nic, ip=ip, gateway=gateway, dns=dns, dhcp=False) + else: + # this will contain network iface names + return NetworkConfiguration(NicType.MANUAL, iface=nic) diff --git a/archinstall/lib/user_interaction/partitioning_conf.py b/archinstall/lib/user_interaction/partitioning_conf.py new file mode 100644 index 00000000..8c5d1375 --- /dev/null +++ b/archinstall/lib/user_interaction/partitioning_conf.py @@ -0,0 +1,295 @@ +from __future__ import annotations + +from typing import List, Any, Dict, Union, TYPE_CHECKING + +from ..disk import BlockDevice, suggest_single_disk_layout, suggest_multi_disk_layout, valid_parted_position +from ..menu import Menu +from ..output import log + +from ..disk.validators import fs_types + +if TYPE_CHECKING: + from ..disk.partition import Partition + _: Any + + +def partition_overlap(partitions: list, start: str, end: str) -> bool: + # TODO: Implement sanity check + return False + + +def _current_partition_layout(partitions: List[Partition], with_idx: bool = False) -> str: + + def do_padding(name, max_len): + spaces = abs(len(str(name)) - max_len) + 2 + pad_left = int(spaces / 2) + pad_right = spaces - pad_left + return f'{pad_right * " "}{name}{pad_left * " "}|' + + column_names = {} + + # this will add an initial index to the table for each partition + if with_idx: + column_names['index'] = max([len(str(len(partitions))), len('index')]) + + # determine all attribute names and the max length + # of the value among all partitions to know the width + # of the table cells + for p in partitions: + for attribute, value in p.items(): + if attribute in column_names.keys(): + column_names[attribute] = max([column_names[attribute], len(str(value)), len(attribute)]) + else: + column_names[attribute] = max([len(str(value)), len(attribute)]) + + current_layout = '' + for name, max_len in column_names.items(): + current_layout += do_padding(name, max_len) + + current_layout = f'{current_layout[:-1]}\n{"-" * len(current_layout)}\n' + + for idx, p in enumerate(partitions): + row = '' + for name, max_len in column_names.items(): + if name == 'index': + row += do_padding(str(idx), max_len) + elif name in p: + row += do_padding(p[name], max_len) + else: + row += ' ' * (max_len + 2) + '|' + + current_layout += f'{row[:-1]}\n' + + title = str(_('Current partition layout')) + return f'\n\n{title}:\n\n{current_layout}' + + +def select_partition(title: str, partitions: List[Partition], multiple: bool = False) -> Union[int, List[int], None]: + partition_indexes = list(map(str, range(len(partitions)))) + partition = Menu(title, partition_indexes, multi=multiple).run() + + if partition is not None: + if isinstance(partition, list): + return [int(p) for p in partition] + else: + return int(partition) + + return None + + +def get_default_partition_layout(block_devices: Union[BlockDevice, List[BlockDevice]], + advanced_options: bool = False) -> Dict[str, Any]: + + if len(block_devices) == 1: + return suggest_single_disk_layout(block_devices[0], advanced_options=advanced_options) + else: + return suggest_multi_disk_layout(block_devices, advanced_options=advanced_options) + + +def select_individual_blockdevice_usage(block_devices: list) -> Dict[str, Any]: + result = {} + + for device in block_devices: + layout = manage_new_and_existing_partitions(device) + + result[device.path] = layout + + return result + + +def manage_new_and_existing_partitions(block_device: BlockDevice) -> Dict[str, Any]: + block_device_struct = {"partitions": [partition.__dump__() for partition in block_device.partitions.values()]} + # Test code: [part.__dump__() for part in block_device.partitions.values()] + # TODO: Squeeze in BTRFS subvolumes here + + new_partition = str(_('Create a new partition')) + suggest_partition_layout = str(_('Suggest partition layout')) + delete_partition = str(_('Delete a partition')) + delete_all_partitions = str(_('Clear/Delete all partitions')) + assign_mount_point = str(_('Assign mount-point for a partition')) + mark_formatted = str(_('Mark/Unmark a partition to be formatted (wipes data)')) + mark_encrypted = str(_('Mark/Unmark a partition as encrypted')) + mark_bootable = str(_('Mark/Unmark a partition as bootable (automatic for /boot)')) + set_filesystem_partition = str(_('Set desired filesystem for a partition')) + + while True: + modes = [new_partition, suggest_partition_layout] + + if len(block_device_struct['partitions']): + modes += [ + delete_partition, + delete_all_partitions, + assign_mount_point, + mark_formatted, + mark_encrypted, + mark_bootable, + set_filesystem_partition, + ] + + title = _('Select what to do with\n{}').format(block_device) + + # show current partition layout: + if len(block_device_struct["partitions"]): + title += _current_partition_layout(block_device_struct['partitions']) + '\n' + + task = Menu(title, modes, sort=False).run() + + if not task: + break + + if task == new_partition: + # if partition_type == 'gpt': + # # https://www.gnu.org/software/parted/manual/html_node/mkpart.html + # # https://www.gnu.org/software/parted/manual/html_node/mklabel.html + # name = input("Enter a desired name for the partition: ").strip() + + fstype = Menu(_('Enter a desired filesystem type for the partition'), fs_types(), skip=False).run() + + prompt = _('Enter the start sector (percentage or block number, default: {}): ').format( + block_device.first_free_sector) + start = input(prompt).strip() + + if not start.strip(): + start = block_device.first_free_sector + end_suggested = block_device.first_end_sector + else: + end_suggested = '100%' + + prompt = _('Enter the end sector of the partition (percentage or block number, ex: {}): ').format( + end_suggested) + end = input(prompt).strip() + + if not end.strip(): + end = end_suggested + + if valid_parted_position(start) and valid_parted_position(end): + if partition_overlap(block_device_struct["partitions"], start, end): + log(f"This partition overlaps with other partitions on the drive! Ignoring this partition creation.", + fg="red") + continue + + block_device_struct["partitions"].append({ + "type": "primary", # Strictly only allowed under MSDOS, but GPT accepts it so it's "safe" to inject + "start": start, + "size": end, + "mountpoint": None, + "wipe": True, + "filesystem": { + "format": fstype + } + }) + else: + log(f"Invalid start ({valid_parted_position(start)}) or end ({valid_parted_position(end)}) for this partition. Ignoring this partition creation.", + fg="red") + continue + elif task == suggest_partition_layout: + if len(block_device_struct["partitions"]): + prompt = _('{} contains queued partitions, this will remove those, are you sure?').format(block_device) + choice = Menu(prompt, ['yes', 'no'], default_option='no').run() + + if choice == 'no': + continue + + block_device_struct.update(suggest_single_disk_layout(block_device)[block_device.path]) + elif task is None: + return block_device_struct + else: + current_layout = _current_partition_layout(block_device_struct['partitions'], with_idx=True) + + if task == delete_partition: + title = _('{}\n\nSelect by index which partitions to delete').format(current_layout) + to_delete = select_partition(title, block_device_struct["partitions"], multiple=True) + + if to_delete: + block_device_struct['partitions'] = [ + p for idx, p in enumerate(block_device_struct['partitions']) if idx not in to_delete + ] + elif task == delete_all_partitions: + block_device_struct["partitions"] = [] + elif task == assign_mount_point: + title = _('{}\n\nSelect by index which partition to mount where').format(current_layout) + partition = select_partition(title, block_device_struct["partitions"]) + + if partition is not None: + print( + _(' * Partition mount-points are relative to inside the installation, the boot would be /boot as an example.')) + mountpoint = input( + _('Select where to mount partition (leave blank to remove mountpoint): ')).strip() + + if len(mountpoint): + block_device_struct["partitions"][partition]['mountpoint'] = mountpoint + if mountpoint == '/boot': + log(f"Marked partition as bootable because mountpoint was set to /boot.", fg="yellow") + block_device_struct["partitions"][partition]['boot'] = True + else: + del (block_device_struct["partitions"][partition]['mountpoint']) + + elif task == mark_formatted: + title = _('{}\n\nSelect which partition to mask for formatting').format(current_layout) + partition = select_partition(title, block_device_struct["partitions"]) + + if partition is not None: + # If we mark a partition for formatting, but the format is CRYPTO LUKS, there's no point in formatting it really + # without asking the user which inner-filesystem they want to use. Since the flag 'encrypted' = True is already set, + # it's safe to change the filesystem for this partition. + if block_device_struct["partitions"][partition].get('filesystem',{}).get('format', 'crypto_LUKS') == 'crypto_LUKS': + if not block_device_struct["partitions"][partition].get('filesystem', None): + block_device_struct["partitions"][partition]['filesystem'] = {} + + fstype = Menu(_('Enter a desired filesystem type for the partition'), fs_types(), + skip=False).run() + + block_device_struct["partitions"][partition]['filesystem']['format'] = fstype + + # Negate the current wipe marking + block_device_struct["partitions"][partition][ + 'wipe'] = not block_device_struct["partitions"][partition].get('wipe', False) + + elif task == mark_encrypted: + title = _('{}\n\nSelect which partition to mark as encrypted').format(current_layout) + partition = select_partition(title, block_device_struct["partitions"]) + + if partition is not None: + # Negate the current encryption marking + block_device_struct["partitions"][partition][ + 'encrypted'] = not block_device_struct["partitions"][partition].get('encrypted', False) + + elif task == mark_bootable: + title = _('{}\n\nSelect which partition to mark as bootable').format(current_layout) + partition = select_partition(title, block_device_struct["partitions"]) + + if partition is not None: + block_device_struct["partitions"][partition][ + 'boot'] = not block_device_struct["partitions"][partition].get('boot', False) + + elif task == set_filesystem_partition: + title = _('{}\n\nSelect which partition to set a filesystem on').format(current_layout) + partition = select_partition(title, block_device_struct["partitions"]) + + if partition is not None: + if not block_device_struct["partitions"][partition].get('filesystem', None): + block_device_struct["partitions"][partition]['filesystem'] = {} + + fstype_title = _('Enter a desired filesystem type for the partition: ') + fstype = Menu(fstype_title, fs_types(), skip=False).run() + + block_device_struct["partitions"][partition]['filesystem']['format'] = fstype + + return block_device_struct + + +def select_encrypted_partitions(block_devices: dict, password: str) -> dict: + for device in block_devices: + for partition in block_devices[device]['partitions']: + if partition.get('mountpoint', None) != '/boot': + partition['encrypted'] = True + partition['!password'] = password + + if partition['mountpoint'] != '/': + # Tell the upcoming steps to generate a key-file for non root mounts. + partition['generate-encryption-key-file'] = True + + return block_devices + + # TODO: Next version perhaps we can support mixed multiple encrypted partitions + # Users might want to single out a partition for non-encryption to share between dualboot etc. diff --git a/archinstall/lib/user_interaction/save_conf.py b/archinstall/lib/user_interaction/save_conf.py new file mode 100644 index 00000000..c52b97e2 --- /dev/null +++ b/archinstall/lib/user_interaction/save_conf.py @@ -0,0 +1,74 @@ +from __future__ import annotations + +from pathlib import Path +from typing import Any, Dict, TYPE_CHECKING + +from ..configuration import ConfigurationOutput +from ..menu import Menu +from ..output import log + +if TYPE_CHECKING: + _: Any + + +def save_config(config: Dict): + + def preview(selection: str): + if options['user_config'] == selection: + json_config = config_output.user_config_to_json() + return f'{config_output.user_configuration_file}\n{json_config}' + elif options['user_creds'] == selection: + if json_config := config_output.user_credentials_to_json(): + return f'{config_output.user_credentials_file}\n{json_config}' + else: + return str(_('No configuration')) + elif options['disk_layout'] == selection: + if json_config := config_output.disk_layout_to_json(): + return f'{config_output.disk_layout_file}\n{json_config}' + else: + return str(_('No configuration')) + elif options['all'] == selection: + output = f'{config_output.user_configuration_file}\n' + if json_config := config_output.user_credentials_to_json(): + output += f'{config_output.user_credentials_file}\n' + if json_config := config_output.disk_layout_to_json(): + output += f'{config_output.disk_layout_file}\n' + return output[:-1] + return None + + config_output = ConfigurationOutput(config) + + options = { + 'user_config': str(_('Save user configuration')), + 'user_creds': str(_('Save user credentials')), + 'disk_layout': str(_('Save disk layout')), + 'all': str(_('Save all')) + } + + selection = Menu(_('Choose which configuration to save'), + list(options.values()), + sort=False, + skip=True, + preview_size=0.75, + preview_command=preview).run() + + if not selection: + return + + while True: + path = input(_('Enter a directory for the configuration(s) to be saved: ')).strip(' ') + dest_path = Path(path) + if dest_path.exists() and dest_path.is_dir(): + break + log(_('Not a valid directory: {}').format(dest_path), fg='red') + + if options['user_config'] == selection: + config_output.save_user_config(dest_path) + elif options['user_creds'] == selection: + config_output.save_user_creds(dest_path) + elif options['disk_layout'] == selection: + config_output.save_disk_layout(dest_path) + elif options['all'] == selection: + config_output.save_user_config(dest_path) + config_output.save_user_creds(dest_path) + config_output.save_disk_layout diff --git a/archinstall/lib/user_interaction/system_conf.py b/archinstall/lib/user_interaction/system_conf.py new file mode 100644 index 00000000..0120fd8a --- /dev/null +++ b/archinstall/lib/user_interaction/system_conf.py @@ -0,0 +1,144 @@ +from __future__ import annotations + +from typing import List, Any, Dict, TYPE_CHECKING + +from ..disk import all_blockdevices +from ..exceptions import RequirementError +from ..hardware import AVAILABLE_GFX_DRIVERS, has_uefi, has_amd_graphics, has_intel_graphics, has_nvidia_graphics +from ..menu import Menu +from ..storage import storage + +from ..translation import DeferredTranslation + +if TYPE_CHECKING: + _: Any + + +def select_kernel(preset: List[str] = None) -> List[str]: + """ + Asks the user to select a kernel for system. + + :return: The string as a selected kernel + :rtype: string + """ + + kernels = ["linux", "linux-lts", "linux-zen", "linux-hardened"] + default_kernel = "linux" + + selected_kernels = Menu(_('Choose which kernels to use or leave blank for default "{}"').format(default_kernel), + kernels, + sort=True, + multi=True, + preset_values=preset, + default_option=default_kernel).run() + return selected_kernels + + +def select_harddrives(preset: List[str] = []) -> List[str]: + """ + Asks the user to select one or multiple hard drives + + :return: List of selected hard drives + :rtype: list + """ + hard_drives = all_blockdevices(partitions=False).values() + options = {f'{option}': option for option in hard_drives} + + if preset: + preset_disks = {f'{option}': option for option in preset} + else: + preset_disks = {} + + selected_harddrive = Menu(_('Select one or more hard drives to use and configure'), + list(options.keys()), + preset_values=list(preset_disks.keys()), + multi=True).run() + + if selected_harddrive and len(selected_harddrive) > 0: + return [options[i] for i in selected_harddrive] + + return [] + + +def select_driver(options: Dict[str, Any] = AVAILABLE_GFX_DRIVERS, force_ask: bool = False) -> str: + """ + Some what convoluted function, whose job is simple. + Select a graphics driver from a pre-defined set of popular options. + + (The template xorg is for beginner users, not advanced, and should + there for appeal to the general public first and edge cases later) + """ + + drivers = sorted(list(options)) + + if drivers: + arguments = storage.get('arguments', {}) + title = DeferredTranslation('') + + if has_amd_graphics(): + title += _( + 'For the best compatibility with your AMD hardware, you may want to use either the all open-source or AMD / ATI options.' + ) + '\n' + if has_intel_graphics(): + title += _( + 'For the best compatibility with your Intel hardware, you may want to use either the all open-source or Intel options.\n' + ) + if has_nvidia_graphics(): + title += _( + 'For the best compatibility with your Nvidia hardware, you may want to use the Nvidia proprietary driver.\n' + ) + + if not arguments.get('gfx_driver', None) or force_ask: + title += _('\n\nSelect a graphics driver or leave blank to install all open-source drivers') + arguments['gfx_driver'] = Menu(title, drivers).run() + + if arguments.get('gfx_driver', None) is None: + arguments['gfx_driver'] = _("All open-source (default)") + + return options.get(arguments.get('gfx_driver')) + + raise RequirementError("Selecting drivers require a least one profile to be given as an option.") + + +def ask_for_bootloader(advanced_options: bool = False, preset: str = None) -> str: + + if preset == 'systemd-bootctl': + preset_val = 'systemd-boot' if advanced_options else 'no' + elif preset == 'grub-install': + preset_val = 'grub' if advanced_options else 'yes' + else: + preset_val = preset + + bootloader = "systemd-bootctl" if has_uefi() else "grub-install" + if has_uefi(): + if not advanced_options: + bootloader_choice = Menu(_('Would you like to use GRUB as a bootloader instead of systemd-boot?'), + ['yes', 'no'], + preset_values=preset_val, + default_option='no').run() + + if bootloader_choice == "yes": + bootloader = "grub-install" + else: + # We use the common names for the bootloader as the selection, and map it back to the expected values. + choices = ['systemd-boot', 'grub', 'efistub'] + selection = Menu(_('Choose a bootloader'), choices, preset_values=preset_val).run() + if selection != "": + if selection == 'systemd-boot': + bootloader = 'systemd-bootctl' + elif selection == 'grub': + bootloader = 'grub-install' + else: + bootloader = selection + + return bootloader + + +def ask_for_swap(preset: bool = True) -> bool: + if preset: + preset_val = 'yes' + else: + preset_val = 'no' + prompt = _('Would you like to use swap on zram?') + choice = Menu(prompt, ['yes', 'no'], default_option='yes', preset_values=preset_val).run() + return False if choice == 'no' else True diff --git a/archinstall/lib/user_interaction/utils.py b/archinstall/lib/user_interaction/utils.py new file mode 100644 index 00000000..48b55e8c --- /dev/null +++ b/archinstall/lib/user_interaction/utils.py @@ -0,0 +1,99 @@ +from __future__ import annotations + +import getpass +import signal +import sys +import time +from typing import Any, Optional, TYPE_CHECKING + +from ..menu import Menu +from ..output import log + +if TYPE_CHECKING: + _: Any + +# used for signal handler +SIG_TRIGGER = None + + +def check_password_strong(passwd: str) -> bool: + symbol_count = 0 + if any(character.isdigit() for character in passwd): + symbol_count += 10 + if any(character.isupper() for character in passwd): + symbol_count += 26 + if any(character.islower() for character in passwd): + symbol_count += 26 + if any(not character.isalnum() for character in passwd): + symbol_count += 40 + + if symbol_count**len(passwd) < 10e20: + + prompt = _("The password you are using seems to be weak,") + prompt += _("are you sure you want to use it?") + + choice = Menu(prompt, ["yes", "no"], default_option="yes").run() + return choice == "yes" + + return True + + +def get_password(prompt: str = '') -> Optional[str]: + if not prompt: + prompt = _("Enter a password: ") + + while passwd := getpass.getpass(prompt): + + if len(passwd.strip()) <= 0: + break + + if not check_password_strong(passwd): + continue + + passwd_verification = getpass.getpass(prompt=_('And one more time for verification: ')) + if passwd != passwd_verification: + log(' * Passwords did not match * ', fg='red') + continue + + return passwd + return None + + +def do_countdown() -> bool: + SIG_TRIGGER = False + + def kill_handler(sig: int, frame: Any) -> None: + print() + exit(0) + + def sig_handler(sig: int, frame: Any) -> None: + global SIG_TRIGGER + SIG_TRIGGER = True + signal.signal(signal.SIGINT, kill_handler) + + original_sigint_handler = signal.getsignal(signal.SIGINT) + signal.signal(signal.SIGINT, sig_handler) + + for i in range(5, 0, -1): + print(f"{i}", end='') + + for x in range(4): + sys.stdout.flush() + time.sleep(0.25) + print(".", end='') + + if SIG_TRIGGER: + prompt = _('Do you really want to abort?') + choice = Menu(prompt, ['yes', 'no'], skip=False).run() + if choice == 'yes': + exit(0) + + if SIG_TRIGGER is False: + sys.stdin.read() + SIG_TRIGGER = False + signal.signal(signal.SIGINT, sig_handler) + + print() + signal.signal(signal.SIGINT, original_sigint_handler) + + return True -- cgit v1.2.3-54-g00ecf