From 3dc0d957e838c34b48a0782d2540341e33b24070 Mon Sep 17 00:00:00 2001 From: Daniel Date: Mon, 28 Mar 2022 22:49:05 +1100 Subject: Deflate user interactions (#1019) * Deflate the user interactions file * Fix flake8 Co-authored-by: Daniel Girtler --- archinstall/lib/user_interaction.py | 1345 -------------------- archinstall/lib/user_interaction/__init__.py | 12 + .../user_interaction/backwards_compatible_conf.py | 95 ++ archinstall/lib/user_interaction/disk_conf.py | 81 ++ archinstall/lib/user_interaction/general_conf.py | 184 +++ archinstall/lib/user_interaction/locale_conf.py | 35 + .../lib/user_interaction/manage_users_conf.py | 169 +++ archinstall/lib/user_interaction/network_conf.py | 111 ++ .../lib/user_interaction/partitioning_conf.py | 295 +++++ archinstall/lib/user_interaction/save_conf.py | 74 ++ archinstall/lib/user_interaction/system_conf.py | 144 +++ archinstall/lib/user_interaction/utils.py | 99 ++ 12 files changed, 1299 insertions(+), 1345 deletions(-) delete mode 100644 archinstall/lib/user_interaction.py create mode 100644 archinstall/lib/user_interaction/__init__.py create mode 100644 archinstall/lib/user_interaction/backwards_compatible_conf.py create mode 100644 archinstall/lib/user_interaction/disk_conf.py create mode 100644 archinstall/lib/user_interaction/general_conf.py create mode 100644 archinstall/lib/user_interaction/locale_conf.py create mode 100644 archinstall/lib/user_interaction/manage_users_conf.py create mode 100644 archinstall/lib/user_interaction/network_conf.py create mode 100644 archinstall/lib/user_interaction/partitioning_conf.py create mode 100644 archinstall/lib/user_interaction/save_conf.py create mode 100644 archinstall/lib/user_interaction/system_conf.py create mode 100644 archinstall/lib/user_interaction/utils.py (limited to 'archinstall/lib') diff --git a/archinstall/lib/user_interaction.py b/archinstall/lib/user_interaction.py deleted file mode 100644 index c6f3eef7..00000000 --- a/archinstall/lib/user_interaction.py +++ /dev/null @@ -1,1345 +0,0 @@ -from __future__ import annotations -import getpass -import ipaddress -import logging -import re -import select # Used for char by char polling of sys.stdin -import shutil -import signal -import sys -import time -from collections.abc import Iterable -from pathlib import Path -from copy import copy -from typing import List, Any, Optional, Dict, Union, TYPE_CHECKING - -# https://stackoverflow.com/a/39757388/929999 -from .menu.text_input import TextInput -from .configuration import ConfigurationOutput -from .models.network_configuration import NetworkConfiguration, NicType - -if TYPE_CHECKING: - from .disk.partition import Partition - _: Any - -from .disk import BlockDevice, suggest_single_disk_layout, suggest_multi_disk_layout, valid_parted_position, all_blockdevices -from .exceptions import RequirementError, DiskError -from .hardware import AVAILABLE_GFX_DRIVERS, has_uefi, has_amd_graphics, has_intel_graphics, has_nvidia_graphics -from .locale_helpers import list_keyboard_languages, list_timezones, list_locales -from .networking import list_interfaces -from .menu import Menu -from .menu.list_manager import ListManager -from .output import log -from .profiles import Profile, list_profiles -from .storage import storage -from .mirrors import list_mirrors - -# TODO: Some inconsistencies between the selection processes. -# Some return the keys from the options, some the values? -from .translation import Translation, DeferredTranslation -from .disk.validators import fs_types -from .packages.packages import validate_package_list - - -# used for signal handler -SIG_TRIGGER = None - - -# TODO: These can be removed after the move to simple_menu.py -def get_terminal_height() -> int: - return shutil.get_terminal_size().lines - - -def get_terminal_width() -> int: - return shutil.get_terminal_size().columns - - -def get_longest_option(options :List[Any]) -> int: - return max([len(x) for x in options]) - - -def check_for_correct_username(username :str) -> bool: - if re.match(r'^[a-z_][a-z0-9_-]*\$?$', username) and len(username) <= 32: - return True - log( - "The username you entered is invalid. Try again", - level=logging.WARNING, - fg='red' - ) - return False - - -def do_countdown() -> bool: - SIG_TRIGGER = False - - def kill_handler(sig :int, frame :Any) -> None: - print() - exit(0) - - def sig_handler(sig :int, frame :Any) -> None: - global SIG_TRIGGER - SIG_TRIGGER = True - signal.signal(signal.SIGINT, kill_handler) - - original_sigint_handler = signal.getsignal(signal.SIGINT) - signal.signal(signal.SIGINT, sig_handler) - - for i in range(5, 0, -1): - print(f"{i}", end='') - - for x in range(4): - sys.stdout.flush() - time.sleep(0.25) - print(".", end='') - - if SIG_TRIGGER: - prompt = _('Do you really want to abort?') - choice = Menu(prompt, ['yes', 'no'], skip=False).run() - if choice == 'yes': - exit(0) - - if SIG_TRIGGER is False: - sys.stdin.read() - SIG_TRIGGER = False - signal.signal(signal.SIGINT, sig_handler) - - print() - signal.signal(signal.SIGINT, original_sigint_handler) - - return True - -def check_password_strong(passwd :str) -> bool: - - symbol_count = 0 - if any(character.isdigit() for character in passwd): - symbol_count += 10 - if any(character.isupper() for character in passwd): - symbol_count += 26 - if any(character.islower() for character in passwd): - symbol_count += 26 - if any(not character.isalnum() for character in passwd): - symbol_count += 40 - - if symbol_count ** len(passwd) < 10e20: - - prompt = _("The password you are using seems to be weak,") - prompt += _("are you sure you want to use it?") - - choice = Menu(prompt, ["yes", "no"], default_option="yes").run() - return choice == "yes" - - return True - - -def get_password(prompt :str = '') -> Optional[str]: - if not prompt: - prompt = _("Enter a password: ") - - while passwd := getpass.getpass(prompt): - - if len(passwd.strip()) <= 0: - break - - if not check_password_strong(passwd): - continue - - passwd_verification = getpass.getpass(prompt=_('And one more time for verification: ')) - if passwd != passwd_verification: - log(' * Passwords did not match * ', fg='red') - continue - - return passwd - return None - - -def print_large_list(options :List[str], padding :int = 5, margin_bottom :int = 0, separator :str = ': ') -> List[int]: - highest_index_number_length = len(str(len(options))) - longest_line = highest_index_number_length + len(separator) + get_longest_option(options) + padding - spaces_without_option = longest_line - (len(separator) + highest_index_number_length) - max_num_of_columns = get_terminal_width() // longest_line - max_options_in_cells = max_num_of_columns * (get_terminal_height() - margin_bottom) - - if len(options) > max_options_in_cells: - for index, option in enumerate(options): - print(f"{index}: {option}") - return 1, index - else: - for row in range(0, (get_terminal_height() - margin_bottom)): - for column in range(row, len(options), (get_terminal_height() - margin_bottom)): - spaces = " " * (spaces_without_option - len(options[column])) - print(f"{str(column): >{highest_index_number_length}}{separator}{options[column]}", end=spaces) - print() - - return column, row - - -def select_encrypted_partitions(block_devices :dict, password :str) -> dict: - for device in block_devices: - for partition in block_devices[device]['partitions']: - if partition.get('mountpoint', None) != '/boot': - partition['encrypted'] = True - partition['!password'] = password - - if partition['mountpoint'] != '/': - # Tell the upcoming steps to generate a key-file for non root mounts. - partition['generate-encryption-key-file'] = True - - return block_devices - - # TODO: Next version perhaps we can support mixed multiple encrypted partitions - # Users might want to single out a partition for non-encryption to share between dualboot etc. - - -# TODO: This can be removed once we have simple_menu everywhere -class MiniCurses: - def __init__(self, width, height): - self.width = width - self.height = height - - self._cursor_y = 0 - self._cursor_x = 0 - - self.input_pos = 0 - - def write_line(self, text, clear_line=True): - if clear_line: - sys.stdout.flush() - sys.stdout.write("\033[%dG" % 0) - sys.stdout.flush() - sys.stdout.write(" " * (get_terminal_width() - 1)) - sys.stdout.flush() - sys.stdout.write("\033[%dG" % 0) - sys.stdout.flush() - sys.stdout.write(text) - sys.stdout.flush() - self._cursor_x += len(text) - - def clear(self, x, y): - if x < 0: - x = 0 - if y < 0: - y = 0 - - # import time - # sys.stdout.write(f"Clearing from: {x, y}") - # sys.stdout.flush() - # time.sleep(2) - - sys.stdout.flush() - sys.stdout.write('\033[%d;%df' % (y, x)) - for line in range(get_terminal_height() - y - 1, y): - sys.stdout.write(" " * (get_terminal_width() - 1)) - sys.stdout.flush() - sys.stdout.write('\033[%d;%df' % (y, x)) - sys.stdout.flush() - - def deal_with_control_characters(self, char): - mapper = { - '\x7f': 'BACKSPACE', - '\r': 'CR', - '\n': 'NL' - } - - if (mapped_char := mapper.get(char, None)) == 'BACKSPACE': - if self._cursor_x <= self.input_pos: - # Don't backspace further back than the cursor start position during input - return True - # Move back to the current known position (BACKSPACE doesn't updated x-pos) - sys.stdout.flush() - sys.stdout.write("\033[%dG" % self._cursor_x) - sys.stdout.flush() - - # Write a blank space - sys.stdout.flush() - sys.stdout.write(" ") - sys.stdout.flush() - - # And move back again - sys.stdout.flush() - sys.stdout.write("\033[%dG" % self._cursor_x) - sys.stdout.flush() - - self._cursor_x -= 1 - - return True - elif mapped_char in ('CR', 'NL'): - return True - - return None - - def get_keyboard_input(self, strip_rowbreaks=True, end='\n'): - assert end in ['\r', '\n', None] - import termios - import tty - - poller = select.epoll() - response = '' - - sys_fileno = sys.stdin.fileno() - old_settings = termios.tcgetattr(sys_fileno) - tty.setraw(sys_fileno) - - poller.register(sys.stdin.fileno(), select.EPOLLIN) - - eof = False - while eof is False: - for fileno, event in poller.poll(0.025): - char = sys.stdin.read(1) - - # sys.stdout.write(f"{[char]}") - # sys.stdout.flush() - - if newline := (char in ('\n', '\r')): - eof = True - - if not newline or strip_rowbreaks is False: - response += char - - if self.deal_with_control_characters(char) is not True: - self.write_line(response[-1], clear_line=False) - - termios.tcsetattr(sys_fileno, termios.TCSADRAIN, old_settings) - - if end: - sys.stdout.write(end) - sys.stdout.flush() - self._cursor_x = 0 - self._cursor_y += 1 - - if response: - return response - - -def ask_for_swap(preset :bool = True) -> bool: - if preset: - preset_val = 'yes' - else: - preset_val = 'no' - prompt = _('Would you like to use swap on zram?') - choice = Menu(prompt, ['yes', 'no'], default_option='yes', preset_values=preset_val).run() - return False if choice == 'no' else True - - -def ask_ntp(preset :bool = True) -> bool: - prompt = str(_('Would you like to use automatic time synchronization (NTP) with the default time servers?\n')) - prompt += str(_('Hardware time and other post-configuration steps might be required in order for NTP to work.\nFor more information, please check the Arch wiki')) - if preset: - preset_val = 'yes' - else: - preset_val = 'no' - choice = Menu(prompt, ['yes', 'no'], skip=False, preset_values=preset_val, default_option='yes').run() - return False if choice == 'no' else True - - -def ask_hostname(preset :str = None) -> str : - hostname = TextInput(_('Desired hostname for the installation: '),preset).run().strip(' ') - return hostname - - -def ask_for_superuser_account(prompt: str) -> Dict[str, Dict[str, str]]: - prompt = prompt if prompt else str(_('Define users with sudo privilege: ')) - superusers,dummy = manage_users(prompt,sudo=True) - return superusers - - -def ask_for_additional_users(prompt :str = '') -> Dict[str, Dict[str, str | None]]: - prompt = prompt if prompt else _('Any additional users to install (leave blank for no users): ') - dummy,users = manage_users(prompt,sudo=False) - return users - - -def ask_for_a_timezone(preset :str = None) -> str: - timezones = list_timezones() - default = 'UTC' - - selected_tz = Menu( - _('Select a timezone'), - list(timezones), - skip=False, - preset_values=preset, - default_option=default - ).run() - - return selected_tz - -def ask_for_bootloader(advanced_options :bool = False, preset :str = None) -> str: - - if preset == 'systemd-bootctl': - preset_val = 'systemd-boot' if advanced_options else 'no' - elif preset == 'grub-install': - preset_val = 'grub' if advanced_options else 'yes' - else: - preset_val = preset - - bootloader = "systemd-bootctl" if has_uefi() else "grub-install" - if has_uefi(): - if not advanced_options: - bootloader_choice = Menu( - _('Would you like to use GRUB as a bootloader instead of systemd-boot?'), - ['yes', 'no'], - preset_values=preset_val, - default_option='no' - ).run() - - if bootloader_choice == "yes": - bootloader = "grub-install" - else: - # We use the common names for the bootloader as the selection, and map it back to the expected values. - choices = ['systemd-boot', 'grub', 'efistub'] - selection = Menu(_('Choose a bootloader'), choices,preset_values=preset_val).run() - if selection != "": - if selection == 'systemd-boot': - bootloader = 'systemd-bootctl' - elif selection == 'grub': - bootloader = 'grub-install' - else: - bootloader = selection - - return bootloader - - -def ask_for_audio_selection(desktop :bool = True, preset :str = None) -> str: - audio = 'pipewire' if desktop else 'none' - choices = ['pipewire', 'pulseaudio'] if desktop else ['pipewire', 'pulseaudio', 'none'] - selected_audio = Menu( - _('Choose an audio server'), - choices, - preset_values=preset, - default_option=audio, - skip=False - ).run() - return selected_audio - - -def ask_additional_packages_to_install(pre_set_packages :List[str] = []) -> List[str]: - # Additional packages (with some light weight error handling for invalid package names) - print(_('Only packages such as base, base-devel, linux, linux-firmware, efibootmgr and optional profile packages are installed.')) - print(_('If you desire a web browser, such as firefox or chromium, you may specify it in the following prompt.')) - - def read_packages(already_defined: list = []) -> list: - display = ' '.join(already_defined) - input_packages = TextInput( - _('Write additional packages to install (space separated, leave blank to skip): '), - display - ).run() - return input_packages.split(' ') if input_packages else [] - - pre_set_packages = pre_set_packages if pre_set_packages else [] - packages = read_packages(pre_set_packages) - - while True: - if len(packages): - # Verify packages that were given - print(_("Verifying that additional packages exist (this might take a few seconds)")) - valid, invalid = validate_package_list(packages) - - if invalid: - log(f"Some packages could not be found in the repository: {invalid}", level=logging.WARNING, fg='red') - packages = read_packages(valid) - continue - break - - return packages - - -def ask_to_configure_network(preset :Dict[str, Any] = {}) -> Optional[NetworkConfiguration]: - """ - Configure the network on the newly installed system - """ - interfaces = { - 'none': str(_('No network configuration')), - 'iso_config': str(_('Copy ISO network configuration to installation')), - 'network_manager': str(_('Use NetworkManager (necessary to configure internet graphically in GNOME and KDE)')), - **list_interfaces() - } - # for this routine it's easier to set the cursor position rather than a preset value - cursor_idx = None - if preset: - if preset['type'] == 'iso_config': - cursor_idx = 0 - elif preset['type'] == 'network_manager': - cursor_idx = 1 - else: - try : - # let's hope order in dictionaries stay - cursor_idx = list(interfaces.values()).index(preset.get('type')) - except ValueError: - pass - - nic = Menu(_('Select one network interface to configure'), interfaces.values(), cursor_index=cursor_idx, sort=False).run() - - if not nic: - return None - - if nic == interfaces['none']: - return None - elif nic == interfaces['iso_config']: - return NetworkConfiguration(NicType.ISO) - elif nic == interfaces['network_manager']: - return NetworkConfiguration(NicType.NM) - else: - # Current workaround: - # For selecting modes without entering text within brackets, - # printing out this part separate from options, passed in - # `generic_select` - # we only keep data if it is the same nic as before - if preset.get('type') != nic: - preset_d = {'type': nic, 'dhcp': True, 'ip': None, 'gateway': None, 'dns': []} - else: - preset_d = copy(preset) - - modes = ['DHCP (auto detect)', 'IP (static)'] - default_mode = 'DHCP (auto detect)' - cursor_idx = 0 if preset_d.get('dhcp',True) else 1 - - prompt = _('Select which mode to configure for "{}" or skip to use default mode "{}"').format(nic, default_mode) - mode = Menu(prompt, modes, default_option=default_mode, cursor_index=cursor_idx).run() - # TODO preset values for ip and gateway - if mode == 'IP (static)': - while 1: - prompt = _('Enter the IP and subnet for {} (example: 192.168.0.5/24): ').format(nic) - ip = TextInput(prompt,preset_d.get('ip')).run().strip() - # Implemented new check for correct IP/subnet input - try: - ipaddress.ip_interface(ip) - break - except ValueError: - log( - "You need to enter a valid IP in IP-config mode.", - level=logging.WARNING, - fg='red' - ) - - # Implemented new check for correct gateway IP address - while 1: - gateway = TextInput(_('Enter your gateway (router) IP address or leave blank for none: '),preset_d.get('gateway')).run().strip() - try: - if len(gateway) == 0: - gateway = None - else: - ipaddress.ip_address(gateway) - break - except ValueError: - log( - "You need to enter a valid gateway (router) IP address.", - level=logging.WARNING, - fg='red' - ) - - dns = None - if preset_d.get('dns'): - preset_d['dns'] = ' '.join(preset_d['dns']) - else: - preset_d['dns'] = None - dns_input = TextInput(_('Enter your DNS servers (space separated, blank for none): '),preset_d['dns']).run().strip() - - if len(dns_input): - dns = dns_input.split(' ') - - return NetworkConfiguration( - NicType.MANUAL, - iface=nic, - ip=ip, - gateway=gateway, - dns=dns, - dhcp=False - ) - else: - # this will contain network iface names - return NetworkConfiguration(NicType.MANUAL, iface=nic) - - -def partition_overlap(partitions :list, start :str, end :str) -> bool: - # TODO: Implement sanity check - return False - - -def ask_for_main_filesystem_format(advanced_options=False): - options = { - 'btrfs': 'btrfs', - 'ext4': 'ext4', - 'xfs': 'xfs', - 'f2fs': 'f2fs' - } - - advanced = { - 'ntfs': 'ntfs' - } - - if advanced_options: - options.update(advanced) - - prompt = _('Select which filesystem your main partition should use') - choice = Menu(prompt, options, skip=False).run() - return choice - - -def current_partition_layout(partitions :List[Partition], with_idx :bool = False) -> str: - def do_padding(name, max_len): - spaces = abs(len(str(name)) - max_len) + 2 - pad_left = int(spaces / 2) - pad_right = spaces - pad_left - return f'{pad_right * " "}{name}{pad_left * " "}|' - - column_names = {} - - # this will add an initial index to the table for each partition - if with_idx: - column_names['index'] = max([len(str(len(partitions))), len('index')]) - - # determine all attribute names and the max length - # of the value among all partitions to know the width - # of the table cells - for p in partitions: - for attribute, value in p.items(): - if attribute in column_names.keys(): - column_names[attribute] = max([column_names[attribute], len(str(value)), len(attribute)]) - else: - column_names[attribute] = max([len(str(value)), len(attribute)]) - - current_layout = '' - for name, max_len in column_names.items(): - current_layout += do_padding(name, max_len) - - current_layout = f'{current_layout[:-1]}\n{"-" * len(current_layout)}\n' - - for idx, p in enumerate(partitions): - row = '' - for name, max_len in column_names.items(): - if name == 'index': - row += do_padding(str(idx), max_len) - elif name in p: - row += do_padding(p[name], max_len) - else: - row += ' ' * (max_len + 2) + '|' - - current_layout += f'{row[:-1]}\n' - - title = str(_('Current partition layout')) - return f'\n\n{title}:\n\n{current_layout}' - - -def select_partition(title :str, partitions :List[Partition], multiple :bool = False) -> Union[int, List[int], None]: - partition_indexes = list(map(str, range(len(partitions)))) - partition = Menu(title, partition_indexes, multi=multiple).run() - - if partition is not None: - if isinstance(partition, list): - return [int(p) for p in partition] - else: - return int(partition) - - return None - -def get_default_partition_layout( - block_devices :Union[BlockDevice, List[BlockDevice]], - advanced_options :bool = False -) -> Dict[str, Any]: - - if len(block_devices) == 1: - return suggest_single_disk_layout(block_devices[0], advanced_options=advanced_options) - else: - return suggest_multi_disk_layout(block_devices, advanced_options=advanced_options) - - -def manage_new_and_existing_partitions(block_device :BlockDevice) -> Dict[str, Any]: - block_device_struct = { - "partitions": [partition.__dump__() for partition in block_device.partitions.values()] - } - # Test code: [part.__dump__() for part in block_device.partitions.values()] - # TODO: Squeeze in BTRFS subvolumes here - - new_partition = str(_('Create a new partition')) - suggest_partition_layout = str(_('Suggest partition layout')) - delete_partition = str(_('Delete a partition')) - delete_all_partitions = str(_('Clear/Delete all partitions')) - assign_mount_point = str(_('Assign mount-point for a partition')) - mark_formatted = str(_('Mark/Unmark a partition to be formatted (wipes data)')) - mark_encrypted = str(_('Mark/Unmark a partition as encrypted')) - mark_bootable = str(_('Mark/Unmark a partition as bootable (automatic for /boot)')) - set_filesystem_partition = str(_('Set desired filesystem for a partition')) - - while True: - modes = [new_partition, suggest_partition_layout] - - if len(block_device_struct['partitions']): - modes += [ - delete_partition, - delete_all_partitions, - assign_mount_point, - mark_formatted, - mark_encrypted, - mark_bootable, - set_filesystem_partition, - ] - - title = _('Select what to do with\n{}').format(block_device) - - # show current partition layout: - if len(block_device_struct["partitions"]): - title += current_partition_layout(block_device_struct['partitions']) + '\n' - - task = Menu(title, modes, sort=False).run() - - if not task: - break - - if task == new_partition: - # if partition_type == 'gpt': - # # https://www.gnu.org/software/parted/manual/html_node/mkpart.html - # # https://www.gnu.org/software/parted/manual/html_node/mklabel.html - # name = input("Enter a desired name for the partition: ").strip() - - fstype = Menu(_('Enter a desired filesystem type for the partition'), fs_types(), skip=False).run() - - prompt = _('Enter the start sector (percentage or block number, default: {}): ').format(block_device.first_free_sector) - start = input(prompt).strip() - - if not start.strip(): - start = block_device.first_free_sector - end_suggested = block_device.first_end_sector - else: - end_suggested = '100%' - - prompt = _('Enter the end sector of the partition (percentage or block number, ex: {}): ').format(end_suggested) - end = input(prompt).strip() - - if not end.strip(): - end = end_suggested - - if valid_parted_position(start) and valid_parted_position(end): - if partition_overlap(block_device_struct["partitions"], start, end): - log(f"This partition overlaps with other partitions on the drive! Ignoring this partition creation.", fg="red") - continue - - block_device_struct["partitions"].append({ - "type" : "primary", # Strictly only allowed under MSDOS, but GPT accepts it so it's "safe" to inject - "start" : start, - "size" : end, - "mountpoint" : None, - "wipe" : True, - "filesystem" : { - "format" : fstype - } - }) - else: - log(f"Invalid start ({valid_parted_position(start)}) or end ({valid_parted_position(end)}) for this partition. Ignoring this partition creation.", fg="red") - continue - elif task == suggest_partition_layout: - if len(block_device_struct["partitions"]): - prompt = _('{} contains queued partitions, this will remove those, are you sure?').format(block_device) - choice = Menu(prompt, ['yes', 'no'], default_option='no').run() - - if choice == 'no': - continue - - block_device_struct.update(suggest_single_disk_layout(block_device)[block_device.path]) - elif task is None: - return block_device_struct - else: - current_layout = current_partition_layout(block_device_struct['partitions'], with_idx=True) - - if task == delete_partition: - title = _('{}\n\nSelect by index which partitions to delete').format(current_layout) - to_delete = select_partition(title, block_device_struct["partitions"], multiple=True) - - if to_delete: - block_device_struct['partitions'] = [p for idx, p in enumerate(block_device_struct['partitions']) if idx not in to_delete] - elif task == delete_all_partitions: - block_device_struct["partitions"] = [] - elif task == assign_mount_point: - title = _('{}\n\nSelect by index which partition to mount where').format(current_layout) - partition = select_partition(title, block_device_struct["partitions"]) - - if partition is not None: - print(_(' * Partition mount-points are relative to inside the installation, the boot would be /boot as an example.')) - mountpoint = input(_('Select where to mount partition (leave blank to remove mountpoint): ')).strip() - - if len(mountpoint): - block_device_struct["partitions"][partition]['mountpoint'] = mountpoint - if mountpoint == '/boot': - log(f"Marked partition as bootable because mountpoint was set to /boot.", fg="yellow") - block_device_struct["partitions"][partition]['boot'] = True - else: - del(block_device_struct["partitions"][partition]['mountpoint']) - - elif task == mark_formatted: - title = _('{}\n\nSelect which partition to mask for formatting').format(current_layout) - partition = select_partition(title, block_device_struct["partitions"]) - - if partition is not None: - # If we mark a partition for formatting, but the format is CRYPTO LUKS, there's no point in formatting it really - # without asking the user which inner-filesystem they want to use. Since the flag 'encrypted' = True is already set, - # it's safe to change the filesystem for this partition. - if block_device_struct["partitions"][partition].get('filesystem', {}).get('format', 'crypto_LUKS') == 'crypto_LUKS': - if not block_device_struct["partitions"][partition].get('filesystem', None): - block_device_struct["partitions"][partition]['filesystem'] = {} - - fstype = Menu(_('Enter a desired filesystem type for the partition'), fs_types(), skip=False).run() - - block_device_struct["partitions"][partition]['filesystem']['format'] = fstype - - # Negate the current wipe marking - block_device_struct["partitions"][partition]['wipe'] = not block_device_struct["partitions"][partition].get('wipe', False) - - elif task == mark_encrypted: - title = _('{}\n\nSelect which partition to mark as encrypted').format(current_layout) - partition = select_partition(title, block_device_struct["partitions"]) - - if partition is not None: - # Negate the current encryption marking - block_device_struct["partitions"][partition]['encrypted'] = not block_device_struct["partitions"][partition].get('encrypted', False) - - elif task == mark_bootable: - title = _('{}\n\nSelect which partition to mark as bootable').format(current_layout) - partition = select_partition(title, block_device_struct["partitions"]) - - if partition is not None: - block_device_struct["partitions"][partition]['boot'] = not block_device_struct["partitions"][partition].get('boot', False) - - elif task == set_filesystem_partition: - title = _('{}\n\nSelect which partition to set a filesystem on').format(current_layout) - partition = select_partition(title, block_device_struct["partitions"]) - - if partition is not None: - if not block_device_struct["partitions"][partition].get('filesystem', None): - block_device_struct["partitions"][partition]['filesystem'] = {} - - fstype_title = _('Enter a desired filesystem type for the partition: ') - fstype = Menu(fstype_title, fs_types(), skip=False).run() - - block_device_struct["partitions"][partition]['filesystem']['format'] = fstype - - return block_device_struct - - -def select_individual_blockdevice_usage(block_devices: list) -> Dict[str, Any]: - result = {} - - for device in block_devices: - layout = manage_new_and_existing_partitions(device) - - result[device.path] = layout - - return result - - -def select_archinstall_language(default='English'): - languages = Translation.get_all_names() - language = Menu(_('Select Archinstall language'), languages, default_option=default).run() - return language - - -def select_disk_layout(block_devices :list, advanced_options=False) -> Dict[str, Any]: - wipe_mode = str(_('Wipe all selected drives and use a best-effort default partition layout')) - custome_mode = str(_('Select what to do with each individual drive (followed by partition usage)')) - modes = [wipe_mode, custome_mode] - - print(modes) - mode = Menu(_('Select what you wish to do with the selected block devices'), modes, skip=False).run() - - if mode == wipe_mode: - return get_default_partition_layout(block_devices, advanced_options) - else: - return select_individual_blockdevice_usage(block_devices) - - -def select_disk(dict_o_disks :Dict[str, BlockDevice]) -> BlockDevice: - """ - Asks the user to select a harddrive from the `dict_o_disks` selection. - Usually this is combined with :ref:`archinstall.list_drives`. - - :param dict_o_disks: A `dict` where keys are the drive-name, value should be a dict containing drive information. - :type dict_o_disks: dict - - :return: The name/path (the dictionary key) of the selected drive - :rtype: str - """ - drives = sorted(list(dict_o_disks.keys())) - if len(drives) >= 1: - for index, drive in enumerate(drives): - print(f"{index}: {drive} ({dict_o_disks[drive]['size'], dict_o_disks[drive].device, dict_o_disks[drive]['label']})") - - log("You can skip selecting a drive and partitioning and use whatever drive-setup is mounted at /mnt (experimental)", fg="yellow") - - drive = Menu('Select one of the disks or skip and use "/mnt" as default"', drives).run() - if not drive: - return drive - - drive = dict_o_disks[drive] - return drive - - raise DiskError('select_disk() requires a non-empty dictionary of disks to select from.') - - -def select_profile() -> Optional[Profile]: - """ - # Asks the user to select a profile from the available profiles. - # - # :return: The name/dictionary key of the selected profile - # :rtype: str - # """ - top_level_profiles = sorted(list(list_profiles(filter_top_level_profiles=True))) - options = {} - - for profile in top_level_profiles: - profile = Profile(None, profile) - description = profile.get_profile_description() - - option = f'{profile.profile}: {description}' - options[option] = profile - - title = _('This is a list of pre-programmed profiles, they might make it easier to install things like desktop environments') - - selection = Menu(title=title, p_options=list(options.keys())).run() - - if selection is not None: - return options[selection] - - return None - - -def select_language(default_value :str, preset_value :str = None) -> str: - """ - Asks the user to select a language - Usually this is combined with :ref:`archinstall.list_keyboard_languages`. - - :return: The language/dictionary key of the selected language - :rtype: str - """ - kb_lang = list_keyboard_languages() - # sort alphabetically and then by length - # it's fine if the list is big because the Menu - # allows for searching anyways - sorted_kb_lang = sorted(sorted(list(kb_lang)), key=len) - - selected_lang = Menu(_('Select Keyboard layout'), sorted_kb_lang, default_option=default_value, preset_values=preset_value, sort=False).run() - return selected_lang - - -def select_mirror_regions(preset_values :Dict[str, Any] = {}) -> Dict[str, Any]: - """ - Asks the user to select a mirror or region - Usually this is combined with :ref:`archinstall.list_mirrors`. - - :return: The dictionary information about a mirror/region. - :rtype: dict - """ - if preset_values is None: - preselected = None - else: - preselected = list(preset_values.keys()) - mirrors = list_mirrors() - selected_mirror = Menu( - _('Select one of the regions to download packages from'), - list(mirrors.keys()), - preset_values=preselected, - multi=True - ).run() - - if selected_mirror is not None: - return {selected: mirrors[selected] for selected in selected_mirror} - - return {} - - -def select_harddrives(preset : List[str] = []) -> List[str]: - """ - Asks the user to select one or multiple hard drives - - :return: List of selected hard drives - :rtype: list - """ - hard_drives = all_blockdevices(partitions=False).values() - options = {f'{option}': option for option in hard_drives} - - if preset: - preset_disks = {f'{option}':option for option in preset} - else: - preset_disks = {} - - selected_harddrive = Menu( - _('Select one or more hard drives to use and configure'), - list(options.keys()), - preset_values=list(preset_disks.keys()), - multi=True - ).run() - - if selected_harddrive and len(selected_harddrive) > 0: - return [options[i] for i in selected_harddrive] - - return [] - - -def select_driver(options :Dict[str, Any] = AVAILABLE_GFX_DRIVERS, force_ask :bool = False) -> str: - """ - Some what convoluted function, whose job is simple. - Select a graphics driver from a pre-defined set of popular options. - - (The template xorg is for beginner users, not advanced, and should - there for appeal to the general public first and edge cases later) - """ - - drivers = sorted(list(options)) - - if drivers: - arguments = storage.get('arguments', {}) - title = DeferredTranslation('') - - if has_amd_graphics(): - title += _('For the best compatibility with your AMD hardware, you may want to use either the all open-source or AMD / ATI options.') + '\n' - if has_intel_graphics(): - title += _('For the best compatibility with your Intel hardware, you may want to use either the all open-source or Intel options.\n') - if has_nvidia_graphics(): - title += _('For the best compatibility with your Nvidia hardware, you may want to use the Nvidia proprietary driver.\n') - - if not arguments.get('gfx_driver', None) or force_ask: - title += _('\n\nSelect a graphics driver or leave blank to install all open-source drivers') - arguments['gfx_driver'] = Menu(title, drivers).run() - - if arguments.get('gfx_driver', None) is None: - arguments['gfx_driver'] = _("All open-source (default)") - - return options.get(arguments.get('gfx_driver')) - - raise RequirementError("Selecting drivers require a least one profile to be given as an option.") - - -def select_kernel(preset :List[str] = None) -> List[str]: - """ - Asks the user to select a kernel for system. - - :return: The string as a selected kernel - :rtype: string - """ - - kernels = ["linux", "linux-lts", "linux-zen", "linux-hardened"] - default_kernel = "linux" - - selected_kernels = Menu( - _('Choose which kernels to use or leave blank for default "{}"').format(default_kernel), - kernels, - sort=True, - multi=True, - preset_values=preset, - default_option=default_kernel - ).run() - return selected_kernels - -def select_additional_repositories(preset :List[str]) -> List[str]: - """ - Allows the user to select additional repositories (multilib, and testing) if desired. - - :return: The string as a selected repository - :rtype: string - """ - - repositories = ["multilib", "testing"] - - additional_repositories = Menu( - _('Choose which optional additional repositories to enable'), - repositories, - sort=False, - multi=True, - preset_values=preset, - default_option=[] - ).run() - - if additional_repositories is not None: - return additional_repositories - - return [] - -def select_locale_lang(default :str,preset :str = None) -> str : - locales = list_locales() - locale_lang = set([locale.split()[0] for locale in locales]) - - selected_locale = Menu( - _('Choose which locale language to use'), - locale_lang, - sort=True, - preset_values=preset, - default_option=default - ).run() - - return selected_locale - - -def select_locale_enc(default :str,preset :str = None) -> str: - locales = list_locales() - locale_enc = set([locale.split()[1] for locale in locales]) - - selected_locale = Menu( - _('Choose which locale encoding to use'), - locale_enc, - sort=True, - preset_values=preset, - default_option=default - ).run() - - return selected_locale - - -def generic_select(p_options :Union[list,dict], - input_text :str = '', - allow_empty_input :bool = True, - options_output :bool = True, # function not available - sort :bool = False, - multi :bool = False, - default :Any = None) -> Any: - """ - A generic select function that does not output anything - other than the options and their indexes. As an example: - - generic_select(["first", "second", "third option"]) - > first - second - third option - When the user has entered the option correctly, - this function returns an item from list, a string, or None - - Options can be any iterable. - Duplicate entries are not checked, but the results with them are unreliable. Which element to choose from the duplicates depends on the return of the index() - Default value if not on the list of options will be added as the first element - sort will be handled by Menu() - """ - # We check that the options are iterable. If not we abort. Else we copy them to lists - # it options is a dictionary we use the values as entries of the list - # if options is a string object, each character becomes an entry - # if options is a list, we implictily build a copy to mantain immutability - if not isinstance(p_options,Iterable): - log(f"Objects of type {type(p_options)} is not iterable, and are not supported at generic_select",fg="red") - log(f"invalid parameter at Menu() call was at <{sys._getframe(1).f_code.co_name}>",level=logging.WARNING) - raise RequirementError("generic_select() requires an iterable as option.") - - input_text = input_text if input_text else _('Select one of the values shown below: ') - - if isinstance(p_options,dict): - options = list(p_options.values()) - else: - options = list(p_options) - # check that the default value is in the list. If not it will become the first entry - if default and default not in options: - options.insert(0,default) - - # one of the drawbacks of the new interface is that in only allows string like options, so we do a conversion - # also for the default value if it exists - soptions = list(map(str,options)) - default_value = options[options.index(default)] if default else None - - selected_option = Menu( - input_text, - soptions, - skip=allow_empty_input, - multi=multi, - default_option=default_value, - sort=sort - ).run() - # we return the original objects, not the strings. - # options is the list with the original objects and soptions the list with the string values - # thru the map, we get from the value selected in soptions it index, and thu it the original object - if not selected_option: - return selected_option - elif isinstance(selected_option,list): # for multi True - selected_option = list(map(lambda x: options[soptions.index(x)],selected_option)) - else: # for multi False - selected_option = options[soptions.index(selected_option)] - return selected_option - - -def generic_multi_select(p_options :Union[list,dict], - text :str = '', - sort :bool = False, - default :Any = None, - allow_empty :bool = False) -> Any: - - text = text if text else _("Select one or more of the options below: ") - - return generic_select(p_options, - input_text=text, - allow_empty_input=allow_empty, - sort=sort, - multi=True, - default=default) - - -class UserList(ListManager): - """ - subclass of ListManager for the managing of user accounts - """ - def __init__(self,prompt :str, lusers :dict, sudo :bool = None): - """ - param: prompt - type: str - param: lusers dict with the users already defined for the system - type: Dict - param: sudo. boolean to determine if we handle superusers or users. If None handles both types - """ - self.sudo = sudo - self.actions = [ - str(_('Add an user')), - str(_('Change password')), - str(_('Promote/Demote user')), - str(_('Delete User')) - ] - self.default_action = self.actions[0] - super().__init__(prompt,lusers,self.actions,self.default_action) - - def reformat(self): - def format_element(elem): - # secret gives away the length of the password - if self.data[elem].get('!password'): - pwd = '*' * 16 - # pwd = archinstall.secret(self.data[elem]['!password']) - else: - pwd = '' - if self.data[elem].get('sudoer'): - super = 'Superuser' - else: - super = ' ' - return f"{elem:16}: password {pwd:16} {super}" - return list(map(lambda x:format_element(x),self.data)) - - def action_list(self): - if self.target: - active_user = list(self.target.keys())[0] - else: - active_user = None - sudoer = self.target[active_user].get('sudoer',False) - if self.sudo is None: - return self.actions - if self.sudo and sudoer: - return self.actions - elif self.sudo and not sudoer: - return [self.actions[2]] - elif not self.sudo and sudoer: - return [self.actions[2]] - else: - return self.actions - - def exec_action(self): - if self.target: - active_user = list(self.target.keys())[0] - else: - active_user = None - - if self.action == self.actions[0]: # add - new_user = self.add_user() - # no unicity check, if exists will be replaced - self.data.update(new_user) - elif self.action == self.actions[1]: # change password - self.data[active_user]['!password'] = get_password(prompt=str(_('Password for user "{}": ').format(active_user))) - elif self.action == self.actions[2]: # promote/demote - self.data[active_user]['sudoer'] = not self.data[active_user]['sudoer'] - elif self.action == self.actions[3]: # delete - del self.data[active_user] - - def add_user(self): - print(_('\nDefine a new user\n')) - prompt = str(_("User Name : ")) - while True: - userid = input(prompt).strip(' ') - if not userid: - return {} # end - if not check_for_correct_username(userid): - pass - else: - break - if self.sudo: - sudoer = True - elif self.sudo is not None and not self.sudo: - sudoer = False - else: - sudoer = False - sudo_choice = Menu( - str(_('Should {} be a superuser (sudoer)?')).format(userid), - ['yes', 'no'], - skip=False, - preset_values='yes' if sudoer else 'no', - default_option='no' - ).run() - sudoer = True if sudo_choice == 'yes' else False - - password = get_password(prompt=str(_('Password for user "{}": ').format(userid))) - - return {userid :{"!password":password, "sudoer":sudoer}} - -def manage_users(prompt :str, sudo :bool) -> tuple[dict, dict]: - - # TODO Filtering and some kind of simpler code - lusers = {} - if storage['arguments'].get('!superusers',{}): - lusers.update({uid: {'!password':storage['arguments']['!superusers'][uid].get('!password'), 'sudoer':True} for uid in storage['arguments'].get('!superusers',{})}) - if storage['arguments'].get('!users',{}): - lusers.update({uid: {'!password':storage['arguments']['!users'][uid].get('!password'), 'sudoer':False} for uid in storage['arguments'].get('!users',{})}) - # processing - lusers = UserList(prompt,lusers,sudo).run() - # return data - superusers = {uid: {'!password':lusers[uid].get('!password')} for uid in lusers if lusers[uid].get('sudoer',False)} - users = {uid: {'!password':lusers[uid].get('!password')} for uid in lusers if not lusers[uid].get('sudoer',False)} - storage['arguments']['!superusers'] = superusers - storage['arguments']['!users'] = users - return superusers,users - -def save_config(config: Dict): - def preview(selection: str): - if options['user_config'] == selection: - json_config = config_output.user_config_to_json() - return f'{config_output.user_configuration_file}\n{json_config}' - elif options['user_creds'] == selection: - if json_config := config_output.user_credentials_to_json(): - return f'{config_output.user_credentials_file}\n{json_config}' - else: - return str(_('No configuration')) - elif options['disk_layout'] == selection: - if json_config := config_output.disk_layout_to_json(): - return f'{config_output.disk_layout_file}\n{json_config}' - else: - return str(_('No configuration')) - elif options['all'] == selection: - output = f'{config_output.user_configuration_file}\n' - if json_config := config_output.user_credentials_to_json(): - output += f'{config_output.user_credentials_file}\n' - if json_config := config_output.disk_layout_to_json(): - output += f'{config_output.disk_layout_file}\n' - return output[:-1] - return None - - config_output = ConfigurationOutput(config) - - options = { - 'user_config': str(_('Save user configuration')), - 'user_creds': str(_('Save user credentials')), - 'disk_layout': str(_('Save disk layout')), - 'all': str(_('Save all')) - } - - selection = Menu( - _('Choose which configuration to save'), - list(options.values()), - sort=False, - skip=True, - preview_size=0.75, - preview_command=preview - ).run() - - if not selection: - return - - while True: - path = input(_('Enter a directory for the configuration(s) to be saved: ')).strip(' ') - dest_path = Path(path) - if dest_path.exists() and dest_path.is_dir(): - break - log(_('Not a valid directory: {}').format(dest_path), fg='red') - - if options['user_config'] == selection: - config_output.save_user_config(dest_path) - elif options['user_creds'] == selection: - config_output.save_user_creds(dest_path) - elif options['disk_layout'] == selection: - config_output.save_disk_layout(dest_path) - elif options['all'] == selection: - config_output.save_user_config(dest_path) - config_output.save_user_creds(dest_path) - config_output.save_disk_layout diff --git a/archinstall/lib/user_interaction/__init__.py b/archinstall/lib/user_interaction/__init__.py new file mode 100644 index 00000000..b0174d94 --- /dev/null +++ b/archinstall/lib/user_interaction/__init__.py @@ -0,0 +1,12 @@ +from .save_conf import save_config +from .manage_users_conf import ask_for_superuser_account, ask_for_additional_users +from .backwards_compatible_conf import generic_select, generic_multi_select +from .locale_conf import select_locale_lang, select_locale_enc +from .system_conf import select_kernel, select_harddrives, select_driver, ask_for_bootloader, ask_for_swap +from .network_conf import ask_to_configure_network +from .partitioning_conf import select_partition, select_encrypted_partitions +from .general_conf import (ask_ntp, ask_for_a_timezone, ask_for_audio_selection, select_language, select_mirror_regions, + select_profile, select_archinstall_language, ask_additional_packages_to_install, + select_additional_repositories, ask_hostname) +from .disk_conf import ask_for_main_filesystem_format, select_individual_blockdevice_usage, select_disk_layout, select_disk +from .utils import get_password, do_countdown diff --git a/archinstall/lib/user_interaction/backwards_compatible_conf.py b/archinstall/lib/user_interaction/backwards_compatible_conf.py new file mode 100644 index 00000000..d91690eb --- /dev/null +++ b/archinstall/lib/user_interaction/backwards_compatible_conf.py @@ -0,0 +1,95 @@ +from __future__ import annotations + +import logging +import sys +from collections.abc import Iterable +from typing import Any, Union, TYPE_CHECKING + +from ..exceptions import RequirementError +from ..menu import Menu +from ..output import log + +if TYPE_CHECKING: + _: Any + + +def generic_select( + p_options: Union[list, dict], + input_text: str = '', + allow_empty_input: bool = True, + options_output: bool = True, # function not available + sort: bool = False, + multi: bool = False, + default: Any = None) -> Any: + """ + A generic select function that does not output anything + other than the options and their indexes. As an example: + + generic_select(["first", "second", "third option"]) + > first + second + third option + When the user has entered the option correctly, + this function returns an item from list, a string, or None + + Options can be any iterable. + Duplicate entries are not checked, but the results with them are unreliable. Which element to choose from the duplicates depends on the return of the index() + Default value if not on the list of options will be added as the first element + sort will be handled by Menu() + """ + # We check that the options are iterable. If not we abort. Else we copy them to lists + # it options is a dictionary we use the values as entries of the list + # if options is a string object, each character becomes an entry + # if options is a list, we implictily build a copy to mantain immutability + if not isinstance(p_options, Iterable): + log(f"Objects of type {type(p_options)} is not iterable, and are not supported at generic_select", fg="red") + log(f"invalid parameter at Menu() call was at <{sys._getframe(1).f_code.co_name}>", level=logging.WARNING) + raise RequirementError("generic_select() requires an iterable as option.") + + input_text = input_text if input_text else _('Select one of the values shown below: ') + + if isinstance(p_options, dict): + options = list(p_options.values()) + else: + options = list(p_options) + # check that the default value is in the list. If not it will become the first entry + if default and default not in options: + options.insert(0, default) + + # one of the drawbacks of the new interface is that in only allows string like options, so we do a conversion + # also for the default value if it exists + soptions = list(map(str, options)) + default_value = options[options.index(default)] if default else None + + selected_option = Menu(input_text, + soptions, + skip=allow_empty_input, + multi=multi, + default_option=default_value, + sort=sort).run() + # we return the original objects, not the strings. + # options is the list with the original objects and soptions the list with the string values + # thru the map, we get from the value selected in soptions it index, and thu it the original object + if not selected_option: + return selected_option + elif isinstance(selected_option, list): # for multi True + selected_option = list(map(lambda x: options[soptions.index(x)], selected_option)) + else: # for multi False + selected_option = options[soptions.index(selected_option)] + return selected_option + + +def generic_multi_select(p_options: Union[list, dict], + text: str = '', + sort: bool = False, + default: Any = None, + allow_empty: bool = False) -> Any: + + text = text if text else _("Select one or more of the options below: ") + + return generic_select(p_options, + input_text=text, + allow_empty_input=allow_empty, + sort=sort, + multi=True, + default=default) diff --git a/archinstall/lib/user_interaction/disk_conf.py b/archinstall/lib/user_interaction/disk_conf.py new file mode 100644 index 00000000..9238a766 --- /dev/null +++ b/archinstall/lib/user_interaction/disk_conf.py @@ -0,0 +1,81 @@ +from __future__ import annotations + +from typing import Any, Dict, TYPE_CHECKING + +from .partitioning_conf import manage_new_and_existing_partitions, get_default_partition_layout +from ..disk import BlockDevice +from ..exceptions import DiskError +from ..menu import Menu +from ..output import log + +if TYPE_CHECKING: + _: Any + + +def ask_for_main_filesystem_format(advanced_options=False): + options = {'btrfs': 'btrfs', 'ext4': 'ext4', 'xfs': 'xfs', 'f2fs': 'f2fs'} + + advanced = {'ntfs': 'ntfs'} + + if advanced_options: + options.update(advanced) + + prompt = _('Select which filesystem your main partition should use') + choice = Menu(prompt, options, skip=False).run() + return choice + + +def select_individual_blockdevice_usage(block_devices: list) -> Dict[str, Any]: + result = {} + + for device in block_devices: + layout = manage_new_and_existing_partitions(device) + + result[device.path] = layout + + return result + + +def select_disk_layout(block_devices: list, advanced_options=False) -> Dict[str, Any]: + wipe_mode = str(_('Wipe all selected drives and use a best-effort default partition layout')) + custome_mode = str(_('Select what to do with each individual drive (followed by partition usage)')) + modes = [wipe_mode, custome_mode] + + print(modes) + mode = Menu(_('Select what you wish to do with the selected block devices'), modes, skip=False).run() + + if mode == wipe_mode: + return get_default_partition_layout(block_devices, advanced_options) + else: + return select_individual_blockdevice_usage(block_devices) + + +def select_disk(dict_o_disks: Dict[str, BlockDevice]) -> BlockDevice: + """ + Asks the user to select a harddrive from the `dict_o_disks` selection. + Usually this is combined with :ref:`archinstall.list_drives`. + + :param dict_o_disks: A `dict` where keys are the drive-name, value should be a dict containing drive information. + :type dict_o_disks: dict + + :return: The name/path (the dictionary key) of the selected drive + :rtype: str + """ + drives = sorted(list(dict_o_disks.keys())) + if len(drives) >= 1: + for index, drive in enumerate(drives): + print( + f"{index}: {drive} ({dict_o_disks[drive]['size'], dict_o_disks[drive].device, dict_o_disks[drive]['label']})" + ) + + log("You can skip selecting a drive and partitioning and use whatever drive-setup is mounted at /mnt (experimental)", + fg="yellow") + + drive = Menu('Select one of the disks or skip and use "/mnt" as default"', drives).run() + if not drive: + return drive + + drive = dict_o_disks[drive] + return drive + + raise DiskError('select_disk() requires a non-empty dictionary of disks to select from.') diff --git a/archinstall/lib/user_interaction/general_conf.py b/archinstall/lib/user_interaction/general_conf.py new file mode 100644 index 00000000..c42e9e27 --- /dev/null +++ b/archinstall/lib/user_interaction/general_conf.py @@ -0,0 +1,184 @@ +from __future__ import annotations + +import logging +from typing import List, Any, Optional, Dict, TYPE_CHECKING + +from ..menu.text_input import TextInput + +from ..locale_helpers import list_keyboard_languages, list_timezones +from ..menu import Menu +from ..output import log +from ..profiles import Profile, list_profiles +from ..mirrors import list_mirrors + +from ..translation import Translation +from ..packages.packages import validate_package_list + +if TYPE_CHECKING: + _: Any + + +def ask_ntp(preset: bool = True) -> bool: + prompt = str(_('Would you like to use automatic time synchronization (NTP) with the default time servers?\n')) + prompt += str(_('Hardware time and other post-configuration steps might be required in order for NTP to work.\nFor more information, please check the Arch wiki')) + if preset: + preset_val = 'yes' + else: + preset_val = 'no' + choice = Menu(prompt, ['yes', 'no'], skip=False, preset_values=preset_val, default_option='yes').run() + return False if choice == 'no' else True + + +def ask_hostname(preset: str = None) -> str: + hostname = TextInput(_('Desired hostname for the installation: '), preset).run().strip(' ') + return hostname + + +def ask_for_a_timezone(preset: str = None) -> str: + timezones = list_timezones() + default = 'UTC' + + selected_tz = Menu(_('Select a timezone'), + list(timezones), + skip=False, + preset_values=preset, + default_option=default).run() + + return selected_tz + + +def ask_for_audio_selection(desktop: bool = True, preset: str = None) -> str: + audio = 'pipewire' if desktop else 'none' + choices = ['pipewire', 'pulseaudio'] if desktop else ['pipewire', 'pulseaudio', 'none'] + selected_audio = Menu(_('Choose an audio server'), choices, preset_values=preset, default_option=audio, skip=False).run() + return selected_audio + + +def select_language(default_value: str, preset_value: str = None) -> str: + """ + Asks the user to select a language + Usually this is combined with :ref:`archinstall.list_keyboard_languages`. + + :return: The language/dictionary key of the selected language + :rtype: str + """ + kb_lang = list_keyboard_languages() + # sort alphabetically and then by length + # it's fine if the list is big because the Menu + # allows for searching anyways + sorted_kb_lang = sorted(sorted(list(kb_lang)), key=len) + + selected_lang = Menu(_('Select Keyboard layout'), + sorted_kb_lang, + default_option=default_value, + preset_values=preset_value, + sort=False).run() + return selected_lang + + +def select_mirror_regions(preset_values: Dict[str, Any] = {}) -> Dict[str, Any]: + """ + Asks the user to select a mirror or region + Usually this is combined with :ref:`archinstall.list_mirrors`. + + :return: The dictionary information about a mirror/region. + :rtype: dict + """ + if preset_values is None: + preselected = None + else: + preselected = list(preset_values.keys()) + mirrors = list_mirrors() + selected_mirror = Menu(_('Select one of the regions to download packages from'), + list(mirrors.keys()), + preset_values=preselected, + multi=True).run() + + if selected_mirror is not None: + return {selected: mirrors[selected] for selected in selected_mirror} + + return {} + + +def select_archinstall_language(default='English'): + languages = Translation.get_all_names() + language = Menu(_('Select Archinstall language'), languages, default_option=default).run() + return language + + +def select_profile() -> Optional[Profile]: + """ + # Asks the user to select a profile from the available profiles. + # + # :return: The name/dictionary key of the selected profile + # :rtype: str + # """ + top_level_profiles = sorted(list(list_profiles(filter_top_level_profiles=True))) + options = {} + + for profile in top_level_profiles: + profile = Profile(None, profile) + description = profile.get_profile_description() + + option = f'{profile.profile}: {description}' + options[option] = profile + + title = _('This is a list of pre-programmed profiles, they might make it easier to install things like desktop environments') + + selection = Menu(title=title, p_options=list(options.keys())).run() + + if selection is not None: + return options[selection] + + return None + + +def ask_additional_packages_to_install(pre_set_packages: List[str] = []) -> List[str]: + # Additional packages (with some light weight error handling for invalid package names) + print(_('Only packages such as base, base-devel, linux, linux-firmware, efibootmgr and optional profile packages are installed.')) + print(_('If you desire a web browser, such as firefox or chromium, you may specify it in the following prompt.')) + + def read_packages(already_defined: list = []) -> list: + display = ' '.join(already_defined) + input_packages = TextInput(_('Write additional packages to install (space separated, leave blank to skip): '), display).run() + return input_packages.split(' ') if input_packages else [] + + pre_set_packages = pre_set_packages if pre_set_packages else [] + packages = read_packages(pre_set_packages) + + while True: + if len(packages): + # Verify packages that were given + print(_("Verifying that additional packages exist (this might take a few seconds)")) + valid, invalid = validate_package_list(packages) + + if invalid: + log(f"Some packages could not be found in the repository: {invalid}", level=logging.WARNING, fg='red') + packages = read_packages(valid) + continue + break + + return packages + + +def select_additional_repositories(preset: List[str]) -> List[str]: + """ + Allows the user to select additional repositories (multilib, and testing) if desired. + + :return: The string as a selected repository + :rtype: string + """ + + repositories = ["multilib", "testing"] + + additional_repositories = Menu(_('Choose which optional additional repositories to enable'), + repositories, + sort=False, + multi=True, + preset_values=preset, + default_option=[]).run() + + if additional_repositories is not None: + return additional_repositories + + return [] diff --git a/archinstall/lib/user_interaction/locale_conf.py b/archinstall/lib/user_interaction/locale_conf.py new file mode 100644 index 00000000..d48018cf --- /dev/null +++ b/archinstall/lib/user_interaction/locale_conf.py @@ -0,0 +1,35 @@ +from __future__ import annotations + +from typing import Any, TYPE_CHECKING + +from ..locale_helpers import list_locales +from ..menu import Menu + +if TYPE_CHECKING: + _: Any + + +def select_locale_lang(default: str, preset: str = None) -> str: + locales = list_locales() + locale_lang = set([locale.split()[0] for locale in locales]) + + selected_locale = Menu(_('Choose which locale language to use'), + locale_lang, + sort=True, + preset_values=preset, + default_option=default).run() + + return selected_locale + + +def select_locale_enc(default: str, preset: str = None) -> str: + locales = list_locales() + locale_enc = set([locale.split()[1] for locale in locales]) + + selected_locale = Menu(_('Choose which locale encoding to use'), + locale_enc, + sort=True, + preset_values=preset, + default_option=default).run() + + return selected_locale diff --git a/archinstall/lib/user_interaction/manage_users_conf.py b/archinstall/lib/user_interaction/manage_users_conf.py new file mode 100644 index 00000000..0af0d776 --- /dev/null +++ b/archinstall/lib/user_interaction/manage_users_conf.py @@ -0,0 +1,169 @@ +from __future__ import annotations + +import logging +import re +from typing import Any, Dict, TYPE_CHECKING + +from ..menu import Menu +from ..menu.list_manager import ListManager +from ..output import log +from ..storage import storage +from .utils import get_password + +if TYPE_CHECKING: + _: Any + + +class UserList(ListManager): + """ + subclass of ListManager for the managing of user accounts + """ + + def __init__(self, prompt: str, lusers: dict, sudo: bool = None): + """ + param: prompt + type: str + param: lusers dict with the users already defined for the system + type: Dict + param: sudo. boolean to determine if we handle superusers or users. If None handles both types + """ + self.sudo = sudo + self.actions = [ + str(_('Add an user')), + str(_('Change password')), + str(_('Promote/Demote user')), + str(_('Delete User')) + ] + self.default_action = self.actions[0] + super().__init__(prompt, lusers, self.actions, self.default_action) + + def reformat(self): + + def format_element(elem): + # secret gives away the length of the password + if self.data[elem].get('!password'): + pwd = '*' * 16 + # pwd = archinstall.secret(self.data[elem]['!password']) + else: + pwd = '' + if self.data[elem].get('sudoer'): + super = 'Superuser' + else: + super = ' ' + return f"{elem:16}: password {pwd:16} {super}" + + return list(map(lambda x: format_element(x), self.data)) + + def action_list(self): + if self.target: + active_user = list(self.target.keys())[0] + else: + active_user = None + sudoer = self.target[active_user].get('sudoer', False) + if self.sudo is None: + return self.actions + if self.sudo and sudoer: + return self.actions + elif self.sudo and not sudoer: + return [self.actions[2]] + elif not self.sudo and sudoer: + return [self.actions[2]] + else: + return self.actions + + def exec_action(self): + if self.target: + active_user = list(self.target.keys())[0] + else: + active_user = None + + if self.action == self.actions[0]: # add + new_user = self.add_user() + # no unicity check, if exists will be replaced + self.data.update(new_user) + elif self.action == self.actions[1]: # change password + self.data[active_user]['!password'] = get_password( + prompt=str(_('Password for user "{}": ').format(active_user))) + elif self.action == self.actions[2]: # promote/demote + self.data[active_user]['sudoer'] = not self.data[active_user]['sudoer'] + elif self.action == self.actions[3]: # delete + del self.data[active_user] + + def _check_for_correct_username(username: str) -> bool: + if re.match(r'^[a-z_][a-z0-9_-]*\$?$', username) and len(username) <= 32: + return True + log("The username you entered is invalid. Try again", level=logging.WARNING, fg='red') + return False + + def add_user(self): + print(_('\nDefine a new user\n')) + prompt = str(_("User Name : ")) + while True: + userid = input(prompt).strip(' ') + if not userid: + return {} # end + if not self._check_for_correct_username(userid): + pass + else: + break + if self.sudo: + sudoer = True + elif self.sudo is not None and not self.sudo: + sudoer = False + else: + sudoer = False + sudo_choice = Menu(str(_('Should {} be a superuser (sudoer)?')).format(userid), ['yes', 'no'], + skip=False, + preset_values='yes' if sudoer else 'no', + default_option='no').run() + sudoer = True if sudo_choice == 'yes' else False + + password = get_password(prompt=str(_('Password for user "{}": ').format(userid))) + + return {userid: {"!password": password, "sudoer": sudoer}} + + +def manage_users(prompt: str, sudo: bool) -> tuple[dict, dict]: + # TODO Filtering and some kind of simpler code + lusers = {} + if storage['arguments'].get('!superusers', {}): + lusers.update({ + uid: { + '!password': storage['arguments']['!superusers'][uid].get('!password'), + 'sudoer': True + } + for uid in storage['arguments'].get('!superusers', {}) + }) + if storage['arguments'].get('!users', {}): + lusers.update({ + uid: { + '!password': storage['arguments']['!users'][uid].get('!password'), + 'sudoer': False + } + for uid in storage['arguments'].get('!users', {}) + }) + # processing + lusers = UserList(prompt, lusers, sudo).run() + # return data + superusers = { + uid: { + '!password': lusers[uid].get('!password') + } + for uid in lusers if lusers[uid].get('sudoer', False) + } + users = {uid: {'!password': lusers[uid].get('!password')} for uid in lusers if not lusers[uid].get('sudoer', False)} + storage['arguments']['!superusers'] = superusers + storage['arguments']['!users'] = users + return superusers, users + + +def ask_for_superuser_account(prompt: str) -> Dict[str, Dict[str, str]]: + prompt = prompt if prompt else str(_('Define users with sudo privilege: ')) + superusers, dummy = manage_users(prompt, sudo=True) + return superusers + + +def ask_for_additional_users(prompt: str = '') -> Dict[str, Dict[str, str | None]]: + prompt = prompt if prompt else _('Any additional users to install (leave blank for no users): ') + dummy, users = manage_users(prompt, sudo=False) + return users diff --git a/archinstall/lib/user_interaction/network_conf.py b/archinstall/lib/user_interaction/network_conf.py new file mode 100644 index 00000000..f90a2af8 --- /dev/null +++ b/archinstall/lib/user_interaction/network_conf.py @@ -0,0 +1,111 @@ +from __future__ import annotations + +import ipaddress +import logging +from copy import copy +from typing import Any, Optional, Dict, TYPE_CHECKING + +from ..menu.text_input import TextInput +from ..models.network_configuration import NetworkConfiguration, NicType + +from ..networking import list_interfaces +from ..menu import Menu +from ..output import log + +if TYPE_CHECKING: + _: Any + + +def ask_to_configure_network(preset: Dict[str, Any] = {}) -> Optional[NetworkConfiguration]: + """ + Configure the network on the newly installed system + """ + interfaces = { + 'none': str(_('No network configuration')), + 'iso_config': str(_('Copy ISO network configuration to installation')), + 'network_manager': str(_('Use NetworkManager (necessary to configure internet graphically in GNOME and KDE)')), + **list_interfaces() + } + # for this routine it's easier to set the cursor position rather than a preset value + cursor_idx = None + if preset: + if preset['type'] == 'iso_config': + cursor_idx = 0 + elif preset['type'] == 'network_manager': + cursor_idx = 1 + else: + try: + # let's hope order in dictionaries stay + cursor_idx = list(interfaces.values()).index(preset.get('type')) + except ValueError: + pass + + nic = Menu(_('Select one network interface to configure'), interfaces.values(), cursor_index=cursor_idx, + sort=False).run() + + if not nic: + return None + + if nic == interfaces['none']: + return None + elif nic == interfaces['iso_config']: + return NetworkConfiguration(NicType.ISO) + elif nic == interfaces['network_manager']: + return NetworkConfiguration(NicType.NM) + else: + # Current workaround: + # For selecting modes without entering text within brackets, + # printing out this part separate from options, passed in + # `generic_select` + # we only keep data if it is the same nic as before + if preset.get('type') != nic: + preset_d = {'type': nic, 'dhcp': True, 'ip': None, 'gateway': None, 'dns': []} + else: + preset_d = copy(preset) + + modes = ['DHCP (auto detect)', 'IP (static)'] + default_mode = 'DHCP (auto detect)' + cursor_idx = 0 if preset_d.get('dhcp', True) else 1 + + prompt = _('Select which mode to configure for "{}" or skip to use default mode "{}"').format(nic, default_mode) + mode = Menu(prompt, modes, default_option=default_mode, cursor_index=cursor_idx).run() + # TODO preset values for ip and gateway + if mode == 'IP (static)': + while 1: + prompt = _('Enter the IP and subnet for {} (example: 192.168.0.5/24): ').format(nic) + ip = TextInput(prompt, preset_d.get('ip')).run().strip() + # Implemented new check for correct IP/subnet input + try: + ipaddress.ip_interface(ip) + break + except ValueError: + log("You need to enter a valid IP in IP-config mode.", level=logging.WARNING, fg='red') + + # Implemented new check for correct gateway IP address + while 1: + gateway = TextInput(_('Enter your gateway (router) IP address or leave blank for none: '), + preset_d.get('gateway')).run().strip() + try: + if len(gateway) == 0: + gateway = None + else: + ipaddress.ip_address(gateway) + break + except ValueError: + log("You need to enter a valid gateway (router) IP address.", level=logging.WARNING, fg='red') + + dns = None + if preset_d.get('dns'): + preset_d['dns'] = ' '.join(preset_d['dns']) + else: + preset_d['dns'] = None + dns_input = TextInput(_('Enter your DNS servers (space separated, blank for none): '), + preset_d['dns']).run().strip() + + if len(dns_input): + dns = dns_input.split(' ') + + return NetworkConfiguration(NicType.MANUAL, iface=nic, ip=ip, gateway=gateway, dns=dns, dhcp=False) + else: + # this will contain network iface names + return NetworkConfiguration(NicType.MANUAL, iface=nic) diff --git a/archinstall/lib/user_interaction/partitioning_conf.py b/archinstall/lib/user_interaction/partitioning_conf.py new file mode 100644 index 00000000..8c5d1375 --- /dev/null +++ b/archinstall/lib/user_interaction/partitioning_conf.py @@ -0,0 +1,295 @@ +from __future__ import annotations + +from typing import List, Any, Dict, Union, TYPE_CHECKING + +from ..disk import BlockDevice, suggest_single_disk_layout, suggest_multi_disk_layout, valid_parted_position +from ..menu import Menu +from ..output import log + +from ..disk.validators import fs_types + +if TYPE_CHECKING: + from ..disk.partition import Partition + _: Any + + +def partition_overlap(partitions: list, start: str, end: str) -> bool: + # TODO: Implement sanity check + return False + + +def _current_partition_layout(partitions: List[Partition], with_idx: bool = False) -> str: + + def do_padding(name, max_len): + spaces = abs(len(str(name)) - max_len) + 2 + pad_left = int(spaces / 2) + pad_right = spaces - pad_left + return f'{pad_right * " "}{name}{pad_left * " "}|' + + column_names = {} + + # this will add an initial index to the table for each partition + if with_idx: + column_names['index'] = max([len(str(len(partitions))), len('index')]) + + # determine all attribute names and the max length + # of the value among all partitions to know the width + # of the table cells + for p in partitions: + for attribute, value in p.items(): + if attribute in column_names.keys(): + column_names[attribute] = max([column_names[attribute], len(str(value)), len(attribute)]) + else: + column_names[attribute] = max([len(str(value)), len(attribute)]) + + current_layout = '' + for name, max_len in column_names.items(): + current_layout += do_padding(name, max_len) + + current_layout = f'{current_layout[:-1]}\n{"-" * len(current_layout)}\n' + + for idx, p in enumerate(partitions): + row = '' + for name, max_len in column_names.items(): + if name == 'index': + row += do_padding(str(idx), max_len) + elif name in p: + row += do_padding(p[name], max_len) + else: + row += ' ' * (max_len + 2) + '|' + + current_layout += f'{row[:-1]}\n' + + title = str(_('Current partition layout')) + return f'\n\n{title}:\n\n{current_layout}' + + +def select_partition(title: str, partitions: List[Partition], multiple: bool = False) -> Union[int, List[int], None]: + partition_indexes = list(map(str, range(len(partitions)))) + partition = Menu(title, partition_indexes, multi=multiple).run() + + if partition is not None: + if isinstance(partition, list): + return [int(p) for p in partition] + else: + return int(partition) + + return None + + +def get_default_partition_layout(block_devices: Union[BlockDevice, List[BlockDevice]], + advanced_options: bool = False) -> Dict[str, Any]: + + if len(block_devices) == 1: + return suggest_single_disk_layout(block_devices[0], advanced_options=advanced_options) + else: + return suggest_multi_disk_layout(block_devices, advanced_options=advanced_options) + + +def select_individual_blockdevice_usage(block_devices: list) -> Dict[str, Any]: + result = {} + + for device in block_devices: + layout = manage_new_and_existing_partitions(device) + + result[device.path] = layout + + return result + + +def manage_new_and_existing_partitions(block_device: BlockDevice) -> Dict[str, Any]: + block_device_struct = {"partitions": [partition.__dump__() for partition in block_device.partitions.values()]} + # Test code: [part.__dump__() for part in block_device.partitions.values()] + # TODO: Squeeze in BTRFS subvolumes here + + new_partition = str(_('Create a new partition')) + suggest_partition_layout = str(_('Suggest partition layout')) + delete_partition = str(_('Delete a partition')) + delete_all_partitions = str(_('Clear/Delete all partitions')) + assign_mount_point = str(_('Assign mount-point for a partition')) + mark_formatted = str(_('Mark/Unmark a partition to be formatted (wipes data)')) + mark_encrypted = str(_('Mark/Unmark a partition as encrypted')) + mark_bootable = str(_('Mark/Unmark a partition as bootable (automatic for /boot)')) + set_filesystem_partition = str(_('Set desired filesystem for a partition')) + + while True: + modes = [new_partition, suggest_partition_layout] + + if len(block_device_struct['partitions']): + modes += [ + delete_partition, + delete_all_partitions, + assign_mount_point, + mark_formatted, + mark_encrypted, + mark_bootable, + set_filesystem_partition, + ] + + title = _('Select what to do with\n{}').format(block_device) + + # show current partition layout: + if len(block_device_struct["partitions"]): + title += _current_partition_layout(block_device_struct['partitions']) + '\n' + + task = Menu(title, modes, sort=False).run() + + if not task: + break + + if task == new_partition: + # if partition_type == 'gpt': + # # https://www.gnu.org/software/parted/manual/html_node/mkpart.html + # # https://www.gnu.org/software/parted/manual/html_node/mklabel.html + # name = input("Enter a desired name for the partition: ").strip() + + fstype = Menu(_('Enter a desired filesystem type for the partition'), fs_types(), skip=False).run() + + prompt = _('Enter the start sector (percentage or block number, default: {}): ').format( + block_device.first_free_sector) + start = input(prompt).strip() + + if not start.strip(): + start = block_device.first_free_sector + end_suggested = block_device.first_end_sector + else: + end_suggested = '100%' + + prompt = _('Enter the end sector of the partition (percentage or block number, ex: {}): ').format( + end_suggested) + end = input(prompt).strip() + + if not end.strip(): + end = end_suggested + + if valid_parted_position(start) and valid_parted_position(end): + if partition_overlap(block_device_struct["partitions"], start, end): + log(f"This partition overlaps with other partitions on the drive! Ignoring this partition creation.", + fg="red") + continue + + block_device_struct["partitions"].append({ + "type": "primary", # Strictly only allowed under MSDOS, but GPT accepts it so it's "safe" to inject + "start": start, + "size": end, + "mountpoint": None, + "wipe": True, + "filesystem": { + "format": fstype + } + }) + else: + log(f"Invalid start ({valid_parted_position(start)}) or end ({valid_parted_position(end)}) for this partition. Ignoring this partition creation.", + fg="red") + continue + elif task == suggest_partition_layout: + if len(block_device_struct["partitions"]): + prompt = _('{} contains queued partitions, this will remove those, are you sure?').format(block_device) + choice = Menu(prompt, ['yes', 'no'], default_option='no').run() + + if choice == 'no': + continue + + block_device_struct.update(suggest_single_disk_layout(block_device)[block_device.path]) + elif task is None: + return block_device_struct + else: + current_layout = _current_partition_layout(block_device_struct['partitions'], with_idx=True) + + if task == delete_partition: + title = _('{}\n\nSelect by index which partitions to delete').format(current_layout) + to_delete = select_partition(title, block_device_struct["partitions"], multiple=True) + + if to_delete: + block_device_struct['partitions'] = [ + p for idx, p in enumerate(block_device_struct['partitions']) if idx not in to_delete + ] + elif task == delete_all_partitions: + block_device_struct["partitions"] = [] + elif task == assign_mount_point: + title = _('{}\n\nSelect by index which partition to mount where').format(current_layout) + partition = select_partition(title, block_device_struct["partitions"]) + + if partition is not None: + print( + _(' * Partition mount-points are relative to inside the installation, the boot would be /boot as an example.')) + mountpoint = input( + _('Select where to mount partition (leave blank to remove mountpoint): ')).strip() + + if len(mountpoint): + block_device_struct["partitions"][partition]['mountpoint'] = mountpoint + if mountpoint == '/boot': + log(f"Marked partition as bootable because mountpoint was set to /boot.", fg="yellow") + block_device_struct["partitions"][partition]['boot'] = True + else: + del (block_device_struct["partitions"][partition]['mountpoint']) + + elif task == mark_formatted: + title = _('{}\n\nSelect which partition to mask for formatting').format(current_layout) + partition = select_partition(title, block_device_struct["partitions"]) + + if partition is not None: + # If we mark a partition for formatting, but the format is CRYPTO LUKS, there's no point in formatting it really + # without asking the user which inner-filesystem they want to use. Since the flag 'encrypted' = True is already set, + # it's safe to change the filesystem for this partition. + if block_device_struct["partitions"][partition].get('filesystem',{}).get('format', 'crypto_LUKS') == 'crypto_LUKS': + if not block_device_struct["partitions"][partition].get('filesystem', None): + block_device_struct["partitions"][partition]['filesystem'] = {} + + fstype = Menu(_('Enter a desired filesystem type for the partition'), fs_types(), + skip=False).run() + + block_device_struct["partitions"][partition]['filesystem']['format'] = fstype + + # Negate the current wipe marking + block_device_struct["partitions"][partition][ + 'wipe'] = not block_device_struct["partitions"][partition].get('wipe', False) + + elif task == mark_encrypted: + title = _('{}\n\nSelect which partition to mark as encrypted').format(current_layout) + partition = select_partition(title, block_device_struct["partitions"]) + + if partition is not None: + # Negate the current encryption marking + block_device_struct["partitions"][partition][ + 'encrypted'] = not block_device_struct["partitions"][partition].get('encrypted', False) + + elif task == mark_bootable: + title = _('{}\n\nSelect which partition to mark as bootable').format(current_layout) + partition = select_partition(title, block_device_struct["partitions"]) + + if partition is not None: + block_device_struct["partitions"][partition][ + 'boot'] = not block_device_struct["partitions"][partition].get('boot', False) + + elif task == set_filesystem_partition: + title = _('{}\n\nSelect which partition to set a filesystem on').format(current_layout) + partition = select_partition(title, block_device_struct["partitions"]) + + if partition is not None: + if not block_device_struct["partitions"][partition].get('filesystem', None): + block_device_struct["partitions"][partition]['filesystem'] = {} + + fstype_title = _('Enter a desired filesystem type for the partition: ') + fstype = Menu(fstype_title, fs_types(), skip=False).run() + + block_device_struct["partitions"][partition]['filesystem']['format'] = fstype + + return block_device_struct + + +def select_encrypted_partitions(block_devices: dict, password: str) -> dict: + for device in block_devices: + for partition in block_devices[device]['partitions']: + if partition.get('mountpoint', None) != '/boot': + partition['encrypted'] = True + partition['!password'] = password + + if partition['mountpoint'] != '/': + # Tell the upcoming steps to generate a key-file for non root mounts. + partition['generate-encryption-key-file'] = True + + return block_devices + + # TODO: Next version perhaps we can support mixed multiple encrypted partitions + # Users might want to single out a partition for non-encryption to share between dualboot etc. diff --git a/archinstall/lib/user_interaction/save_conf.py b/archinstall/lib/user_interaction/save_conf.py new file mode 100644 index 00000000..c52b97e2 --- /dev/null +++ b/archinstall/lib/user_interaction/save_conf.py @@ -0,0 +1,74 @@ +from __future__ import annotations + +from pathlib import Path +from typing import Any, Dict, TYPE_CHECKING + +from ..configuration import ConfigurationOutput +from ..menu import Menu +from ..output import log + +if TYPE_CHECKING: + _: Any + + +def save_config(config: Dict): + + def preview(selection: str): + if options['user_config'] == selection: + json_config = config_output.user_config_to_json() + return f'{config_output.user_configuration_file}\n{json_config}' + elif options['user_creds'] == selection: + if json_config := config_output.user_credentials_to_json(): + return f'{config_output.user_credentials_file}\n{json_config}' + else: + return str(_('No configuration')) + elif options['disk_layout'] == selection: + if json_config := config_output.disk_layout_to_json(): + return f'{config_output.disk_layout_file}\n{json_config}' + else: + return str(_('No configuration')) + elif options['all'] == selection: + output = f'{config_output.user_configuration_file}\n' + if json_config := config_output.user_credentials_to_json(): + output += f'{config_output.user_credentials_file}\n' + if json_config := config_output.disk_layout_to_json(): + output += f'{config_output.disk_layout_file}\n' + return output[:-1] + return None + + config_output = ConfigurationOutput(config) + + options = { + 'user_config': str(_('Save user configuration')), + 'user_creds': str(_('Save user credentials')), + 'disk_layout': str(_('Save disk layout')), + 'all': str(_('Save all')) + } + + selection = Menu(_('Choose which configuration to save'), + list(options.values()), + sort=False, + skip=True, + preview_size=0.75, + preview_command=preview).run() + + if not selection: + return + + while True: + path = input(_('Enter a directory for the configuration(s) to be saved: ')).strip(' ') + dest_path = Path(path) + if dest_path.exists() and dest_path.is_dir(): + break + log(_('Not a valid directory: {}').format(dest_path), fg='red') + + if options['user_config'] == selection: + config_output.save_user_config(dest_path) + elif options['user_creds'] == selection: + config_output.save_user_creds(dest_path) + elif options['disk_layout'] == selection: + config_output.save_disk_layout(dest_path) + elif options['all'] == selection: + config_output.save_user_config(dest_path) + config_output.save_user_creds(dest_path) + config_output.save_disk_layout diff --git a/archinstall/lib/user_interaction/system_conf.py b/archinstall/lib/user_interaction/system_conf.py new file mode 100644 index 00000000..0120fd8a --- /dev/null +++ b/archinstall/lib/user_interaction/system_conf.py @@ -0,0 +1,144 @@ +from __future__ import annotations + +from typing import List, Any, Dict, TYPE_CHECKING + +from ..disk import all_blockdevices +from ..exceptions import RequirementError +from ..hardware import AVAILABLE_GFX_DRIVERS, has_uefi, has_amd_graphics, has_intel_graphics, has_nvidia_graphics +from ..menu import Menu +from ..storage import storage + +from ..translation import DeferredTranslation + +if TYPE_CHECKING: + _: Any + + +def select_kernel(preset: List[str] = None) -> List[str]: + """ + Asks the user to select a kernel for system. + + :return: The string as a selected kernel + :rtype: string + """ + + kernels = ["linux", "linux-lts", "linux-zen", "linux-hardened"] + default_kernel = "linux" + + selected_kernels = Menu(_('Choose which kernels to use or leave blank for default "{}"').format(default_kernel), + kernels, + sort=True, + multi=True, + preset_values=preset, + default_option=default_kernel).run() + return selected_kernels + + +def select_harddrives(preset: List[str] = []) -> List[str]: + """ + Asks the user to select one or multiple hard drives + + :return: List of selected hard drives + :rtype: list + """ + hard_drives = all_blockdevices(partitions=False).values() + options = {f'{option}': option for option in hard_drives} + + if preset: + preset_disks = {f'{option}': option for option in preset} + else: + preset_disks = {} + + selected_harddrive = Menu(_('Select one or more hard drives to use and configure'), + list(options.keys()), + preset_values=list(preset_disks.keys()), + multi=True).run() + + if selected_harddrive and len(selected_harddrive) > 0: + return [options[i] for i in selected_harddrive] + + return [] + + +def select_driver(options: Dict[str, Any] = AVAILABLE_GFX_DRIVERS, force_ask: bool = False) -> str: + """ + Some what convoluted function, whose job is simple. + Select a graphics driver from a pre-defined set of popular options. + + (The template xorg is for beginner users, not advanced, and should + there for appeal to the general public first and edge cases later) + """ + + drivers = sorted(list(options)) + + if drivers: + arguments = storage.get('arguments', {}) + title = DeferredTranslation('') + + if has_amd_graphics(): + title += _( + 'For the best compatibility with your AMD hardware, you may want to use either the all open-source or AMD / ATI options.' + ) + '\n' + if has_intel_graphics(): + title += _( + 'For the best compatibility with your Intel hardware, you may want to use either the all open-source or Intel options.\n' + ) + if has_nvidia_graphics(): + title += _( + 'For the best compatibility with your Nvidia hardware, you may want to use the Nvidia proprietary driver.\n' + ) + + if not arguments.get('gfx_driver', None) or force_ask: + title += _('\n\nSelect a graphics driver or leave blank to install all open-source drivers') + arguments['gfx_driver'] = Menu(title, drivers).run() + + if arguments.get('gfx_driver', None) is None: + arguments['gfx_driver'] = _("All open-source (default)") + + return options.get(arguments.get('gfx_driver')) + + raise RequirementError("Selecting drivers require a least one profile to be given as an option.") + + +def ask_for_bootloader(advanced_options: bool = False, preset: str = None) -> str: + + if preset == 'systemd-bootctl': + preset_val = 'systemd-boot' if advanced_options else 'no' + elif preset == 'grub-install': + preset_val = 'grub' if advanced_options else 'yes' + else: + preset_val = preset + + bootloader = "systemd-bootctl" if has_uefi() else "grub-install" + if has_uefi(): + if not advanced_options: + bootloader_choice = Menu(_('Would you like to use GRUB as a bootloader instead of systemd-boot?'), + ['yes', 'no'], + preset_values=preset_val, + default_option='no').run() + + if bootloader_choice == "yes": + bootloader = "grub-install" + else: + # We use the common names for the bootloader as the selection, and map it back to the expected values. + choices = ['systemd-boot', 'grub', 'efistub'] + selection = Menu(_('Choose a bootloader'), choices, preset_values=preset_val).run() + if selection != "": + if selection == 'systemd-boot': + bootloader = 'systemd-bootctl' + elif selection == 'grub': + bootloader = 'grub-install' + else: + bootloader = selection + + return bootloader + + +def ask_for_swap(preset: bool = True) -> bool: + if preset: + preset_val = 'yes' + else: + preset_val = 'no' + prompt = _('Would you like to use swap on zram?') + choice = Menu(prompt, ['yes', 'no'], default_option='yes', preset_values=preset_val).run() + return False if choice == 'no' else True diff --git a/archinstall/lib/user_interaction/utils.py b/archinstall/lib/user_interaction/utils.py new file mode 100644 index 00000000..48b55e8c --- /dev/null +++ b/archinstall/lib/user_interaction/utils.py @@ -0,0 +1,99 @@ +from __future__ import annotations + +import getpass +import signal +import sys +import time +from typing import Any, Optional, TYPE_CHECKING + +from ..menu import Menu +from ..output import log + +if TYPE_CHECKING: + _: Any + +# used for signal handler +SIG_TRIGGER = None + + +def check_password_strong(passwd: str) -> bool: + symbol_count = 0 + if any(character.isdigit() for character in passwd): + symbol_count += 10 + if any(character.isupper() for character in passwd): + symbol_count += 26 + if any(character.islower() for character in passwd): + symbol_count += 26 + if any(not character.isalnum() for character in passwd): + symbol_count += 40 + + if symbol_count**len(passwd) < 10e20: + + prompt = _("The password you are using seems to be weak,") + prompt += _("are you sure you want to use it?") + + choice = Menu(prompt, ["yes", "no"], default_option="yes").run() + return choice == "yes" + + return True + + +def get_password(prompt: str = '') -> Optional[str]: + if not prompt: + prompt = _("Enter a password: ") + + while passwd := getpass.getpass(prompt): + + if len(passwd.strip()) <= 0: + break + + if not check_password_strong(passwd): + continue + + passwd_verification = getpass.getpass(prompt=_('And one more time for verification: ')) + if passwd != passwd_verification: + log(' * Passwords did not match * ', fg='red') + continue + + return passwd + return None + + +def do_countdown() -> bool: + SIG_TRIGGER = False + + def kill_handler(sig: int, frame: Any) -> None: + print() + exit(0) + + def sig_handler(sig: int, frame: Any) -> None: + global SIG_TRIGGER + SIG_TRIGGER = True + signal.signal(signal.SIGINT, kill_handler) + + original_sigint_handler = signal.getsignal(signal.SIGINT) + signal.signal(signal.SIGINT, sig_handler) + + for i in range(5, 0, -1): + print(f"{i}", end='') + + for x in range(4): + sys.stdout.flush() + time.sleep(0.25) + print(".", end='') + + if SIG_TRIGGER: + prompt = _('Do you really want to abort?') + choice = Menu(prompt, ['yes', 'no'], skip=False).run() + if choice == 'yes': + exit(0) + + if SIG_TRIGGER is False: + sys.stdin.read() + SIG_TRIGGER = False + signal.signal(signal.SIGINT, sig_handler) + + print() + signal.signal(signal.SIGINT, original_sigint_handler) + + return True -- cgit v1.2.3-70-g09d2