Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib/user_interaction.py
diff options
context:
space:
mode:
authorDaniel <blackrabbit256@gmail.com>2022-03-28 22:49:05 +1100
committerGitHub <noreply@github.com>2022-03-28 13:49:05 +0200
commit3dc0d957e838c34b48a0782d2540341e33b24070 (patch)
treec0d4edbc2a9c0b6c05161421c3ad28cec9ab2dc7 /archinstall/lib/user_interaction.py
parente85c9b65315498ab4701ea31c079d198eba8d9ac (diff)
Deflate user interactions (#1019)
* Deflate the user interactions file * Fix flake8 Co-authored-by: Daniel Girtler <girtler.daniel@gmail.com>
Diffstat (limited to 'archinstall/lib/user_interaction.py')
-rw-r--r--archinstall/lib/user_interaction.py1345
1 files changed, 0 insertions, 1345 deletions
diff --git a/archinstall/lib/user_interaction.py b/archinstall/lib/user_interaction.py
deleted file mode 100644
index c6f3eef7..00000000
--- a/archinstall/lib/user_interaction.py
+++ /dev/null
@@ -1,1345 +0,0 @@
-from __future__ import annotations
-import getpass
-import ipaddress
-import logging
-import re
-import select # Used for char by char polling of sys.stdin
-import shutil
-import signal
-import sys
-import time
-from collections.abc import Iterable
-from pathlib import Path
-from copy import copy
-from typing import List, Any, Optional, Dict, Union, TYPE_CHECKING
-
-# https://stackoverflow.com/a/39757388/929999
-from .menu.text_input import TextInput
-from .configuration import ConfigurationOutput
-from .models.network_configuration import NetworkConfiguration, NicType
-
-if TYPE_CHECKING:
- from .disk.partition import Partition
- _: Any
-
-from .disk import BlockDevice, suggest_single_disk_layout, suggest_multi_disk_layout, valid_parted_position, all_blockdevices
-from .exceptions import RequirementError, DiskError
-from .hardware import AVAILABLE_GFX_DRIVERS, has_uefi, has_amd_graphics, has_intel_graphics, has_nvidia_graphics
-from .locale_helpers import list_keyboard_languages, list_timezones, list_locales
-from .networking import list_interfaces
-from .menu import Menu
-from .menu.list_manager import ListManager
-from .output import log
-from .profiles import Profile, list_profiles
-from .storage import storage
-from .mirrors import list_mirrors
-
-# TODO: Some inconsistencies between the selection processes.
-# Some return the keys from the options, some the values?
-from .translation import Translation, DeferredTranslation
-from .disk.validators import fs_types
-from .packages.packages import validate_package_list
-
-
-# used for signal handler
-SIG_TRIGGER = None
-
-
-# TODO: These can be removed after the move to simple_menu.py
-def get_terminal_height() -> int:
- return shutil.get_terminal_size().lines
-
-
-def get_terminal_width() -> int:
- return shutil.get_terminal_size().columns
-
-
-def get_longest_option(options :List[Any]) -> int:
- return max([len(x) for x in options])
-
-
-def check_for_correct_username(username :str) -> bool:
- if re.match(r'^[a-z_][a-z0-9_-]*\$?$', username) and len(username) <= 32:
- return True
- log(
- "The username you entered is invalid. Try again",
- level=logging.WARNING,
- fg='red'
- )
- return False
-
-
-def do_countdown() -> bool:
- SIG_TRIGGER = False
-
- def kill_handler(sig :int, frame :Any) -> None:
- print()
- exit(0)
-
- def sig_handler(sig :int, frame :Any) -> None:
- global SIG_TRIGGER
- SIG_TRIGGER = True
- signal.signal(signal.SIGINT, kill_handler)
-
- original_sigint_handler = signal.getsignal(signal.SIGINT)
- signal.signal(signal.SIGINT, sig_handler)
-
- for i in range(5, 0, -1):
- print(f"{i}", end='')
-
- for x in range(4):
- sys.stdout.flush()
- time.sleep(0.25)
- print(".", end='')
-
- if SIG_TRIGGER:
- prompt = _('Do you really want to abort?')
- choice = Menu(prompt, ['yes', 'no'], skip=False).run()
- if choice == 'yes':
- exit(0)
-
- if SIG_TRIGGER is False:
- sys.stdin.read()
- SIG_TRIGGER = False
- signal.signal(signal.SIGINT, sig_handler)
-
- print()
- signal.signal(signal.SIGINT, original_sigint_handler)
-
- return True
-
-def check_password_strong(passwd :str) -> bool:
-
- symbol_count = 0
- if any(character.isdigit() for character in passwd):
- symbol_count += 10
- if any(character.isupper() for character in passwd):
- symbol_count += 26
- if any(character.islower() for character in passwd):
- symbol_count += 26
- if any(not character.isalnum() for character in passwd):
- symbol_count += 40
-
- if symbol_count ** len(passwd) < 10e20:
-
- prompt = _("The password you are using seems to be weak,")
- prompt += _("are you sure you want to use it?")
-
- choice = Menu(prompt, ["yes", "no"], default_option="yes").run()
- return choice == "yes"
-
- return True
-
-
-def get_password(prompt :str = '') -> Optional[str]:
- if not prompt:
- prompt = _("Enter a password: ")
-
- while passwd := getpass.getpass(prompt):
-
- if len(passwd.strip()) <= 0:
- break
-
- if not check_password_strong(passwd):
- continue
-
- passwd_verification = getpass.getpass(prompt=_('And one more time for verification: '))
- if passwd != passwd_verification:
- log(' * Passwords did not match * ', fg='red')
- continue
-
- return passwd
- return None
-
-
-def print_large_list(options :List[str], padding :int = 5, margin_bottom :int = 0, separator :str = ': ') -> List[int]:
- highest_index_number_length = len(str(len(options)))
- longest_line = highest_index_number_length + len(separator) + get_longest_option(options) + padding
- spaces_without_option = longest_line - (len(separator) + highest_index_number_length)
- max_num_of_columns = get_terminal_width() // longest_line
- max_options_in_cells = max_num_of_columns * (get_terminal_height() - margin_bottom)
-
- if len(options) > max_options_in_cells:
- for index, option in enumerate(options):
- print(f"{index}: {option}")
- return 1, index
- else:
- for row in range(0, (get_terminal_height() - margin_bottom)):
- for column in range(row, len(options), (get_terminal_height() - margin_bottom)):
- spaces = " " * (spaces_without_option - len(options[column]))
- print(f"{str(column): >{highest_index_number_length}}{separator}{options[column]}", end=spaces)
- print()
-
- return column, row
-
-
-def select_encrypted_partitions(block_devices :dict, password :str) -> dict:
- for device in block_devices:
- for partition in block_devices[device]['partitions']:
- if partition.get('mountpoint', None) != '/boot':
- partition['encrypted'] = True
- partition['!password'] = password
-
- if partition['mountpoint'] != '/':
- # Tell the upcoming steps to generate a key-file for non root mounts.
- partition['generate-encryption-key-file'] = True
-
- return block_devices
-
- # TODO: Next version perhaps we can support mixed multiple encrypted partitions
- # Users might want to single out a partition for non-encryption to share between dualboot etc.
-
-
-# TODO: This can be removed once we have simple_menu everywhere
-class MiniCurses:
- def __init__(self, width, height):
- self.width = width
- self.height = height
-
- self._cursor_y = 0
- self._cursor_x = 0
-
- self.input_pos = 0
-
- def write_line(self, text, clear_line=True):
- if clear_line:
- sys.stdout.flush()
- sys.stdout.write("\033[%dG" % 0)
- sys.stdout.flush()
- sys.stdout.write(" " * (get_terminal_width() - 1))
- sys.stdout.flush()
- sys.stdout.write("\033[%dG" % 0)
- sys.stdout.flush()
- sys.stdout.write(text)
- sys.stdout.flush()
- self._cursor_x += len(text)
-
- def clear(self, x, y):
- if x < 0:
- x = 0
- if y < 0:
- y = 0
-
- # import time
- # sys.stdout.write(f"Clearing from: {x, y}")
- # sys.stdout.flush()
- # time.sleep(2)
-
- sys.stdout.flush()
- sys.stdout.write('\033[%d;%df' % (y, x))
- for line in range(get_terminal_height() - y - 1, y):
- sys.stdout.write(" " * (get_terminal_width() - 1))
- sys.stdout.flush()
- sys.stdout.write('\033[%d;%df' % (y, x))
- sys.stdout.flush()
-
- def deal_with_control_characters(self, char):
- mapper = {
- '\x7f': 'BACKSPACE',
- '\r': 'CR',
- '\n': 'NL'
- }
-
- if (mapped_char := mapper.get(char, None)) == 'BACKSPACE':
- if self._cursor_x <= self.input_pos:
- # Don't backspace further back than the cursor start position during input
- return True
- # Move back to the current known position (BACKSPACE doesn't updated x-pos)
- sys.stdout.flush()
- sys.stdout.write("\033[%dG" % self._cursor_x)
- sys.stdout.flush()
-
- # Write a blank space
- sys.stdout.flush()
- sys.stdout.write(" ")
- sys.stdout.flush()
-
- # And move back again
- sys.stdout.flush()
- sys.stdout.write("\033[%dG" % self._cursor_x)
- sys.stdout.flush()
-
- self._cursor_x -= 1
-
- return True
- elif mapped_char in ('CR', 'NL'):
- return True
-
- return None
-
- def get_keyboard_input(self, strip_rowbreaks=True, end='\n'):
- assert end in ['\r', '\n', None]
- import termios
- import tty
-
- poller = select.epoll()
- response = ''
-
- sys_fileno = sys.stdin.fileno()
- old_settings = termios.tcgetattr(sys_fileno)
- tty.setraw(sys_fileno)
-
- poller.register(sys.stdin.fileno(), select.EPOLLIN)
-
- eof = False
- while eof is False:
- for fileno, event in poller.poll(0.025):
- char = sys.stdin.read(1)
-
- # sys.stdout.write(f"{[char]}")
- # sys.stdout.flush()
-
- if newline := (char in ('\n', '\r')):
- eof = True
-
- if not newline or strip_rowbreaks is False:
- response += char
-
- if self.deal_with_control_characters(char) is not True:
- self.write_line(response[-1], clear_line=False)
-
- termios.tcsetattr(sys_fileno, termios.TCSADRAIN, old_settings)
-
- if end:
- sys.stdout.write(end)
- sys.stdout.flush()
- self._cursor_x = 0
- self._cursor_y += 1
-
- if response:
- return response
-
-
-def ask_for_swap(preset :bool = True) -> bool:
- if preset:
- preset_val = 'yes'
- else:
- preset_val = 'no'
- prompt = _('Would you like to use swap on zram?')
- choice = Menu(prompt, ['yes', 'no'], default_option='yes', preset_values=preset_val).run()
- return False if choice == 'no' else True
-
-
-def ask_ntp(preset :bool = True) -> bool:
- prompt = str(_('Would you like to use automatic time synchronization (NTP) with the default time servers?\n'))
- prompt += str(_('Hardware time and other post-configuration steps might be required in order for NTP to work.\nFor more information, please check the Arch wiki'))
- if preset:
- preset_val = 'yes'
- else:
- preset_val = 'no'
- choice = Menu(prompt, ['yes', 'no'], skip=False, preset_values=preset_val, default_option='yes').run()
- return False if choice == 'no' else True
-
-
-def ask_hostname(preset :str = None) -> str :
- hostname = TextInput(_('Desired hostname for the installation: '),preset).run().strip(' ')
- return hostname
-
-
-def ask_for_superuser_account(prompt: str) -> Dict[str, Dict[str, str]]:
- prompt = prompt if prompt else str(_('Define users with sudo privilege: '))
- superusers,dummy = manage_users(prompt,sudo=True)
- return superusers
-
-
-def ask_for_additional_users(prompt :str = '') -> Dict[str, Dict[str, str | None]]:
- prompt = prompt if prompt else _('Any additional users to install (leave blank for no users): ')
- dummy,users = manage_users(prompt,sudo=False)
- return users
-
-
-def ask_for_a_timezone(preset :str = None) -> str:
- timezones = list_timezones()
- default = 'UTC'
-
- selected_tz = Menu(
- _('Select a timezone'),
- list(timezones),
- skip=False,
- preset_values=preset,
- default_option=default
- ).run()
-
- return selected_tz
-
-def ask_for_bootloader(advanced_options :bool = False, preset :str = None) -> str:
-
- if preset == 'systemd-bootctl':
- preset_val = 'systemd-boot' if advanced_options else 'no'
- elif preset == 'grub-install':
- preset_val = 'grub' if advanced_options else 'yes'
- else:
- preset_val = preset
-
- bootloader = "systemd-bootctl" if has_uefi() else "grub-install"
- if has_uefi():
- if not advanced_options:
- bootloader_choice = Menu(
- _('Would you like to use GRUB as a bootloader instead of systemd-boot?'),
- ['yes', 'no'],
- preset_values=preset_val,
- default_option='no'
- ).run()
-
- if bootloader_choice == "yes":
- bootloader = "grub-install"
- else:
- # We use the common names for the bootloader as the selection, and map it back to the expected values.
- choices = ['systemd-boot', 'grub', 'efistub']
- selection = Menu(_('Choose a bootloader'), choices,preset_values=preset_val).run()
- if selection != "":
- if selection == 'systemd-boot':
- bootloader = 'systemd-bootctl'
- elif selection == 'grub':
- bootloader = 'grub-install'
- else:
- bootloader = selection
-
- return bootloader
-
-
-def ask_for_audio_selection(desktop :bool = True, preset :str = None) -> str:
- audio = 'pipewire' if desktop else 'none'
- choices = ['pipewire', 'pulseaudio'] if desktop else ['pipewire', 'pulseaudio', 'none']
- selected_audio = Menu(
- _('Choose an audio server'),
- choices,
- preset_values=preset,
- default_option=audio,
- skip=False
- ).run()
- return selected_audio
-
-
-def ask_additional_packages_to_install(pre_set_packages :List[str] = []) -> List[str]:
- # Additional packages (with some light weight error handling for invalid package names)
- print(_('Only packages such as base, base-devel, linux, linux-firmware, efibootmgr and optional profile packages are installed.'))
- print(_('If you desire a web browser, such as firefox or chromium, you may specify it in the following prompt.'))
-
- def read_packages(already_defined: list = []) -> list:
- display = ' '.join(already_defined)
- input_packages = TextInput(
- _('Write additional packages to install (space separated, leave blank to skip): '),
- display
- ).run()
- return input_packages.split(' ') if input_packages else []
-
- pre_set_packages = pre_set_packages if pre_set_packages else []
- packages = read_packages(pre_set_packages)
-
- while True:
- if len(packages):
- # Verify packages that were given
- print(_("Verifying that additional packages exist (this might take a few seconds)"))
- valid, invalid = validate_package_list(packages)
-
- if invalid:
- log(f"Some packages could not be found in the repository: {invalid}", level=logging.WARNING, fg='red')
- packages = read_packages(valid)
- continue
- break
-
- return packages
-
-
-def ask_to_configure_network(preset :Dict[str, Any] = {}) -> Optional[NetworkConfiguration]:
- """
- Configure the network on the newly installed system
- """
- interfaces = {
- 'none': str(_('No network configuration')),
- 'iso_config': str(_('Copy ISO network configuration to installation')),
- 'network_manager': str(_('Use NetworkManager (necessary to configure internet graphically in GNOME and KDE)')),
- **list_interfaces()
- }
- # for this routine it's easier to set the cursor position rather than a preset value
- cursor_idx = None
- if preset:
- if preset['type'] == 'iso_config':
- cursor_idx = 0
- elif preset['type'] == 'network_manager':
- cursor_idx = 1
- else:
- try :
- # let's hope order in dictionaries stay
- cursor_idx = list(interfaces.values()).index(preset.get('type'))
- except ValueError:
- pass
-
- nic = Menu(_('Select one network interface to configure'), interfaces.values(), cursor_index=cursor_idx, sort=False).run()
-
- if not nic:
- return None
-
- if nic == interfaces['none']:
- return None
- elif nic == interfaces['iso_config']:
- return NetworkConfiguration(NicType.ISO)
- elif nic == interfaces['network_manager']:
- return NetworkConfiguration(NicType.NM)
- else:
- # Current workaround:
- # For selecting modes without entering text within brackets,
- # printing out this part separate from options, passed in
- # `generic_select`
- # we only keep data if it is the same nic as before
- if preset.get('type') != nic:
- preset_d = {'type': nic, 'dhcp': True, 'ip': None, 'gateway': None, 'dns': []}
- else:
- preset_d = copy(preset)
-
- modes = ['DHCP (auto detect)', 'IP (static)']
- default_mode = 'DHCP (auto detect)'
- cursor_idx = 0 if preset_d.get('dhcp',True) else 1
-
- prompt = _('Select which mode to configure for "{}" or skip to use default mode "{}"').format(nic, default_mode)
- mode = Menu(prompt, modes, default_option=default_mode, cursor_index=cursor_idx).run()
- # TODO preset values for ip and gateway
- if mode == 'IP (static)':
- while 1:
- prompt = _('Enter the IP and subnet for {} (example: 192.168.0.5/24): ').format(nic)
- ip = TextInput(prompt,preset_d.get('ip')).run().strip()
- # Implemented new check for correct IP/subnet input
- try:
- ipaddress.ip_interface(ip)
- break
- except ValueError:
- log(
- "You need to enter a valid IP in IP-config mode.",
- level=logging.WARNING,
- fg='red'
- )
-
- # Implemented new check for correct gateway IP address
- while 1:
- gateway = TextInput(_('Enter your gateway (router) IP address or leave blank for none: '),preset_d.get('gateway')).run().strip()
- try:
- if len(gateway) == 0:
- gateway = None
- else:
- ipaddress.ip_address(gateway)
- break
- except ValueError:
- log(
- "You need to enter a valid gateway (router) IP address.",
- level=logging.WARNING,
- fg='red'
- )
-
- dns = None
- if preset_d.get('dns'):
- preset_d['dns'] = ' '.join(preset_d['dns'])
- else:
- preset_d['dns'] = None
- dns_input = TextInput(_('Enter your DNS servers (space separated, blank for none): '),preset_d['dns']).run().strip()
-
- if len(dns_input):
- dns = dns_input.split(' ')
-
- return NetworkConfiguration(
- NicType.MANUAL,
- iface=nic,
- ip=ip,
- gateway=gateway,
- dns=dns,
- dhcp=False
- )
- else:
- # this will contain network iface names
- return NetworkConfiguration(NicType.MANUAL, iface=nic)
-
-
-def partition_overlap(partitions :list, start :str, end :str) -> bool:
- # TODO: Implement sanity check
- return False
-
-
-def ask_for_main_filesystem_format(advanced_options=False):
- options = {
- 'btrfs': 'btrfs',
- 'ext4': 'ext4',
- 'xfs': 'xfs',
- 'f2fs': 'f2fs'
- }
-
- advanced = {
- 'ntfs': 'ntfs'
- }
-
- if advanced_options:
- options.update(advanced)
-
- prompt = _('Select which filesystem your main partition should use')
- choice = Menu(prompt, options, skip=False).run()
- return choice
-
-
-def current_partition_layout(partitions :List[Partition], with_idx :bool = False) -> str:
- def do_padding(name, max_len):
- spaces = abs(len(str(name)) - max_len) + 2
- pad_left = int(spaces / 2)
- pad_right = spaces - pad_left
- return f'{pad_right * " "}{name}{pad_left * " "}|'
-
- column_names = {}
-
- # this will add an initial index to the table for each partition
- if with_idx:
- column_names['index'] = max([len(str(len(partitions))), len('index')])
-
- # determine all attribute names and the max length
- # of the value among all partitions to know the width
- # of the table cells
- for p in partitions:
- for attribute, value in p.items():
- if attribute in column_names.keys():
- column_names[attribute] = max([column_names[attribute], len(str(value)), len(attribute)])
- else:
- column_names[attribute] = max([len(str(value)), len(attribute)])
-
- current_layout = ''
- for name, max_len in column_names.items():
- current_layout += do_padding(name, max_len)
-
- current_layout = f'{current_layout[:-1]}\n{"-" * len(current_layout)}\n'
-
- for idx, p in enumerate(partitions):
- row = ''
- for name, max_len in column_names.items():
- if name == 'index':
- row += do_padding(str(idx), max_len)
- elif name in p:
- row += do_padding(p[name], max_len)
- else:
- row += ' ' * (max_len + 2) + '|'
-
- current_layout += f'{row[:-1]}\n'
-
- title = str(_('Current partition layout'))
- return f'\n\n{title}:\n\n{current_layout}'
-
-
-def select_partition(title :str, partitions :List[Partition], multiple :bool = False) -> Union[int, List[int], None]:
- partition_indexes = list(map(str, range(len(partitions))))
- partition = Menu(title, partition_indexes, multi=multiple).run()
-
- if partition is not None:
- if isinstance(partition, list):
- return [int(p) for p in partition]
- else:
- return int(partition)
-
- return None
-
-def get_default_partition_layout(
- block_devices :Union[BlockDevice, List[BlockDevice]],
- advanced_options :bool = False
-) -> Dict[str, Any]:
-
- if len(block_devices) == 1:
- return suggest_single_disk_layout(block_devices[0], advanced_options=advanced_options)
- else:
- return suggest_multi_disk_layout(block_devices, advanced_options=advanced_options)
-
-
-def manage_new_and_existing_partitions(block_device :BlockDevice) -> Dict[str, Any]:
- block_device_struct = {
- "partitions": [partition.__dump__() for partition in block_device.partitions.values()]
- }
- # Test code: [part.__dump__() for part in block_device.partitions.values()]
- # TODO: Squeeze in BTRFS subvolumes here
-
- new_partition = str(_('Create a new partition'))
- suggest_partition_layout = str(_('Suggest partition layout'))
- delete_partition = str(_('Delete a partition'))
- delete_all_partitions = str(_('Clear/Delete all partitions'))
- assign_mount_point = str(_('Assign mount-point for a partition'))
- mark_formatted = str(_('Mark/Unmark a partition to be formatted (wipes data)'))
- mark_encrypted = str(_('Mark/Unmark a partition as encrypted'))
- mark_bootable = str(_('Mark/Unmark a partition as bootable (automatic for /boot)'))
- set_filesystem_partition = str(_('Set desired filesystem for a partition'))
-
- while True:
- modes = [new_partition, suggest_partition_layout]
-
- if len(block_device_struct['partitions']):
- modes += [
- delete_partition,
- delete_all_partitions,
- assign_mount_point,
- mark_formatted,
- mark_encrypted,
- mark_bootable,
- set_filesystem_partition,
- ]
-
- title = _('Select what to do with\n{}').format(block_device)
-
- # show current partition layout:
- if len(block_device_struct["partitions"]):
- title += current_partition_layout(block_device_struct['partitions']) + '\n'
-
- task = Menu(title, modes, sort=False).run()
-
- if not task:
- break
-
- if task == new_partition:
- # if partition_type == 'gpt':
- # # https://www.gnu.org/software/parted/manual/html_node/mkpart.html
- # # https://www.gnu.org/software/parted/manual/html_node/mklabel.html
- # name = input("Enter a desired name for the partition: ").strip()
-
- fstype = Menu(_('Enter a desired filesystem type for the partition'), fs_types(), skip=False).run()
-
- prompt = _('Enter the start sector (percentage or block number, default: {}): ').format(block_device.first_free_sector)
- start = input(prompt).strip()
-
- if not start.strip():
- start = block_device.first_free_sector
- end_suggested = block_device.first_end_sector
- else:
- end_suggested = '100%'
-
- prompt = _('Enter the end sector of the partition (percentage or block number, ex: {}): ').format(end_suggested)
- end = input(prompt).strip()
-
- if not end.strip():
- end = end_suggested
-
- if valid_parted_position(start) and valid_parted_position(end):
- if partition_overlap(block_device_struct["partitions"], start, end):
- log(f"This partition overlaps with other partitions on the drive! Ignoring this partition creation.", fg="red")
- continue
-
- block_device_struct["partitions"].append({
- "type" : "primary", # Strictly only allowed under MSDOS, but GPT accepts it so it's "safe" to inject
- "start" : start,
- "size" : end,
- "mountpoint" : None,
- "wipe" : True,
- "filesystem" : {
- "format" : fstype
- }
- })
- else:
- log(f"Invalid start ({valid_parted_position(start)}) or end ({valid_parted_position(end)}) for this partition. Ignoring this partition creation.", fg="red")
- continue
- elif task == suggest_partition_layout:
- if len(block_device_struct["partitions"]):
- prompt = _('{} contains queued partitions, this will remove those, are you sure?').format(block_device)
- choice = Menu(prompt, ['yes', 'no'], default_option='no').run()
-
- if choice == 'no':
- continue
-
- block_device_struct.update(suggest_single_disk_layout(block_device)[block_device.path])
- elif task is None:
- return block_device_struct
- else:
- current_layout = current_partition_layout(block_device_struct['partitions'], with_idx=True)
-
- if task == delete_partition:
- title = _('{}\n\nSelect by index which partitions to delete').format(current_layout)
- to_delete = select_partition(title, block_device_struct["partitions"], multiple=True)
-
- if to_delete:
- block_device_struct['partitions'] = [p for idx, p in enumerate(block_device_struct['partitions']) if idx not in to_delete]
- elif task == delete_all_partitions:
- block_device_struct["partitions"] = []
- elif task == assign_mount_point:
- title = _('{}\n\nSelect by index which partition to mount where').format(current_layout)
- partition = select_partition(title, block_device_struct["partitions"])
-
- if partition is not None:
- print(_(' * Partition mount-points are relative to inside the installation, the boot would be /boot as an example.'))
- mountpoint = input(_('Select where to mount partition (leave blank to remove mountpoint): ')).strip()
-
- if len(mountpoint):
- block_device_struct["partitions"][partition]['mountpoint'] = mountpoint
- if mountpoint == '/boot':
- log(f"Marked partition as bootable because mountpoint was set to /boot.", fg="yellow")
- block_device_struct["partitions"][partition]['boot'] = True
- else:
- del(block_device_struct["partitions"][partition]['mountpoint'])
-
- elif task == mark_formatted:
- title = _('{}\n\nSelect which partition to mask for formatting').format(current_layout)
- partition = select_partition(title, block_device_struct["partitions"])
-
- if partition is not None:
- # If we mark a partition for formatting, but the format is CRYPTO LUKS, there's no point in formatting it really
- # without asking the user which inner-filesystem they want to use. Since the flag 'encrypted' = True is already set,
- # it's safe to change the filesystem for this partition.
- if block_device_struct["partitions"][partition].get('filesystem', {}).get('format', 'crypto_LUKS') == 'crypto_LUKS':
- if not block_device_struct["partitions"][partition].get('filesystem', None):
- block_device_struct["partitions"][partition]['filesystem'] = {}
-
- fstype = Menu(_('Enter a desired filesystem type for the partition'), fs_types(), skip=False).run()
-
- block_device_struct["partitions"][partition]['filesystem']['format'] = fstype
-
- # Negate the current wipe marking
- block_device_struct["partitions"][partition]['wipe'] = not block_device_struct["partitions"][partition].get('wipe', False)
-
- elif task == mark_encrypted:
- title = _('{}\n\nSelect which partition to mark as encrypted').format(current_layout)
- partition = select_partition(title, block_device_struct["partitions"])
-
- if partition is not None:
- # Negate the current encryption marking
- block_device_struct["partitions"][partition]['encrypted'] = not block_device_struct["partitions"][partition].get('encrypted', False)
-
- elif task == mark_bootable:
- title = _('{}\n\nSelect which partition to mark as bootable').format(current_layout)
- partition = select_partition(title, block_device_struct["partitions"])
-
- if partition is not None:
- block_device_struct["partitions"][partition]['boot'] = not block_device_struct["partitions"][partition].get('boot', False)
-
- elif task == set_filesystem_partition:
- title = _('{}\n\nSelect which partition to set a filesystem on').format(current_layout)
- partition = select_partition(title, block_device_struct["partitions"])
-
- if partition is not None:
- if not block_device_struct["partitions"][partition].get('filesystem', None):
- block_device_struct["partitions"][partition]['filesystem'] = {}
-
- fstype_title = _('Enter a desired filesystem type for the partition: ')
- fstype = Menu(fstype_title, fs_types(), skip=False).run()
-
- block_device_struct["partitions"][partition]['filesystem']['format'] = fstype
-
- return block_device_struct
-
-
-def select_individual_blockdevice_usage(block_devices: list) -> Dict[str, Any]:
- result = {}
-
- for device in block_devices:
- layout = manage_new_and_existing_partitions(device)
-
- result[device.path] = layout
-
- return result
-
-
-def select_archinstall_language(default='English'):
- languages = Translation.get_all_names()
- language = Menu(_('Select Archinstall language'), languages, default_option=default).run()
- return language
-
-
-def select_disk_layout(block_devices :list, advanced_options=False) -> Dict[str, Any]:
- wipe_mode = str(_('Wipe all selected drives and use a best-effort default partition layout'))
- custome_mode = str(_('Select what to do with each individual drive (followed by partition usage)'))
- modes = [wipe_mode, custome_mode]
-
- print(modes)
- mode = Menu(_('Select what you wish to do with the selected block devices'), modes, skip=False).run()
-
- if mode == wipe_mode:
- return get_default_partition_layout(block_devices, advanced_options)
- else:
- return select_individual_blockdevice_usage(block_devices)
-
-
-def select_disk(dict_o_disks :Dict[str, BlockDevice]) -> BlockDevice:
- """
- Asks the user to select a harddrive from the `dict_o_disks` selection.
- Usually this is combined with :ref:`archinstall.list_drives`.
-
- :param dict_o_disks: A `dict` where keys are the drive-name, value should be a dict containing drive information.
- :type dict_o_disks: dict
-
- :return: The name/path (the dictionary key) of the selected drive
- :rtype: str
- """
- drives = sorted(list(dict_o_disks.keys()))
- if len(drives) >= 1:
- for index, drive in enumerate(drives):
- print(f"{index}: {drive} ({dict_o_disks[drive]['size'], dict_o_disks[drive].device, dict_o_disks[drive]['label']})")
-
- log("You can skip selecting a drive and partitioning and use whatever drive-setup is mounted at /mnt (experimental)", fg="yellow")
-
- drive = Menu('Select one of the disks or skip and use "/mnt" as default"', drives).run()
- if not drive:
- return drive
-
- drive = dict_o_disks[drive]
- return drive
-
- raise DiskError('select_disk() requires a non-empty dictionary of disks to select from.')
-
-
-def select_profile() -> Optional[Profile]:
- """
- # Asks the user to select a profile from the available profiles.
- #
- # :return: The name/dictionary key of the selected profile
- # :rtype: str
- # """
- top_level_profiles = sorted(list(list_profiles(filter_top_level_profiles=True)))
- options = {}
-
- for profile in top_level_profiles:
- profile = Profile(None, profile)
- description = profile.get_profile_description()
-
- option = f'{profile.profile}: {description}'
- options[option] = profile
-
- title = _('This is a list of pre-programmed profiles, they might make it easier to install things like desktop environments')
-
- selection = Menu(title=title, p_options=list(options.keys())).run()
-
- if selection is not None:
- return options[selection]
-
- return None
-
-
-def select_language(default_value :str, preset_value :str = None) -> str:
- """
- Asks the user to select a language
- Usually this is combined with :ref:`archinstall.list_keyboard_languages`.
-
- :return: The language/dictionary key of the selected language
- :rtype: str
- """
- kb_lang = list_keyboard_languages()
- # sort alphabetically and then by length
- # it's fine if the list is big because the Menu
- # allows for searching anyways
- sorted_kb_lang = sorted(sorted(list(kb_lang)), key=len)
-
- selected_lang = Menu(_('Select Keyboard layout'), sorted_kb_lang, default_option=default_value, preset_values=preset_value, sort=False).run()
- return selected_lang
-
-
-def select_mirror_regions(preset_values :Dict[str, Any] = {}) -> Dict[str, Any]:
- """
- Asks the user to select a mirror or region
- Usually this is combined with :ref:`archinstall.list_mirrors`.
-
- :return: The dictionary information about a mirror/region.
- :rtype: dict
- """
- if preset_values is None:
- preselected = None
- else:
- preselected = list(preset_values.keys())
- mirrors = list_mirrors()
- selected_mirror = Menu(
- _('Select one of the regions to download packages from'),
- list(mirrors.keys()),
- preset_values=preselected,
- multi=True
- ).run()
-
- if selected_mirror is not None:
- return {selected: mirrors[selected] for selected in selected_mirror}
-
- return {}
-
-
-def select_harddrives(preset : List[str] = []) -> List[str]:
- """
- Asks the user to select one or multiple hard drives
-
- :return: List of selected hard drives
- :rtype: list
- """
- hard_drives = all_blockdevices(partitions=False).values()
- options = {f'{option}': option for option in hard_drives}
-
- if preset:
- preset_disks = {f'{option}':option for option in preset}
- else:
- preset_disks = {}
-
- selected_harddrive = Menu(
- _('Select one or more hard drives to use and configure'),
- list(options.keys()),
- preset_values=list(preset_disks.keys()),
- multi=True
- ).run()
-
- if selected_harddrive and len(selected_harddrive) > 0:
- return [options[i] for i in selected_harddrive]
-
- return []
-
-
-def select_driver(options :Dict[str, Any] = AVAILABLE_GFX_DRIVERS, force_ask :bool = False) -> str:
- """
- Some what convoluted function, whose job is simple.
- Select a graphics driver from a pre-defined set of popular options.
-
- (The template xorg is for beginner users, not advanced, and should
- there for appeal to the general public first and edge cases later)
- """
-
- drivers = sorted(list(options))
-
- if drivers:
- arguments = storage.get('arguments', {})
- title = DeferredTranslation('')
-
- if has_amd_graphics():
- title += _('For the best compatibility with your AMD hardware, you may want to use either the all open-source or AMD / ATI options.') + '\n'
- if has_intel_graphics():
- title += _('For the best compatibility with your Intel hardware, you may want to use either the all open-source or Intel options.\n')
- if has_nvidia_graphics():
- title += _('For the best compatibility with your Nvidia hardware, you may want to use the Nvidia proprietary driver.\n')
-
- if not arguments.get('gfx_driver', None) or force_ask:
- title += _('\n\nSelect a graphics driver or leave blank to install all open-source drivers')
- arguments['gfx_driver'] = Menu(title, drivers).run()
-
- if arguments.get('gfx_driver', None) is None:
- arguments['gfx_driver'] = _("All open-source (default)")
-
- return options.get(arguments.get('gfx_driver'))
-
- raise RequirementError("Selecting drivers require a least one profile to be given as an option.")
-
-
-def select_kernel(preset :List[str] = None) -> List[str]:
- """
- Asks the user to select a kernel for system.
-
- :return: The string as a selected kernel
- :rtype: string
- """
-
- kernels = ["linux", "linux-lts", "linux-zen", "linux-hardened"]
- default_kernel = "linux"
-
- selected_kernels = Menu(
- _('Choose which kernels to use or leave blank for default "{}"').format(default_kernel),
- kernels,
- sort=True,
- multi=True,
- preset_values=preset,
- default_option=default_kernel
- ).run()
- return selected_kernels
-
-def select_additional_repositories(preset :List[str]) -> List[str]:
- """
- Allows the user to select additional repositories (multilib, and testing) if desired.
-
- :return: The string as a selected repository
- :rtype: string
- """
-
- repositories = ["multilib", "testing"]
-
- additional_repositories = Menu(
- _('Choose which optional additional repositories to enable'),
- repositories,
- sort=False,
- multi=True,
- preset_values=preset,
- default_option=[]
- ).run()
-
- if additional_repositories is not None:
- return additional_repositories
-
- return []
-
-def select_locale_lang(default :str,preset :str = None) -> str :
- locales = list_locales()
- locale_lang = set([locale.split()[0] for locale in locales])
-
- selected_locale = Menu(
- _('Choose which locale language to use'),
- locale_lang,
- sort=True,
- preset_values=preset,
- default_option=default
- ).run()
-
- return selected_locale
-
-
-def select_locale_enc(default :str,preset :str = None) -> str:
- locales = list_locales()
- locale_enc = set([locale.split()[1] for locale in locales])
-
- selected_locale = Menu(
- _('Choose which locale encoding to use'),
- locale_enc,
- sort=True,
- preset_values=preset,
- default_option=default
- ).run()
-
- return selected_locale
-
-
-def generic_select(p_options :Union[list,dict],
- input_text :str = '',
- allow_empty_input :bool = True,
- options_output :bool = True, # function not available
- sort :bool = False,
- multi :bool = False,
- default :Any = None) -> Any:
- """
- A generic select function that does not output anything
- other than the options and their indexes. As an example:
-
- generic_select(["first", "second", "third option"])
- > first
- second
- third option
- When the user has entered the option correctly,
- this function returns an item from list, a string, or None
-
- Options can be any iterable.
- Duplicate entries are not checked, but the results with them are unreliable. Which element to choose from the duplicates depends on the return of the index()
- Default value if not on the list of options will be added as the first element
- sort will be handled by Menu()
- """
- # We check that the options are iterable. If not we abort. Else we copy them to lists
- # it options is a dictionary we use the values as entries of the list
- # if options is a string object, each character becomes an entry
- # if options is a list, we implictily build a copy to mantain immutability
- if not isinstance(p_options,Iterable):
- log(f"Objects of type {type(p_options)} is not iterable, and are not supported at generic_select",fg="red")
- log(f"invalid parameter at Menu() call was at <{sys._getframe(1).f_code.co_name}>",level=logging.WARNING)
- raise RequirementError("generic_select() requires an iterable as option.")
-
- input_text = input_text if input_text else _('Select one of the values shown below: ')
-
- if isinstance(p_options,dict):
- options = list(p_options.values())
- else:
- options = list(p_options)
- # check that the default value is in the list. If not it will become the first entry
- if default and default not in options:
- options.insert(0,default)
-
- # one of the drawbacks of the new interface is that in only allows string like options, so we do a conversion
- # also for the default value if it exists
- soptions = list(map(str,options))
- default_value = options[options.index(default)] if default else None
-
- selected_option = Menu(
- input_text,
- soptions,
- skip=allow_empty_input,
- multi=multi,
- default_option=default_value,
- sort=sort
- ).run()
- # we return the original objects, not the strings.
- # options is the list with the original objects and soptions the list with the string values
- # thru the map, we get from the value selected in soptions it index, and thu it the original object
- if not selected_option:
- return selected_option
- elif isinstance(selected_option,list): # for multi True
- selected_option = list(map(lambda x: options[soptions.index(x)],selected_option))
- else: # for multi False
- selected_option = options[soptions.index(selected_option)]
- return selected_option
-
-
-def generic_multi_select(p_options :Union[list,dict],
- text :str = '',
- sort :bool = False,
- default :Any = None,
- allow_empty :bool = False) -> Any:
-
- text = text if text else _("Select one or more of the options below: ")
-
- return generic_select(p_options,
- input_text=text,
- allow_empty_input=allow_empty,
- sort=sort,
- multi=True,
- default=default)
-
-
-class UserList(ListManager):
- """
- subclass of ListManager for the managing of user accounts
- """
- def __init__(self,prompt :str, lusers :dict, sudo :bool = None):
- """
- param: prompt
- type: str
- param: lusers dict with the users already defined for the system
- type: Dict
- param: sudo. boolean to determine if we handle superusers or users. If None handles both types
- """
- self.sudo = sudo
- self.actions = [
- str(_('Add an user')),
- str(_('Change password')),
- str(_('Promote/Demote user')),
- str(_('Delete User'))
- ]
- self.default_action = self.actions[0]
- super().__init__(prompt,lusers,self.actions,self.default_action)
-
- def reformat(self):
- def format_element(elem):
- # secret gives away the length of the password
- if self.data[elem].get('!password'):
- pwd = '*' * 16
- # pwd = archinstall.secret(self.data[elem]['!password'])
- else:
- pwd = ''
- if self.data[elem].get('sudoer'):
- super = 'Superuser'
- else:
- super = ' '
- return f"{elem:16}: password {pwd:16} {super}"
- return list(map(lambda x:format_element(x),self.data))
-
- def action_list(self):
- if self.target:
- active_user = list(self.target.keys())[0]
- else:
- active_user = None
- sudoer = self.target[active_user].get('sudoer',False)
- if self.sudo is None:
- return self.actions
- if self.sudo and sudoer:
- return self.actions
- elif self.sudo and not sudoer:
- return [self.actions[2]]
- elif not self.sudo and sudoer:
- return [self.actions[2]]
- else:
- return self.actions
-
- def exec_action(self):
- if self.target:
- active_user = list(self.target.keys())[0]
- else:
- active_user = None
-
- if self.action == self.actions[0]: # add
- new_user = self.add_user()
- # no unicity check, if exists will be replaced
- self.data.update(new_user)
- elif self.action == self.actions[1]: # change password
- self.data[active_user]['!password'] = get_password(prompt=str(_('Password for user "{}": ').format(active_user)))
- elif self.action == self.actions[2]: # promote/demote
- self.data[active_user]['sudoer'] = not self.data[active_user]['sudoer']
- elif self.action == self.actions[3]: # delete
- del self.data[active_user]
-
- def add_user(self):
- print(_('\nDefine a new user\n'))
- prompt = str(_("User Name : "))
- while True:
- userid = input(prompt).strip(' ')
- if not userid:
- return {} # end
- if not check_for_correct_username(userid):
- pass
- else:
- break
- if self.sudo:
- sudoer = True
- elif self.sudo is not None and not self.sudo:
- sudoer = False
- else:
- sudoer = False
- sudo_choice = Menu(
- str(_('Should {} be a superuser (sudoer)?')).format(userid),
- ['yes', 'no'],
- skip=False,
- preset_values='yes' if sudoer else 'no',
- default_option='no'
- ).run()
- sudoer = True if sudo_choice == 'yes' else False
-
- password = get_password(prompt=str(_('Password for user "{}": ').format(userid)))
-
- return {userid :{"!password":password, "sudoer":sudoer}}
-
-def manage_users(prompt :str, sudo :bool) -> tuple[dict, dict]:
-
- # TODO Filtering and some kind of simpler code
- lusers = {}
- if storage['arguments'].get('!superusers',{}):
- lusers.update({uid: {'!password':storage['arguments']['!superusers'][uid].get('!password'), 'sudoer':True} for uid in storage['arguments'].get('!superusers',{})})
- if storage['arguments'].get('!users',{}):
- lusers.update({uid: {'!password':storage['arguments']['!users'][uid].get('!password'), 'sudoer':False} for uid in storage['arguments'].get('!users',{})})
- # processing
- lusers = UserList(prompt,lusers,sudo).run()
- # return data
- superusers = {uid: {'!password':lusers[uid].get('!password')} for uid in lusers if lusers[uid].get('sudoer',False)}
- users = {uid: {'!password':lusers[uid].get('!password')} for uid in lusers if not lusers[uid].get('sudoer',False)}
- storage['arguments']['!superusers'] = superusers
- storage['arguments']['!users'] = users
- return superusers,users
-
-def save_config(config: Dict):
- def preview(selection: str):
- if options['user_config'] == selection:
- json_config = config_output.user_config_to_json()
- return f'{config_output.user_configuration_file}\n{json_config}'
- elif options['user_creds'] == selection:
- if json_config := config_output.user_credentials_to_json():
- return f'{config_output.user_credentials_file}\n{json_config}'
- else:
- return str(_('No configuration'))
- elif options['disk_layout'] == selection:
- if json_config := config_output.disk_layout_to_json():
- return f'{config_output.disk_layout_file}\n{json_config}'
- else:
- return str(_('No configuration'))
- elif options['all'] == selection:
- output = f'{config_output.user_configuration_file}\n'
- if json_config := config_output.user_credentials_to_json():
- output += f'{config_output.user_credentials_file}\n'
- if json_config := config_output.disk_layout_to_json():
- output += f'{config_output.disk_layout_file}\n'
- return output[:-1]
- return None
-
- config_output = ConfigurationOutput(config)
-
- options = {
- 'user_config': str(_('Save user configuration')),
- 'user_creds': str(_('Save user credentials')),
- 'disk_layout': str(_('Save disk layout')),
- 'all': str(_('Save all'))
- }
-
- selection = Menu(
- _('Choose which configuration to save'),
- list(options.values()),
- sort=False,
- skip=True,
- preview_size=0.75,
- preview_command=preview
- ).run()
-
- if not selection:
- return
-
- while True:
- path = input(_('Enter a directory for the configuration(s) to be saved: ')).strip(' ')
- dest_path = Path(path)
- if dest_path.exists() and dest_path.is_dir():
- break
- log(_('Not a valid directory: {}').format(dest_path), fg='red')
-
- if options['user_config'] == selection:
- config_output.save_user_config(dest_path)
- elif options['user_creds'] == selection:
- config_output.save_user_creds(dest_path)
- elif options['disk_layout'] == selection:
- config_output.save_disk_layout(dest_path)
- elif options['all'] == selection:
- config_output.save_user_config(dest_path)
- config_output.save_user_creds(dest_path)
- config_output.save_disk_layout