Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib/user_interaction
diff options
context:
space:
mode:
Diffstat (limited to 'archinstall/lib/user_interaction')
-rw-r--r--archinstall/lib/user_interaction/__init__.py2
-rw-r--r--archinstall/lib/user_interaction/backwards_compatible_conf.py2
-rw-r--r--archinstall/lib/user_interaction/disk_conf.py4
-rw-r--r--archinstall/lib/user_interaction/general_conf.py121
-rw-r--r--archinstall/lib/user_interaction/manage_users_conf.py60
-rw-r--r--archinstall/lib/user_interaction/network_conf.py98
-rw-r--r--archinstall/lib/user_interaction/partitioning_conf.py74
-rw-r--r--archinstall/lib/user_interaction/subvolume_config.py202
-rw-r--r--archinstall/lib/user_interaction/system_conf.py8
-rw-r--r--archinstall/lib/user_interaction/utils.py32
10 files changed, 305 insertions, 298 deletions
diff --git a/archinstall/lib/user_interaction/__init__.py b/archinstall/lib/user_interaction/__init__.py
index 8aba4b4d..a1ca2652 100644
--- a/archinstall/lib/user_interaction/__init__.py
+++ b/archinstall/lib/user_interaction/__init__.py
@@ -7,6 +7,6 @@ from .network_conf import ask_to_configure_network
from .partitioning_conf import select_partition, select_encrypted_partitions
from .general_conf import (ask_ntp, ask_for_a_timezone, ask_for_audio_selection, select_language, select_mirror_regions,
select_profile, select_archinstall_language, ask_additional_packages_to_install,
- select_additional_repositories, ask_hostname)
+ select_additional_repositories, ask_hostname, add_number_of_parrallel_downloads)
from .disk_conf import ask_for_main_filesystem_format, select_individual_blockdevice_usage, select_disk_layout, select_disk
from .utils import get_password, do_countdown
diff --git a/archinstall/lib/user_interaction/backwards_compatible_conf.py b/archinstall/lib/user_interaction/backwards_compatible_conf.py
index d91690eb..296572d2 100644
--- a/archinstall/lib/user_interaction/backwards_compatible_conf.py
+++ b/archinstall/lib/user_interaction/backwards_compatible_conf.py
@@ -40,7 +40,7 @@ def generic_select(
# We check that the options are iterable. If not we abort. Else we copy them to lists
# it options is a dictionary we use the values as entries of the list
# if options is a string object, each character becomes an entry
- # if options is a list, we implictily build a copy to mantain immutability
+ # if options is a list, we implictily build a copy to maintain immutability
if not isinstance(p_options, Iterable):
log(f"Objects of type {type(p_options)} is not iterable, and are not supported at generic_select", fg="red")
log(f"invalid parameter at Menu() call was at <{sys._getframe(1).f_code.co_name}>", level=logging.WARNING)
diff --git a/archinstall/lib/user_interaction/disk_conf.py b/archinstall/lib/user_interaction/disk_conf.py
index 371d052f..b5ed6967 100644
--- a/archinstall/lib/user_interaction/disk_conf.py
+++ b/archinstall/lib/user_interaction/disk_conf.py
@@ -45,8 +45,8 @@ def select_disk_layout(preset: Optional[Dict[str, Any]], block_devices: list, ad
choice = Menu(
_('Select what you wish to do with the selected block devices'),
modes,
- explode_on_interrupt=True,
- explode_warning=warning
+ raise_error_on_interrupt=True,
+ raise_error_warning_msg=warning
).run()
match choice.type_:
diff --git a/archinstall/lib/user_interaction/general_conf.py b/archinstall/lib/user_interaction/general_conf.py
index d4dc60db..6365014d 100644
--- a/archinstall/lib/user_interaction/general_conf.py
+++ b/archinstall/lib/user_interaction/general_conf.py
@@ -1,10 +1,9 @@
from __future__ import annotations
import logging
+import pathlib
from typing import List, Any, Optional, Dict, TYPE_CHECKING
-import archinstall
-
from ..menu.menu import MenuSelectionType
from ..menu.text_input import TextInput
@@ -14,9 +13,11 @@ from ..output import log
from ..profiles import Profile, list_profiles
from ..mirrors import list_mirrors
-from ..translation import Translation
+from ..translationhandler import Language, TranslationHandler
from ..packages.packages import validate_package_list
+from ..storage import storage
+
if TYPE_CHECKING:
_: Any
@@ -109,7 +110,7 @@ def select_mirror_regions(preset_values: Dict[str, Any] = {}) -> Dict[str, Any]:
list(mirrors.keys()),
preset_values=preselected,
multi=True,
- explode_on_interrupt=True
+ raise_error_on_interrupt=True
).run()
match selected_mirror.type_:
@@ -118,10 +119,40 @@ def select_mirror_regions(preset_values: Dict[str, Any] = {}) -> Dict[str, Any]:
case _: return {selected: mirrors[selected] for selected in selected_mirror.value}
-def select_archinstall_language(default='English'):
- languages = Translation.get_available_lang()
- language = Menu(_('Archinstall language'), languages, default_option=default).run()
- return language
+def select_archinstall_language(languages: List[Language], preset_value: Language) -> Language:
+ # these are the displayed language names which can either be
+ # the english name of a language or, if present, the
+ # name of the language in its own language
+ options = {lang.display_name: lang for lang in languages}
+
+ def dependency_preview(current_selection: str) -> Optional[str]:
+ current_lang = options[current_selection]
+
+ if current_lang.external_dep and not TranslationHandler.is_custom_font_enabled():
+ font_file = TranslationHandler.custom_font_path()
+ text = str(_('To be able to use this translation, please install a font manually that supports the language.')) + '\n'
+ text += str(_('The font should be stored as {}')).format(font_file)
+ return text
+ return None
+
+ choice = Menu(
+ _('Archinstall language'),
+ list(options.keys()),
+ default_option=preset_value.display_name,
+ preview_command=lambda x: dependency_preview(x),
+ preview_size=0.5
+ ).run()
+
+ match choice.type_:
+ case MenuSelectionType.Esc:
+ return preset_value
+ case MenuSelectionType.Selection:
+ language: Language = options[choice.value]
+ # we have to make sure that the proper AUR dependency is
+ # present to be able to use this language
+ if not language.external_dep or TranslationHandler.is_custom_font_enabled():
+ return language
+ return select_archinstall_language(languages, preset_value)
def select_profile(preset) -> Optional[Profile]:
@@ -147,19 +178,19 @@ def select_profile(preset) -> Optional[Profile]:
selection = Menu(
title=title,
p_options=list(options.keys()),
- explode_on_interrupt=True,
- explode_warning=warning
+ raise_error_on_interrupt=True,
+ raise_error_warning_msg=warning
).run()
match selection.type_:
case MenuSelectionType.Selection:
return options[selection.value] if selection.value is not None else None
case MenuSelectionType.Ctrl_c:
- archinstall.storage['profile_minimal'] = False
- archinstall.storage['_selected_servers'] = []
- archinstall.storage['_desktop_profile'] = None
- archinstall.arguments['desktop-environment'] = None
- archinstall.arguments['gfx_driver_packages'] = None
+ storage['profile_minimal'] = False
+ storage['_selected_servers'] = []
+ storage['_desktop_profile'] = None
+ storage['arguments']['desktop-environment'] = None
+ storage['arguments']['gfx_driver_packages'] = None
return None
case MenuSelectionType.Esc:
return None
@@ -172,27 +203,61 @@ def ask_additional_packages_to_install(pre_set_packages: List[str] = []) -> List
def read_packages(already_defined: list = []) -> list:
display = ' '.join(already_defined)
- input_packages = TextInput(_('Write additional packages to install (space separated, leave blank to skip): '), display).run()
- return input_packages.split(' ') if input_packages else []
+ input_packages = TextInput(_('Write additional packages to install (space separated, leave blank to skip): '), display).run().strip()
+ return input_packages.split() if input_packages else []
pre_set_packages = pre_set_packages if pre_set_packages else []
packages = read_packages(pre_set_packages)
- while True:
- if len(packages):
- # Verify packages that were given
- print(_("Verifying that additional packages exist (this might take a few seconds)"))
- valid, invalid = validate_package_list(packages)
+ if not storage['arguments']['offline'] and not storage['arguments']['no_pkg_lookups']:
+ while True:
+ if len(packages):
+ # Verify packages that were given
+ print(_("Verifying that additional packages exist (this might take a few seconds)"))
+ valid, invalid = validate_package_list(packages)
- if invalid:
- log(f"Some packages could not be found in the repository: {invalid}", level=logging.WARNING, fg='red')
- packages = read_packages(valid)
- continue
- break
+ if invalid:
+ log(f"Some packages could not be found in the repository: {invalid}", level=logging.WARNING, fg='red')
+ packages = read_packages(valid)
+ continue
+ break
return packages
+def add_number_of_parrallel_downloads(input_number :Optional[int] = None) -> Optional[int]:
+ max_downloads = 5
+ print(_(f"This option enables the number of parallel downloads that can occur during installation"))
+ print(_(f"Enter the number of parallel downloads to be enabled.\n (Enter a value between 1 to {max_downloads})\nNote:"))
+ print(_(f" - Maximum value : {max_downloads} ( Allows {max_downloads} parallel downloads, allows {max_downloads+1} downloads at a time )"))
+ print(_(f" - Minimum value : 1 ( Allows 1 parallel download, allows 2 downloads at a time )"))
+ print(_(f" - Disable/Default : 0 ( Disables parallel downloading, allows only 1 download at a time )"))
+
+ while True:
+ try:
+ input_number = int(TextInput(_("[Default value: 0] > ")).run().strip() or 0)
+ if input_number <= 0:
+ input_number = 0
+ elif input_number > max_downloads:
+ input_number = max_downloads
+ break
+ except:
+ print(_(f"Invalid input! Try again with a valid input [1 to {max_downloads}, or 0 to disable]"))
+
+ pacman_conf_path = pathlib.Path("/etc/pacman.conf")
+ with pacman_conf_path.open() as f:
+ pacman_conf = f.read().split("\n")
+
+ with pacman_conf_path.open("w") as fwrite:
+ for line in pacman_conf:
+ if "ParallelDownloads" in line:
+ fwrite.write(f"ParallelDownloads = {input_number+1}\n") if not input_number == 0 else fwrite.write("#ParallelDownloads = 0\n")
+ else:
+ fwrite.write(f"{line}\n")
+
+ return input_number
+
+
def select_additional_repositories(preset: List[str]) -> List[str]:
"""
Allows the user to select additional repositories (multilib, and testing) if desired.
@@ -209,7 +274,7 @@ def select_additional_repositories(preset: List[str]) -> List[str]:
sort=False,
multi=True,
preset_values=preset,
- explode_on_interrupt=True
+ raise_error_on_interrupt=True
).run()
match choice.type_:
diff --git a/archinstall/lib/user_interaction/manage_users_conf.py b/archinstall/lib/user_interaction/manage_users_conf.py
index 567a2964..84ce3556 100644
--- a/archinstall/lib/user_interaction/manage_users_conf.py
+++ b/archinstall/lib/user_interaction/manage_users_conf.py
@@ -7,6 +7,7 @@ from .utils import get_password
from ..menu import Menu
from ..menu.list_manager import ListManager
from ..models.users import User
+from ..output import FormattedOutput
if TYPE_CHECKING:
_: Any
@@ -18,56 +19,51 @@ class UserList(ListManager):
"""
def __init__(self, prompt: str, lusers: List[User]):
- """
- param: prompt
- type: str
- param: lusers dict with the users already defined for the system
- type: Dict
- param: sudo. boolean to determine if we handle superusers or users. If None handles both types
- """
self._actions = [
str(_('Add a user')),
str(_('Change password')),
str(_('Promote/Demote user')),
str(_('Delete User'))
]
- super().__init__(prompt, lusers, self._actions, self._actions[0])
+ super().__init__(prompt, lusers, [self._actions[0]], self._actions[1:])
def reformat(self, data: List[User]) -> Dict[str, User]:
- return {e.display(): e for e in data}
+ table = FormattedOutput.as_table(data)
+ rows = table.split('\n')
- def action_list(self):
- active_user = self.target if self.target else None
+ # these are the header rows of the table and do not map to any User obviously
+ # we're adding 2 spaces as prefix because the menu selector '> ' will be put before
+ # the selectable rows so the header has to be aligned
+ display_data = {f' {rows[0]}': None, f' {rows[1]}': None}
- if active_user is None:
- return [self._actions[0]]
- else:
- return self._actions[1:]
+ for row, user in zip(rows[2:], data):
+ row = row.replace('|', '\\|')
+ display_data[row] = user
- def exec_action(self, data: List[User]) -> List[User]:
- if self.target:
- active_user = self.target
- else:
- active_user = None
+ return display_data
- if self.action == self._actions[0]: # add
+ def selected_action_display(self, user: User) -> str:
+ return user.username
+
+ def handle_action(self, action: str, entry: Optional[User], data: List[User]) -> List[User]:
+ if action == self._actions[0]: # add
new_user = self._add_user()
if new_user is not None:
# in case a user with the same username as an existing user
# was created we'll replace the existing one
data = [d for d in data if d.username != new_user.username]
data += [new_user]
- elif self.action == self._actions[1]: # change password
- prompt = str(_('Password for user "{}": ').format(active_user.username))
+ elif action == self._actions[1]: # change password
+ prompt = str(_('Password for user "{}": ').format(entry.username))
new_password = get_password(prompt=prompt)
if new_password:
- user = next(filter(lambda x: x == active_user, data), 1)
+ user = next(filter(lambda x: x == entry, data))
user.password = new_password
- elif self.action == self._actions[2]: # promote/demote
- user = next(filter(lambda x: x == active_user, data), 1)
+ elif action == self._actions[2]: # promote/demote
+ user = next(filter(lambda x: x == entry, data))
user.sudo = False if user.sudo else True
- elif self.action == self._actions[3]: # delete
- data = [d for d in data if d != active_user]
+ elif action == self._actions[3]: # delete
+ data = [d for d in data if d != entry]
return data
@@ -77,8 +73,7 @@ class UserList(ListManager):
return False
def _add_user(self) -> Optional[User]:
- print(_('\nDefine a new user\n'))
- prompt = str(_('Enter username (leave blank to skip): '))
+ prompt = '\n\n' + str(_('Enter username (leave blank to skip): '))
while True:
username = input(prompt).strip(' ')
@@ -94,7 +89,9 @@ class UserList(ListManager):
choice = Menu(
str(_('Should "{}" be a superuser (sudo)?')).format(username), Menu.yes_no(),
skip=False,
- default_option=Menu.no()
+ default_option=Menu.no(),
+ clear_screen=False,
+ show_search_hint=False
).run()
sudo = True if choice.value == Menu.yes() else False
@@ -102,6 +99,5 @@ class UserList(ListManager):
def ask_for_additional_users(prompt: str = '', defined_users: List[User] = []) -> List[User]:
- prompt = prompt if prompt else _('Enter username (leave blank to skip): ')
users = UserList(prompt, defined_users).run()
return users
diff --git a/archinstall/lib/user_interaction/network_conf.py b/archinstall/lib/user_interaction/network_conf.py
index 5154d8b1..557e8ed8 100644
--- a/archinstall/lib/user_interaction/network_conf.py
+++ b/archinstall/lib/user_interaction/network_conf.py
@@ -2,7 +2,7 @@ from __future__ import annotations
import ipaddress
import logging
-from typing import Any, Optional, TYPE_CHECKING, List, Union
+from typing import Any, Optional, TYPE_CHECKING, List, Union, Dict
from ..menu.menu import MenuSelectionType
from ..menu.text_input import TextInput
@@ -10,7 +10,7 @@ from ..models.network_configuration import NetworkConfiguration, NicType
from ..networking import list_interfaces
from ..menu import Menu
-from ..output import log
+from ..output import log, FormattedOutput
from ..menu.list_manager import ListManager
if TYPE_CHECKING:
@@ -19,55 +19,55 @@ if TYPE_CHECKING:
class ManualNetworkConfig(ListManager):
"""
- subclass of ListManager for the managing of network configuration accounts
+ subclass of ListManager for the managing of network configurations
"""
- def __init__(self, prompt: str, ifaces: Union[None, NetworkConfiguration, List[NetworkConfiguration]]):
- """
- param: prompt
- type: str
- param: ifaces already defined previously
- type: Dict
- """
+ def __init__(self, prompt: str, ifaces: List[NetworkConfiguration]):
+ self._actions = [
+ str(_('Add interface')),
+ str(_('Edit interface')),
+ str(_('Delete interface'))
+ ]
- if ifaces is not None and isinstance(ifaces, list):
- display_values = {iface.iface: iface for iface in ifaces}
- else:
- display_values = {}
+ super().__init__(prompt, ifaces, [self._actions[0]], self._actions[1:])
+
+ def reformat(self, data: List[NetworkConfiguration]) -> Dict[str, Optional[NetworkConfiguration]]:
+ table = FormattedOutput.as_table(data)
+ rows = table.split('\n')
- self._action_add = str(_('Add interface'))
- self._action_edit = str(_('Edit interface'))
- self._action_delete = str(_('Delete interface'))
+ # these are the header rows of the table and do not map to any User obviously
+ # we're adding 2 spaces as prefix because the menu selector '> ' will be put before
+ # the selectable rows so the header has to be aligned
+ display_data: Dict[str, Optional[NetworkConfiguration]] = {f' {rows[0]}': None, f' {rows[1]}': None}
- self._iface_actions = [self._action_edit, self._action_delete]
+ for row, iface in zip(rows[2:], data):
+ row = row.replace('|', '\\|')
+ display_data[row] = iface
- super().__init__(prompt, display_values, self._iface_actions, self._action_add)
+ return display_data
- def run_manual(self) -> List[NetworkConfiguration]:
- ifaces = super().run()
- if ifaces is not None:
- return list(ifaces.values())
- return []
+ def selected_action_display(self, iface: NetworkConfiguration) -> str:
+ return iface.iface if iface.iface else ''
- def exec_action(self, data: Any):
- if self.action == self._action_add:
- iface_name = self._select_iface(data.keys())
+ def handle_action(self, action: str, entry: Optional[NetworkConfiguration], data: List[NetworkConfiguration]):
+ if action == self._actions[0]: # add
+ iface_name = self._select_iface(data)
if iface_name:
iface = NetworkConfiguration(NicType.MANUAL, iface=iface_name)
- data[iface_name] = self._edit_iface(iface)
- elif self.target:
- iface_name = list(self.target.keys())[0]
- iface = data[iface_name]
-
- if self.action == self._action_edit:
- data[iface_name] = self._edit_iface(iface)
- elif self.action == self._action_delete:
- del data[iface_name]
+ iface = self._edit_iface(iface)
+ data += [iface]
+ elif entry:
+ if action == self._actions[1]: # edit interface
+ data = [d for d in data if d.iface != entry.iface]
+ data.append(self._edit_iface(entry))
+ elif action == self._actions[2]: # delete
+ data = [d for d in data if d != entry]
return data
- def _select_iface(self, existing_ifaces: List[str]) -> Optional[Any]:
+ def _select_iface(self, data: List[NetworkConfiguration]) -> Optional[Any]:
all_ifaces = list_interfaces().values()
+ existing_ifaces = [d.iface for d in data]
available = set(all_ifaces) - set(existing_ifaces)
choice = Menu(str(_('Select interface to add')), list(available), skip=True).run()
@@ -76,7 +76,7 @@ class ManualNetworkConfig(ListManager):
return choice.value
- def _edit_iface(self, edit_iface :NetworkConfiguration):
+ def _edit_iface(self, edit_iface: NetworkConfiguration):
iface_name = edit_iface.iface
modes = ['DHCP (auto detect)', 'IP (static)']
default_mode = 'DHCP (auto detect)'
@@ -99,11 +99,13 @@ class ManualNetworkConfig(ListManager):
gateway = None
while 1:
- gateway_input = TextInput(_('Enter your gateway (router) IP address or leave blank for none: '),
- edit_iface.gateway).run().strip()
+ gateway = TextInput(
+ _('Enter your gateway (router) IP address or leave blank for none: '),
+ edit_iface.gateway
+ ).run().strip()
try:
- if len(gateway_input) > 0:
- ipaddress.ip_address(gateway_input)
+ if len(gateway) > 0:
+ ipaddress.ip_address(gateway)
break
except ValueError:
log("You need to enter a valid gateway (router) IP address.", level=logging.WARNING, fg='red')
@@ -124,7 +126,9 @@ class ManualNetworkConfig(ListManager):
return NetworkConfiguration(NicType.MANUAL, iface=iface_name)
-def ask_to_configure_network(preset: Union[None, NetworkConfiguration, List[NetworkConfiguration]]) -> Optional[Union[List[NetworkConfiguration], NetworkConfiguration]]:
+def ask_to_configure_network(
+ preset: Union[NetworkConfiguration, List[NetworkConfiguration]]
+) -> Optional[NetworkConfiguration | List[NetworkConfiguration]]:
"""
Configure the network on the newly installed system
"""
@@ -150,8 +154,8 @@ def ask_to_configure_network(preset: Union[None, NetworkConfiguration, List[Netw
list(network_options.values()),
cursor_index=cursor_idx,
sort=False,
- explode_on_interrupt=True,
- explode_warning=warning
+ raise_error_on_interrupt=True,
+ raise_error_warning_msg=warning
).run()
match choice.type_:
@@ -165,7 +169,7 @@ def ask_to_configure_network(preset: Union[None, NetworkConfiguration, List[Netw
elif choice.value == network_options['network_manager']:
return NetworkConfiguration(NicType.NM)
elif choice.value == network_options['manual']:
- manual = ManualNetworkConfig('Configure interfaces', preset)
- return manual.run_manual()
+ preset_ifaces = preset if isinstance(preset, list) else []
+ return ManualNetworkConfig('Configure interfaces', preset_ifaces).run()
return preset
diff --git a/archinstall/lib/user_interaction/partitioning_conf.py b/archinstall/lib/user_interaction/partitioning_conf.py
index bfff5705..f2e6b881 100644
--- a/archinstall/lib/user_interaction/partitioning_conf.py
+++ b/archinstall/lib/user_interaction/partitioning_conf.py
@@ -5,7 +5,7 @@ from typing import List, Any, Dict, Union, TYPE_CHECKING, Callable, Optional
from ..menu import Menu
from ..menu.menu import MenuSelectionType
-from ..output import log
+from ..output import log, FormattedOutput
from ..disk.validators import fs_types
@@ -28,16 +28,31 @@ def current_partition_layout(partitions: List[Dict[str, Any]], with_idx: bool =
pad_right = spaces - pad_left
return f'{pad_right * " "}{name}{pad_left * " "}|'
+ def flatten_data(data: Dict[str, Any]) -> Dict[str, Any]:
+ flattened = {}
+ for k, v in data.items():
+ if k == 'filesystem':
+ flat = flatten_data(v)
+ flattened.update(flat)
+ elif k == 'btrfs':
+ # we're going to create a separate table for the btrfs subvolumes
+ pass
+ else:
+ flattened[k] = v
+ return flattened
+
+ display_data: List[Dict[str, Any]] = [flatten_data(entry) for entry in partitions]
+
column_names = {}
# this will add an initial index to the table for each partition
if with_idx:
- column_names['index'] = max([len(str(len(partitions))), len('index')])
+ column_names['index'] = max([len(str(len(display_data))), len('index')])
# determine all attribute names and the max length
- # of the value among all partitions to know the width
+ # of the value among all display_data to know the width
# of the table cells
- for p in partitions:
+ for p in display_data:
for attribute, value in p.items():
if attribute in column_names.keys():
column_names[attribute] = max([column_names[attribute], len(str(value)), len(attribute)])
@@ -50,7 +65,7 @@ def current_partition_layout(partitions: List[Dict[str, Any]], with_idx: bool =
current_layout = f'{current_layout[:-1]}\n{"-" * len(current_layout)}\n'
- for idx, p in enumerate(partitions):
+ for idx, p in enumerate(display_data):
row = ''
for name, max_len in column_names.items():
if name == 'index':
@@ -62,6 +77,13 @@ def current_partition_layout(partitions: List[Dict[str, Any]], with_idx: bool =
current_layout += f'{row[:-1]}\n'
+ # we'll create a separate table for the btrfs subvolumes
+ btrfs_subvolumes = [partition['btrfs']['subvolumes'] for partition in partitions if partition.get('btrfs', None)]
+ if len(btrfs_subvolumes) > 0:
+ for subvolumes in btrfs_subvolumes:
+ output = FormattedOutput.as_table(subvolumes)
+ current_layout += f'\n{output}'
+
if with_title:
title = str(_('Current partition layout'))
return f'\n\n{title}:\n\n{current_layout}'
@@ -118,23 +140,10 @@ def get_default_partition_layout(
return suggest_multi_disk_layout(block_devices, advanced_options=advanced_options)
-def select_individual_blockdevice_usage(block_devices: list) -> Dict[str, Any]:
- result = {}
-
- for device in block_devices:
- layout = manage_new_and_existing_partitions(device)
- result[device.path] = layout
-
- return result
-
-
def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str, Any]: # noqa: max-complexity: 50
block_device_struct = {"partitions": [partition.__dump__() for partition in block_device.partitions.values()]}
original_layout = copy.deepcopy(block_device_struct)
- # Test code: [part.__dump__() for part in block_device.partitions.values()]
- # TODO: Squeeze in BTRFS subvolumes here
-
new_partition = str(_('Create a new partition'))
suggest_partition_layout = str(_('Suggest partition layout'))
delete_partition = str(_('Delete a partition'))
@@ -187,6 +196,7 @@ def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str,
return original_layout
elif task == save_and_exit:
break
+
if task == new_partition:
from ..disk import valid_parted_position
@@ -200,8 +210,9 @@ def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str,
if fs_choice.type_ == MenuSelectionType.Esc:
continue
- prompt = _('Enter the start sector (percentage or block number, default: {}): ').format(
- block_device.first_free_sector)
+ prompt = str(_('Enter the start sector (percentage or block number, default: {}): ')).format(
+ block_device.first_free_sector
+ )
start = input(prompt).strip()
if not start.strip():
@@ -210,8 +221,9 @@ def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str,
else:
end_suggested = '100%'
- prompt = _('Enter the end sector of the partition (percentage or block number, ex: {}): ').format(
- end_suggested)
+ prompt = str(_('Enter the end sector of the partition (percentage or block number, ex: {}): ')).format(
+ end_suggested
+ )
end = input(prompt).strip()
if not end.strip():
@@ -224,7 +236,7 @@ def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str,
continue
block_device_struct["partitions"].append({
- "type": "primary", # Strictly only allowed under MSDOS, but GPT accepts it so it's "safe" to inject
+ "type": "primary", # Strictly only allowed under MS-DOS, but GPT accepts it so it's "safe" to inject
"start": start,
"size": end,
"mountpoint": None,
@@ -351,18 +363,16 @@ def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str,
if partition is not None:
if not block_device_struct["partitions"][partition].get('btrfs', {}):
block_device_struct["partitions"][partition]['btrfs'] = {}
- if not block_device_struct["partitions"][partition]['btrfs'].get('subvolumes', {}):
- block_device_struct["partitions"][partition]['btrfs']['subvolumes'] = {}
+ if not block_device_struct["partitions"][partition]['btrfs'].get('subvolumes', []):
+ block_device_struct["partitions"][partition]['btrfs']['subvolumes'] = []
prev = block_device_struct["partitions"][partition]['btrfs']['subvolumes']
- result = SubvolumeList(_("Manage btrfs subvolumes for current partition"),prev).run()
- if result:
- block_device_struct["partitions"][partition]['btrfs']['subvolumes'] = result
- else:
- del block_device_struct["partitions"][partition]['btrfs']
+ result = SubvolumeList(_("Manage btrfs subvolumes for current partition"), prev).run()
+ block_device_struct["partitions"][partition]['btrfs']['subvolumes'] = result
return block_device_struct
+
def select_encrypted_partitions(
title :str,
partitions :List[Partition],
@@ -374,11 +384,9 @@ def select_encrypted_partitions(
if len(partition_indexes) == 0:
return None
- title = _('Select which partitions to mark for formatting:')
-
# show current partition layout:
if len(partitions):
- title += current_partition_layout(partitions) + '\n'
+ title += current_partition_layout(partitions, with_idx=True) + '\n'
choice = Menu(title, partition_indexes, multi=multiple).run()
diff --git a/archinstall/lib/user_interaction/subvolume_config.py b/archinstall/lib/user_interaction/subvolume_config.py
index af783639..94150dee 100644
--- a/archinstall/lib/user_interaction/subvolume_config.py
+++ b/archinstall/lib/user_interaction/subvolume_config.py
@@ -1,146 +1,98 @@
-from typing import Dict, List
+from typing import Dict, List, Optional, Any, TYPE_CHECKING
from ..menu.list_manager import ListManager
from ..menu.menu import MenuSelectionType
-from ..menu.selection_menu import Selector, GeneralMenu
from ..menu.text_input import TextInput
from ..menu import Menu
+from ..models.subvolume import Subvolume
+from ... import FormattedOutput
+
+if TYPE_CHECKING:
+ _: Any
-"""
-UI classes
-"""
class SubvolumeList(ListManager):
- def __init__(self,prompt,list):
- self.ObjectNullAction = None # str(_('Add'))
- self.ObjectDefaultAction = str(_('Add'))
- super().__init__(prompt,list,None,self.ObjectNullAction,self.ObjectDefaultAction)
-
- def reformat(self, data: Dict) -> Dict:
- def presentation(key :str, value :Dict):
- text = _(" Subvolume :{:16}").format(key)
- if isinstance(value,str):
- text += _(" mounted at {:16}").format(value)
- else:
- if value.get('mountpoint'):
- text += _(" mounted at {:16}").format(value['mountpoint'])
- else:
- text += (' ' * 28)
-
- if value.get('options',[]):
- text += _(" with option {}").format(', '.join(value['options']))
- return text
-
- formatted = {presentation(k, v): k for k, v in data.items()}
- return {k: v for k, v in sorted(formatted.items(), key=lambda e: e[0])}
-
- def action_list(self):
- return super().action_list()
-
- def exec_action(self, data: Dict):
- if self.target:
- origkey, origval = list(self.target.items())[0]
- else:
- origkey = None
-
- if self.action == str(_('Delete')):
- del data[origkey]
- else:
- if self.action == str(_('Add')):
- self.target = {}
- print(_('\n Fill the desired values for a new subvolume \n'))
- with SubvolumeMenu(self.target,self.action) as add_menu:
- for elem in ['name','mountpoint','options']:
- add_menu.exec_option(elem)
- else:
- SubvolumeMenu(self.target,self.action).run()
-
- data.update(self.target)
+ def __init__(self, prompt: str, subvolumes: List[Subvolume]):
+ self._actions = [
+ str(_('Add subvolume')),
+ str(_('Edit subvolume')),
+ str(_('Delete subvolume'))
+ ]
+ super().__init__(prompt, subvolumes, [self._actions[0]], self._actions[1:])
- return data
+ def reformat(self, data: List[Subvolume]) -> Dict[str, Optional[Subvolume]]:
+ table = FormattedOutput.as_table(data)
+ rows = table.split('\n')
+
+ # these are the header rows of the table and do not map to any User obviously
+ # we're adding 2 spaces as prefix because the menu selector '> ' will be put before
+ # the selectable rows so the header has to be aligned
+ display_data: Dict[str, Optional[Subvolume]] = {f' {rows[0]}': None, f' {rows[1]}': None}
+
+ for row, subvol in zip(rows[2:], data):
+ row = row.replace('|', '\\|')
+ display_data[row] = subvol
+
+ return display_data
+ def selected_action_display(self, subvolume: Subvolume) -> str:
+ return subvolume.name
+
+ def _prompt_options(self, editing: Optional[Subvolume] = None) -> List[str]:
+ preset_options = []
+ if editing:
+ preset_options = editing.options
-class SubvolumeMenu(GeneralMenu):
- def __init__(self,parameters,action=None):
- self.data = parameters
- self.action = action
- self.ds = {}
- self.ds['name'] = None
- self.ds['mountpoint'] = None
- self.ds['options'] = None
- if self.data:
- origkey,origval = list(self.data.items())[0]
- self.ds['name'] = origkey
- if isinstance(origval,str):
- self.ds['mountpoint'] = origval
- else:
- self.ds['mountpoint'] = self.data[origkey].get('mountpoint')
- self.ds['options'] = self.data[origkey].get('options')
-
- super().__init__(data_store=self.ds)
-
- def _setup_selection_menu_options(self):
- # [str(_('Add')),str(_('Copy')),str(_('Edit')),str(_('Delete'))]
- self._menu_options['name'] = Selector(str(_('Subvolume name ')),
- self._select_subvolume_name if not self.action or self.action in (str(_('Add')),str(_('Copy'))) else None,
- mandatory=True,
- enabled=True)
- self._menu_options['mountpoint'] = Selector(str(_('Subvolume mountpoint')),
- self._select_subvolume_mount_point if not self.action or self.action in (str(_('Add')),str(_('Edit'))) else None,
- enabled=True)
- self._menu_options['options'] = Selector(str(_('Subvolume options')),
- self._select_subvolume_options if not self.action or self.action in (str(_('Add')),str(_('Edit'))) else None,
- enabled=True)
- self._menu_options['save'] = Selector(str(_('Save')),
- exec_func=lambda n,v:True,
- enabled=True)
- self._menu_options['cancel'] = Selector(str(_('Cancel')),
- # func = lambda pre:True,
- exec_func=lambda n,v:self.fast_exit(n),
- enabled=True)
- self.cancel_action = 'cancel'
- self.save_action = 'save'
- self.bottom_list = [self.save_action,self.cancel_action]
-
- def fast_exit(self,accion):
- if self.option(accion).get_selection():
- for item in self.list_options():
- if self.option(item).is_mandatory():
- self.option(item).set_mandatory(False)
- return True
-
- def exit_callback(self):
- # we exit without moving data
- if self.option(self.cancel_action).get_selection():
- return
- if not self.ds['name']:
- return
- else:
- key = self.ds['name']
- value = {}
- if self.ds['mountpoint']:
- value['mountpoint'] = self.ds['mountpoint']
- if self.ds['options']:
- value['options'] = self.ds['options']
- self.data.update({key : value})
-
- def _select_subvolume_name(self,value):
- return TextInput(str(_("Subvolume name :")),value).run()
-
- def _select_subvolume_mount_point(self,value):
- return TextInput(str(_("Select a mount point :")),value).run()
-
- def _select_subvolume_options(self,value) -> List[str]:
- # def __init__(self, title, p_options, skip=True, multi=False, default_option=None, sort=True):
choice = Menu(
str(_("Select the desired subvolume options ")),
['nodatacow','compress'],
skip=True,
- preset_values=value,
+ preset_values=preset_options,
multi=True
).run()
if choice.type_ == MenuSelectionType.Selection:
- return choice.value
+ return choice.value # type: ignore
return []
+
+ def _add_subvolume(self, editing: Optional[Subvolume] = None) -> Optional[Subvolume]:
+ name = TextInput(f'\n\n{_("Subvolume name")}: ', editing.name if editing else '').run()
+
+ if not name:
+ return None
+
+ mountpoint = TextInput(f'\n{_("Subvolume mountpoint")}: ', editing.mountpoint if editing else '').run()
+
+ if not mountpoint:
+ return None
+
+ options = self._prompt_options(editing)
+
+ subvolume = Subvolume(name, mountpoint)
+ subvolume.compress = 'compress' in options
+ subvolume.nodatacow = 'nodatacow' in options
+
+ return subvolume
+
+ def handle_action(self, action: str, entry: Optional[Subvolume], data: List[Subvolume]) -> List[Subvolume]:
+ if action == self._actions[0]: # add
+ new_subvolume = self._add_subvolume()
+
+ if new_subvolume is not None:
+ # in case a user with the same username as an existing user
+ # was created we'll replace the existing one
+ data = [d for d in data if d.name != new_subvolume.name]
+ data += [new_subvolume]
+ elif entry is not None:
+ if action == self._actions[1]: # edit subvolume
+ new_subvolume = self._add_subvolume(entry)
+
+ if new_subvolume is not None:
+ # we'll remove the original subvolume and add the modified version
+ data = [d for d in data if d.name != entry.name and d.name != new_subvolume.name]
+ data += [new_subvolume]
+ elif action == self._actions[2]: # delete
+ data = [d for d in data if d != entry]
+
+ return data
diff --git a/archinstall/lib/user_interaction/system_conf.py b/archinstall/lib/user_interaction/system_conf.py
index 78daa6a5..94bbac30 100644
--- a/archinstall/lib/user_interaction/system_conf.py
+++ b/archinstall/lib/user_interaction/system_conf.py
@@ -32,8 +32,8 @@ def select_kernel(preset: List[str] = None) -> List[str]:
sort=True,
multi=True,
preset_values=preset,
- explode_on_interrupt=True,
- explode_warning=warning
+ raise_error_on_interrupt=True,
+ raise_error_warning_msg=warning
).run()
match choice.type_:
@@ -67,8 +67,8 @@ def select_harddrives(preset: List[str] = []) -> List[str]:
list(options.keys()),
preset_values=list(preset_disks.keys()),
multi=True,
- explode_on_interrupt=True,
- explode_warning=warning
+ raise_error_on_interrupt=True,
+ raise_error_warning_msg=warning
).run()
match selected_harddrive.type_:
diff --git a/archinstall/lib/user_interaction/utils.py b/archinstall/lib/user_interaction/utils.py
index fa079bc2..7ee6fc07 100644
--- a/archinstall/lib/user_interaction/utils.py
+++ b/archinstall/lib/user_interaction/utils.py
@@ -7,6 +7,7 @@ import time
from typing import Any, Optional, TYPE_CHECKING
from ..menu import Menu
+from ..models.password_strength import PasswordStrength
from ..output import log
if TYPE_CHECKING:
@@ -16,42 +17,23 @@ if TYPE_CHECKING:
SIG_TRIGGER = None
-def check_password_strong(passwd: str) -> bool:
- symbol_count = 0
- if any(character.isdigit() for character in passwd):
- symbol_count += 10
- if any(character.isupper() for character in passwd):
- symbol_count += 26
- if any(character.islower() for character in passwd):
- symbol_count += 26
- if any(not character.isalnum() for character in passwd):
- symbol_count += 40
-
- if symbol_count**len(passwd) < 10e20:
- prompt = str(_("The password you are using seems to be weak, are you sure you want to use it?"))
- choice = Menu(prompt, Menu.yes_no(), default_option=Menu.yes()).run()
- return choice.value == Menu.yes()
-
- return True
-
-
def get_password(prompt: str = '') -> Optional[str]:
if not prompt:
prompt = _("Enter a password: ")
- while passwd := getpass.getpass(prompt):
- if len(passwd.strip()) <= 0:
+ while password := getpass.getpass(prompt):
+ if len(password.strip()) <= 0:
break
- if not check_password_strong(passwd):
- continue
+ strength = PasswordStrength.strength(password)
+ log(f'Password strength: {strength.value}', fg=strength.color())
passwd_verification = getpass.getpass(prompt=_('And one more time for verification: '))
- if passwd != passwd_verification:
+ if password != passwd_verification:
log(' * Passwords did not match * ', fg='red')
continue
- return passwd
+ return password
return None