Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib/disk/partitioning_menu.py
diff options
context:
space:
mode:
Diffstat (limited to 'archinstall/lib/disk/partitioning_menu.py')
-rw-r--r--archinstall/lib/disk/partitioning_menu.py429
1 files changed, 429 insertions, 0 deletions
diff --git a/archinstall/lib/disk/partitioning_menu.py b/archinstall/lib/disk/partitioning_menu.py
new file mode 100644
index 00000000..fb1eb74b
--- /dev/null
+++ b/archinstall/lib/disk/partitioning_menu.py
@@ -0,0 +1,429 @@
+from __future__ import annotations
+
+import re
+from pathlib import Path
+from typing import Any, TYPE_CHECKING, List, Optional, Tuple
+from dataclasses import dataclass
+
+from .device_model import (
+ PartitionModification, FilesystemType, BDevice,
+ Size, Unit, PartitionType, PartitionFlag,
+ ModificationStatus, DeviceGeometry, SectorSize, BtrfsMountOption
+)
+from ..hardware import SysInfo
+from ..menu import Menu, ListManager, MenuSelection, TextInput
+from ..output import FormattedOutput, warn
+from .subvolume_menu import SubvolumeMenu
+
+if TYPE_CHECKING:
+ _: Any
+
+
+@dataclass
+class DefaultFreeSector:
+ start: Size
+ end: Size
+
+
+class PartitioningList(ListManager):
+ """
+ subclass of ListManager for the managing of user accounts
+ """
+ def __init__(self, prompt: str, device: BDevice, device_partitions: List[PartitionModification]):
+ self._device = device
+ self._actions = {
+ 'create_new_partition': str(_('Create a new partition')),
+ 'suggest_partition_layout': str(_('Suggest partition layout')),
+ 'remove_added_partitions': str(_('Remove all newly added partitions')),
+ 'assign_mountpoint': str(_('Assign mountpoint')),
+ 'mark_formatting': str(_('Mark/Unmark to be formatted (wipes data)')),
+ 'mark_bootable': str(_('Mark/Unmark as bootable')),
+ 'set_filesystem': str(_('Change filesystem')),
+ 'btrfs_mark_compressed': str(_('Mark/Unmark as compressed')), # btrfs only
+ 'btrfs_mark_nodatacow': str(_('Mark/Unmark as nodatacow')), # btrfs only
+ 'btrfs_set_subvolumes': str(_('Set subvolumes')), # btrfs only
+ 'delete_partition': str(_('Delete partition'))
+ }
+
+ display_actions = list(self._actions.values())
+ super().__init__(prompt, device_partitions, display_actions[:2], display_actions[3:])
+
+ def selected_action_display(self, partition: PartitionModification) -> str:
+ return str(_('Partition'))
+
+ def filter_options(self, selection: PartitionModification, options: List[str]) -> List[str]:
+ not_filter = []
+
+ # only display formatting if the partition exists already
+ if not selection.exists():
+ not_filter += [self._actions['mark_formatting']]
+ else:
+ # only allow options if the existing partition
+ # was marked as formatting, otherwise we run into issues where
+ # 1. select a new fs -> potentially mark as wipe now
+ # 2. Switch back to old filesystem -> should unmark wipe now, but
+ # how do we know it was the original one?
+ not_filter += [
+ self._actions['set_filesystem'],
+ self._actions['mark_bootable'],
+ self._actions['btrfs_mark_compressed'],
+ self._actions['btrfs_mark_nodatacow'],
+ self._actions['btrfs_set_subvolumes']
+ ]
+
+ # non btrfs partitions shouldn't get btrfs options
+ if selection.fs_type != FilesystemType.Btrfs:
+ not_filter += [
+ self._actions['btrfs_mark_compressed'],
+ self._actions['btrfs_mark_nodatacow'],
+ self._actions['btrfs_set_subvolumes']
+ ]
+ else:
+ not_filter += [self._actions['assign_mountpoint']]
+
+ return [o for o in options if o not in not_filter]
+
+ def handle_action(
+ self,
+ action: str,
+ entry: Optional[PartitionModification],
+ data: List[PartitionModification]
+ ) -> List[PartitionModification]:
+ action_key = [k for k, v in self._actions.items() if v == action][0]
+
+ match action_key:
+ case 'create_new_partition':
+ new_partition = self._create_new_partition()
+ data += [new_partition]
+ case 'suggest_partition_layout':
+ new_partitions = self._suggest_partition_layout(data)
+ if len(new_partitions) > 0:
+ data = new_partitions
+ case 'remove_added_partitions':
+ choice = self._reset_confirmation()
+ if choice.value == Menu.yes():
+ data = [part for part in data if part.is_exists_or_modify()]
+ case 'assign_mountpoint' if entry:
+ entry.mountpoint = self._prompt_mountpoint()
+ if entry.mountpoint == Path('/boot'):
+ entry.set_flag(PartitionFlag.Boot)
+ if SysInfo.has_uefi():
+ entry.set_flag(PartitionFlag.ESP)
+ case 'mark_formatting' if entry:
+ self._prompt_formatting(entry)
+ case 'mark_bootable' if entry:
+ entry.invert_flag(PartitionFlag.Boot)
+ if SysInfo.has_uefi():
+ entry.invert_flag(PartitionFlag.ESP)
+ case 'set_filesystem' if entry:
+ fs_type = self._prompt_partition_fs_type()
+ if fs_type:
+ entry.fs_type = fs_type
+ # btrfs subvolumes will define mountpoints
+ if fs_type == FilesystemType.Btrfs:
+ entry.mountpoint = None
+ case 'btrfs_mark_compressed' if entry:
+ self._toggle_mount_option(entry, BtrfsMountOption.compress)
+ case 'btrfs_mark_nodatacow' if entry:
+ self._toggle_mount_option(entry, BtrfsMountOption.nodatacow)
+ case 'btrfs_set_subvolumes' if entry:
+ self._set_btrfs_subvolumes(entry)
+ case 'delete_partition' if entry:
+ data = self._delete_partition(entry, data)
+
+ return data
+
+ def _delete_partition(
+ self,
+ entry: PartitionModification,
+ data: List[PartitionModification]
+ ) -> List[PartitionModification]:
+ if entry.is_exists_or_modify():
+ entry.status = ModificationStatus.Delete
+ return data
+ else:
+ return [d for d in data if d != entry]
+
+ def _toggle_mount_option(
+ self,
+ partition: PartitionModification,
+ option: BtrfsMountOption
+ ):
+ if option.value not in partition.mount_options:
+ if option == BtrfsMountOption.compress:
+ partition.mount_options = [
+ o for o in partition.mount_options
+ if o != BtrfsMountOption.nodatacow.value
+ ]
+
+ partition.mount_options = [
+ o for o in partition.mount_options
+ if not o.startswith(BtrfsMountOption.compress.name)
+ ]
+
+ partition.mount_options.append(option.value)
+ else:
+ partition.mount_options = [
+ o for o in partition.mount_options if o != option.value
+ ]
+
+ def _set_btrfs_subvolumes(self, partition: PartitionModification):
+ partition.btrfs_subvols = SubvolumeMenu(
+ _("Manage btrfs subvolumes for current partition"),
+ partition.btrfs_subvols
+ ).run()
+
+ def _prompt_formatting(self, partition: PartitionModification):
+ # an existing partition can toggle between Exist or Modify
+ if partition.is_modify():
+ partition.status = ModificationStatus.Exist
+ return
+ elif partition.exists():
+ partition.status = ModificationStatus.Modify
+
+ # If we mark a partition for formatting, but the format is CRYPTO LUKS, there's no point in formatting it really
+ # without asking the user which inner-filesystem they want to use. Since the flag 'encrypted' = True is already set,
+ # it's safe to change the filesystem for this partition.
+ if partition.fs_type == FilesystemType.Crypto_luks:
+ prompt = str(_('This partition is currently encrypted, to format it a filesystem has to be specified'))
+ fs_type = self._prompt_partition_fs_type(prompt)
+ partition.fs_type = fs_type
+
+ if fs_type == FilesystemType.Btrfs:
+ partition.mountpoint = None
+
+ def _prompt_mountpoint(self) -> Path:
+ header = str(_('Partition mount-points are relative to inside the installation, the boot would be /boot as an example.')) + '\n'
+ header += str(_('If mountpoint /boot is set, then the partition will also be marked as bootable.')) + '\n'
+ prompt = str(_('Mountpoint: '))
+
+ print(header)
+
+ while True:
+ value = TextInput(prompt).run().strip()
+
+ if value:
+ mountpoint = Path(value)
+ break
+
+ return mountpoint
+
+ def _prompt_partition_fs_type(self, prompt: str = '') -> FilesystemType:
+ options = {fs.value: fs for fs in FilesystemType if fs != FilesystemType.Crypto_luks}
+
+ prompt = prompt + '\n' + str(_('Enter a desired filesystem type for the partition'))
+ choice = Menu(prompt, options, sort=False, skip=False).run()
+ return options[choice.single_value]
+
+ def _validate_value(
+ self,
+ sector_size: SectorSize,
+ total_size: Size,
+ text: str,
+ start: Optional[Size]
+ ) -> Optional[Size]:
+ match = re.match(r'([0-9]+)([a-zA-Z|%]*)', text, re.I)
+
+ if match:
+ str_value, unit = match.groups()
+
+ if unit == '%' and start:
+ available = total_size - start
+ value = int(available.value * (int(str_value) / 100))
+ unit = available.unit.name
+ else:
+ value = int(str_value)
+
+ if unit and unit not in Unit.get_all_units():
+ return None
+
+ unit = Unit[unit] if unit else Unit.sectors
+ return Size(value, unit, sector_size)
+
+ return None
+
+ def _enter_size(
+ self,
+ sector_size: SectorSize,
+ total_size: Size,
+ prompt: str,
+ default: Size,
+ start: Optional[Size],
+ ) -> Size:
+ while True:
+ value = TextInput(prompt).run().strip()
+ size: Optional[Size] = None
+ if not value:
+ size = default
+ else:
+ size = self._validate_value(sector_size, total_size, value, start)
+
+ if size:
+ return size
+
+ warn(f'Invalid value: {value}')
+
+ def _prompt_size(self) -> Tuple[Size, Size]:
+ device_info = self._device.device_info
+
+ text = str(_('Current free sectors on device {}:')).format(device_info.path) + '\n\n'
+ free_space_table = FormattedOutput.as_table(device_info.free_space_regions)
+ prompt = text + free_space_table + '\n'
+
+ total_sectors = device_info.total_size.format_size(Unit.sectors, device_info.sector_size)
+ total_bytes = device_info.total_size.format_size(Unit.B)
+
+ prompt += str(_('Total: {} / {}')).format(total_sectors, total_bytes) + '\n\n'
+ prompt += str(_('All entered values can be suffixed with a unit: %, B, KB, KiB, MB, MiB...')) + '\n'
+ prompt += str(_('If no unit is provided, the value is interpreted as sectors')) + '\n'
+ print(prompt)
+
+ default_free_sector = self._find_default_free_space()
+
+ if not default_free_sector:
+ default_free_sector = DefaultFreeSector(
+ Size(0, Unit.sectors, self._device.device_info.sector_size),
+ Size(0, Unit.sectors, self._device.device_info.sector_size)
+ )
+
+ # prompt until a valid start sector was entered
+ start_prompt = str(_('Enter start (default: sector {}): ')).format(default_free_sector.start.value)
+
+ start_size = self._enter_size(
+ device_info.sector_size,
+ device_info.total_size,
+ start_prompt,
+ default_free_sector.start,
+ None
+ )
+
+ if start_size.value == default_free_sector.start.value and default_free_sector.end.value != 0:
+ end_size = default_free_sector.end
+ else:
+ end_size = device_info.total_size
+
+ # prompt until valid end sector was entered
+ end_prompt = str(_('Enter end (default: {}): ')).format(end_size.as_text())
+ end_size = self._enter_size(
+ device_info.sector_size,
+ device_info.total_size,
+ end_prompt,
+ end_size,
+ start_size
+ )
+
+ return start_size, end_size
+
+ def _find_default_free_space(self) -> Optional[DefaultFreeSector]:
+ device_info = self._device.device_info
+
+ largest_free_area: Optional[DeviceGeometry] = None
+ largest_deleted_area: Optional[PartitionModification] = None
+
+ if len(device_info.free_space_regions) > 0:
+ largest_free_area = max(device_info.free_space_regions, key=lambda r: r.get_length())
+
+ deleted_partitions = list(filter(lambda x: x.status == ModificationStatus.Delete, self._data))
+ if len(deleted_partitions) > 0:
+ largest_deleted_area = max(deleted_partitions, key=lambda p: p.length)
+
+ def _free_space(space: DeviceGeometry) -> DefaultFreeSector:
+ start = Size(space.start, Unit.sectors, device_info.sector_size)
+ end = Size(space.end, Unit.sectors, device_info.sector_size)
+ return DefaultFreeSector(start, end)
+
+ def _free_deleted(space: PartitionModification) -> DefaultFreeSector:
+ start = space.start.convert(Unit.sectors, self._device.device_info.sector_size)
+ end = space.end.convert(Unit.sectors, self._device.device_info.sector_size)
+ return DefaultFreeSector(start, end)
+
+ if not largest_deleted_area and largest_free_area:
+ return _free_space(largest_free_area)
+ elif not largest_free_area and largest_deleted_area:
+ return _free_deleted(largest_deleted_area)
+ elif not largest_deleted_area and not largest_free_area:
+ return None
+ elif largest_free_area and largest_deleted_area:
+ free_space = _free_space(largest_free_area)
+ if free_space.start > largest_deleted_area.start:
+ return free_space
+ else:
+ return _free_deleted(largest_deleted_area)
+
+ return None
+
+ def _create_new_partition(self) -> PartitionModification:
+ fs_type = self._prompt_partition_fs_type()
+
+ start_size, end_size = self._prompt_size()
+ length = end_size - start_size
+
+ # new line for the next prompt
+ print()
+
+ mountpoint = None
+ if fs_type != FilesystemType.Btrfs:
+ mountpoint = self._prompt_mountpoint()
+
+ partition = PartitionModification(
+ status=ModificationStatus.Create,
+ type=PartitionType.Primary,
+ start=start_size,
+ length=length,
+ fs_type=fs_type,
+ mountpoint=mountpoint
+ )
+
+ if partition.mountpoint == Path('/boot'):
+ partition.set_flag(PartitionFlag.Boot)
+ if SysInfo.has_uefi():
+ partition.set_flag(PartitionFlag.ESP)
+
+ return partition
+
+ def _reset_confirmation(self) -> MenuSelection:
+ prompt = str(_('This will remove all newly added partitions, continue?'))
+ choice = Menu(prompt, Menu.yes_no(), default_option=Menu.no(), skip=False).run()
+ return choice
+
+ def _suggest_partition_layout(self, data: List[PartitionModification]) -> List[PartitionModification]:
+ # if modifications have been done already, inform the user
+ # that this operation will erase those modifications
+ if any([not entry.exists() for entry in data]):
+ choice = self._reset_confirmation()
+ if choice.value == Menu.no():
+ return []
+
+ from ..interactions.disk_conf import suggest_single_disk_layout
+
+ device_modification = suggest_single_disk_layout(self._device)
+ return device_modification.partitions
+
+
+def manual_partitioning(
+ device: BDevice,
+ prompt: str = '',
+ preset: List[PartitionModification] = []
+) -> List[PartitionModification]:
+ if not prompt:
+ prompt = str(_('Partition management: {}')).format(device.device_info.path) + '\n'
+ prompt += str(_('Total length: {}')).format(device.device_info.total_size.format_size(Unit.MiB))
+
+ manual_preset = []
+
+ if not preset:
+ # we'll display the existing partitions of the device
+ for partition in device.partition_infos:
+ manual_preset.append(
+ PartitionModification.from_existing_partition(partition)
+ )
+ else:
+ manual_preset = preset
+
+ menu_list = PartitioningList(prompt, device, manual_preset)
+ partitions: List[PartitionModification] = menu_list.run()
+
+ if menu_list.is_last_choice_cancel():
+ return preset
+
+ return partitions