From 00b0ae7ba439a5a420095175b3bedd52c569db51 Mon Sep 17 00:00:00 2001 From: Daniel Girtler Date: Wed, 19 Apr 2023 20:55:42 +1000 Subject: PyParted and a large rewrite of the underlying partitioning (#1604) * Invert mypy files * Add optional pre-commit hooks * New profile structure * Serialize profiles * Use profile instead of classmethod * Custom profile setup * Separator between back * Support profile import via url * Move profiles module * Refactor files * Remove symlink * Add user to docker group * Update schema description * Handle list services * mypy fixes * mypy fixes * Rename profilesv2 to profiles * flake8 * mypy again * Support selecting DM * Fix mypy * Cleanup * Update greeter setting * Update schema * Revert toml changes * Poc external dependencies * Dependency support * New encryption menu * flake8 * Mypy and flake8 * Unify lsblk command * Update bootloader configuration * Git hooks * Fix import * Pyparted * Remove custom font setting * flake8 * Remove default preview * Manual partitioning menu * Update structure * Disk configuration * Update filesystem * luks2 encryption * Everything works until installation * Btrfsutil * Btrfs handling * Update btrfs * Save encryption config * Fix pipewire issue * Update mypy version * Update all pre-commit * Update package versions * Revert audio/pipewire * Merge master PRs * Add master changes * Merge master changes * Small renaming * Pull master changes * Reset disk enc after disk config change * Generate locals * Update naming * Fix imports * Fix broken sync * Fix pre selection on table menu * Profile menu * Update profile * Fix post_install * Added python-pyparted to PKGBUILD, this requires [testing] to be enabled in order to run makepkg. Package still works via python -m build etc. * Swaped around some setuptools logic in pyproject Since we define `package-data` and `packages` there should be no need for: ``` [tool.setuptools.packages.find] where = ["archinstall", "archinstall.*"] ``` * Removed pyproject collisions. Duplicate definitions. * Made sure pyproject.toml includes languages * Add example and update README * Fix pyproject issues * Generate locale * Refactor imports * Simplify imports * Add profile description and package examples * Align code * Fix mypy * Simplify imports * Fix saving config * Fix wrong luks merge * Refactor installation * Fix cdrom device loading * Fix wrongly merged code * Fix imports and greeter * Don't terminate on partprobe error * Use specific path on partprobe from luks * Update archinstall/lib/disk/device_model.py Co-authored-by: codefiles <11915375+codefiles@users.noreply.github.com> * Update archinstall/lib/disk/device_model.py Co-authored-by: codefiles <11915375+codefiles@users.noreply.github.com> * Update github workflow to test archinstall installation * Update sway merge * Generate locales * Update workflow --------- Co-authored-by: Daniel Girtler Co-authored-by: Anton Hvornum Co-authored-by: Anton Hvornum Co-authored-by: codefiles <11915375+codefiles@users.noreply.github.com> --- archinstall/lib/disk/partitioning_menu.py | 335 ++++++++++++++++++++++++++++++ 1 file changed, 335 insertions(+) create mode 100644 archinstall/lib/disk/partitioning_menu.py (limited to 'archinstall/lib/disk/partitioning_menu.py') diff --git a/archinstall/lib/disk/partitioning_menu.py b/archinstall/lib/disk/partitioning_menu.py new file mode 100644 index 00000000..686e8c29 --- /dev/null +++ b/archinstall/lib/disk/partitioning_menu.py @@ -0,0 +1,335 @@ +from __future__ import annotations + +import logging +from pathlib import Path +from typing import Any, Dict, TYPE_CHECKING, List, Optional, Tuple + +from .device_model import PartitionModification, FilesystemType, BDevice, Size, Unit, PartitionType, PartitionFlag, \ + ModificationStatus +from ..menu import Menu, ListManager, MenuSelection, TextInput +from ..output import FormattedOutput, log +from .subvolume_menu import SubvolumeMenu + +if TYPE_CHECKING: + _: Any + + +class PartitioningList(ListManager): + """ + subclass of ListManager for the managing of user accounts + """ + def __init__(self, prompt: str, device: BDevice, device_partitions: List[PartitionModification]): + self._device = device + self._actions = { + 'create_new_partition': str(_('Create a new partition')), + 'suggest_partition_layout': str(_('Suggest partition layout')), + 'remove_added_partitions': str(_('Remove all newly added partitions')), + 'assign_mountpoint': str(_('Assign mountpoint')), + 'mark_formatting': str(_('Mark/Unmark to be formatted (wipes data)')), + 'mark_bootable': str(_('Mark/Unmark as bootable')), + 'set_filesystem': str(_('Change filesystem')), + 'btrfs_mark_compressed': str(_('Mark/Unmark as compressed')), # btrfs only + 'btrfs_set_subvolumes': str(_('Set subvolumes')), # btrfs only + 'delete_partition': str(_('Delete partition')) + } + + display_actions = list(self._actions.values()) + super().__init__(prompt, device_partitions, display_actions[:2], display_actions[3:]) + + def reformat(self, data: List[PartitionModification]) -> Dict[str, Optional[PartitionModification]]: + table = FormattedOutput.as_table(data) + rows = table.split('\n') + + # these are the header rows of the table and do not map to any User obviously + # we're adding 2 spaces as prefix because the menu selector '> ' will be put before + # the selectable rows so the header has to be aligned + display_data: Dict[str, Optional[PartitionModification]] = {f' {rows[0]}': None, f' {rows[1]}': None} + + for row, user in zip(rows[2:], data): + row = row.replace('|', '\\|') + display_data[row] = user + + return display_data + + def selected_action_display(self, partition: PartitionModification) -> str: + return str(_('Partition')) + + def filter_options(self, selection: PartitionModification, options: List[str]) -> List[str]: + not_filter = [] + + # only display formatting if the partition exists already + if not selection.exists(): + not_filter += [self._actions['mark_formatting']] + else: + # only allow these options if the existing partition + # was marked as formatting, otherwise we run into issues where + # 1. select a new fs -> potentially mark as wipe now + # 2. Switch back to old filesystem -> should unmark wipe now, but + # how do we know it was the original one? + not_filter += [ + self._actions['set_filesystem'], + self._actions['assign_mountpoint'], + self._actions['mark_bootable'], + self._actions['btrfs_mark_compressed'], + self._actions['btrfs_set_subvolumes'] + ] + + # non btrfs partitions shouldn't get btrfs options + if selection.fs_type != FilesystemType.Btrfs: + not_filter += [self._actions['btrfs_mark_compressed'], self._actions['btrfs_set_subvolumes']] + else: + not_filter += [self._actions['assign_mountpoint']] + + return [o for o in options if o not in not_filter] + + def handle_action( + self, + action: str, + entry: Optional[PartitionModification], + data: List[PartitionModification] + ) -> List[PartitionModification]: + action_key = [k for k, v in self._actions.items() if v == action][0] + + match action_key: + case 'create_new_partition': + new_partition = self._create_new_partition() + data += [new_partition] + case 'suggest_partition_layout': + new_partitions = self._suggest_partition_layout(data) + if len(new_partitions) > 0: + data = new_partitions + case 'remove_added_partitions': + choice = self._reset_confirmation() + if choice.value == Menu.yes(): + data = [part for part in data if part.is_exists_or_modify()] + case 'assign_mountpoint' if entry: + entry.mountpoint = self._prompt_mountpoint() + if entry.mountpoint == Path('/boot'): + entry.set_flag(PartitionFlag.Boot) + case 'mark_formatting' if entry: + self._prompt_formatting(entry) + case 'mark_bootable' if entry: + entry.invert_flag(PartitionFlag.Boot) + case 'set_filesystem' if entry: + fs_type = self._prompt_partition_fs_type() + if fs_type: + entry.fs_type = fs_type + # btrfs subvolumes will define mountpoints + if fs_type == FilesystemType.Btrfs: + entry.mountpoint = None + case 'btrfs_mark_compressed' if entry: + self._set_compressed(entry) + case 'btrfs_set_subvolumes' if entry: + self._set_btrfs_subvolumes(entry) + case 'delete_partition' if entry: + data = self._delete_partition(entry, data) + + return data + + def _delete_partition( + self, + entry: PartitionModification, + data: List[PartitionModification] + ) -> List[PartitionModification]: + if entry.is_exists_or_modify(): + entry.status = ModificationStatus.Delete + return data + else: + return [d for d in data if d != entry] + + def _set_compressed(self, partition: PartitionModification): + compression = 'compress=zstd' + + if compression in partition.mount_options: + partition.mount_options = [o for o in partition.mount_options if o != compression] + else: + partition.mount_options.append(compression) + + def _set_btrfs_subvolumes(self, partition: PartitionModification): + partition.btrfs_subvols = SubvolumeMenu( + _("Manage btrfs subvolumes for current partition"), + partition.btrfs_subvols + ).run() + + def _prompt_formatting(self, partition: PartitionModification): + # an existing partition can toggle between Exist or Modify + if partition.is_modify(): + partition.status = ModificationStatus.Exist + return + elif partition.exists(): + partition.status = ModificationStatus.Modify + + # If we mark a partition for formatting, but the format is CRYPTO LUKS, there's no point in formatting it really + # without asking the user which inner-filesystem they want to use. Since the flag 'encrypted' = True is already set, + # it's safe to change the filesystem for this partition. + if partition.fs_type == FilesystemType.Crypto_luks: + prompt = str(_('This partition is currently encrypted, to format it a filesystem has to be specified')) + fs_type = self._prompt_partition_fs_type(prompt) + partition.fs_type = fs_type + + if fs_type == FilesystemType.Btrfs: + partition.mountpoint = None + + def _prompt_mountpoint(self) -> Path: + header = str(_('Partition mount-points are relative to inside the installation, the boot would be /boot as an example.')) + '\n' + header += str(_('If mountpoint /boot is set, then the partition will also be marked as bootable.')) + '\n' + prompt = str(_('Mountpoint: ')) + + print(header) + + while True: + value = TextInput(prompt).run().strip() + + if value: + mountpoint = Path(value) + break + + return mountpoint + + def _prompt_partition_fs_type(self, prompt: str = '') -> FilesystemType: + options = {fs.value: fs for fs in FilesystemType if fs != FilesystemType.Crypto_luks} + + prompt = prompt + '\n' + str(_('Enter a desired filesystem type for the partition')) + choice = Menu(prompt, options, sort=False, skip=False).run() + return options[choice.single_value] + + def _validate_sector(self, start_sector: str, end_sector: Optional[str] = None) -> bool: + if not start_sector.isdigit(): + return False + + if end_sector: + if end_sector.endswith('%'): + if not end_sector[:-1].isdigit(): + return False + elif not end_sector.isdigit(): + return False + elif int(start_sector) > int(end_sector): + return False + + return True + + def _prompt_sectors(self) -> Tuple[Size, Size]: + device_info = self._device.device_info + + text = str(_('Current free sectors on device {}:')).format(device_info.path) + '\n\n' + free_space_table = FormattedOutput.as_table(device_info.free_space_regions) + prompt = text + free_space_table + '\n' + + total_sectors = device_info.total_size.format_size(Unit.sectors, device_info.sector_size) + prompt += str(_('Total sectors: {}')).format(total_sectors) + '\n' + print(prompt) + + largest_free_area = max(device_info.free_space_regions, key=lambda r: r.get_length()) + + # prompt until a valid start sector was entered + while True: + start_prompt = str(_('Enter the start sector (default: {}): ')).format(largest_free_area.start) + start_sector = TextInput(start_prompt).run().strip() + + if not start_sector or self._validate_sector(start_sector): + break + + log(f'Invalid start sector entered: {start_sector}', fg='red', level=logging.INFO) + + if not start_sector: + start_sector = str(largest_free_area.start) + end_sector = str(largest_free_area.end) + else: + end_sector = '100%' + + # prompt until valid end sector was entered + while True: + end_prompt = str(_('Enter the end sector of the partition (percentage or block number, default: {}): ')).format(end_sector) + end_value = TextInput(end_prompt).run().strip() + + if not end_value or self._validate_sector(start_sector, end_value): + break + + log(f'Invalid end sector entered: {start_sector}', fg='red', level=logging.INFO) + + # override the default value with the user value + if end_value: + end_sector = end_value + + start_size = Size(int(start_sector), Unit.sectors, device_info.sector_size) + + if end_sector.endswith('%'): + end_size = Size(int(end_sector[:-1]), Unit.Percent, device_info.sector_size, device_info.total_size) + else: + end_size = Size(int(end_sector), Unit.sectors, device_info.sector_size) + + return start_size, end_size + + def _create_new_partition(self) -> PartitionModification: + fs_type = self._prompt_partition_fs_type() + + start_size, end_size = self._prompt_sectors() + length = end_size - start_size + + # new line for the next prompt + print() + + mountpoint = None + if fs_type != FilesystemType.Btrfs: + mountpoint = self._prompt_mountpoint() + + partition = PartitionModification( + status=ModificationStatus.Create, + type=PartitionType.Primary, + start=start_size, + length=length, + fs_type=fs_type, + mountpoint=mountpoint + ) + + if partition.mountpoint == Path('/boot'): + partition.set_flag(PartitionFlag.Boot) + + return partition + + def _reset_confirmation(self) -> MenuSelection: + prompt = str(_('This will remove all newly added partitions, continue?')) + choice = Menu(prompt, Menu.yes_no(), default_option=Menu.no(), skip=False).run() + return choice + + def _suggest_partition_layout(self, data: List[PartitionModification]) -> List[PartitionModification]: + # if modifications have been done already, inform the user + # that this operation will erase those modifications + if any([not entry.exists() for entry in data]): + choice = self._reset_confirmation() + if choice.value == Menu.no(): + return [] + + from ..user_interaction.disk_conf import suggest_single_disk_layout + + device_modification = suggest_single_disk_layout(self._device) + return device_modification.partitions + + +def manual_partitioning( + device: BDevice, + prompt: str = '', + preset: List[PartitionModification] = [] +) -> List[PartitionModification]: + if not prompt: + prompt = str(_('Partition management: {}')).format(device.device_info.path) + '\n' + prompt += str(_('Total length: {}')).format(device.device_info.total_size.format_size(Unit.MiB)) + + manual_preset = [] + + if not preset: + # we'll display the existing partitions of the device + for partition in device.partition_infos: + manual_preset.append( + PartitionModification.from_existing_partition(partition) + ) + else: + manual_preset = preset + + menu_list = PartitioningList(prompt, device, manual_preset) + partitions = menu_list.run() + + if menu_list.is_last_choice_cancel(): + return preset + + return partitions -- cgit v1.2.3-70-g09d2