From 9194f6d85965f435f8d0ae44ba20e73cc761eb44 Mon Sep 17 00:00:00 2001 From: Daniel Girtler Date: Tue, 26 Jul 2022 18:46:50 +1000 Subject: Cleanup partition (#1333) * Cleanup partition * Update * Remove unused method * Update partitioning * Update * Update * Fix mypy Co-authored-by: Daniel Girtler --- archinstall/lib/disk/blockdevice.py | 10 +- archinstall/lib/disk/btrfs/btrfspartition.py | 19 +- archinstall/lib/disk/filesystem.py | 26 +- archinstall/lib/disk/helpers.py | 10 +- archinstall/lib/disk/partition.py | 429 +++++++++------------ archinstall/lib/luks.py | 6 +- .../lib/user_interaction/partitioning_conf.py | 10 - 7 files changed, 227 insertions(+), 283 deletions(-) diff --git a/archinstall/lib/disk/blockdevice.py b/archinstall/lib/disk/blockdevice.py index 4e207bf4..736bacbc 100644 --- a/archinstall/lib/disk/blockdevice.py +++ b/archinstall/lib/disk/blockdevice.py @@ -88,9 +88,6 @@ class BlockDevice: raise KeyError(f'{self.info} does not contain information: "{key}"') - def __len__(self) -> int: - return len(self.partitions) - def __lt__(self, left_comparitor :'BlockDevice') -> bool: return self._path < left_comparitor.path @@ -121,6 +118,8 @@ class BlockDevice: def _load_partitions(self): from .partition import Partition + self._partitions.clear() + lsblk_info = self._call_lsblk(self._path) device = lsblk_info['blockdevices'][0] self._partitions.clear() @@ -233,8 +232,6 @@ class BlockDevice: @property def partitions(self) -> Dict[str, 'Partition']: - self._partprobe() - self._load_partitions() return OrderedDict(sorted(self._partitions.items())) @property @@ -282,7 +279,7 @@ class BlockDevice: try: if uuid and partition.uuid and partition.uuid.lower() == uuid.lower(): return partition - elif partuuid and partition.part_uuid.lower() == partuuid.lower(): + elif partuuid and partition.part_uuid and partition.part_uuid.lower() == partuuid.lower(): return partition except DiskError as error: # Most likely a blockdevice that doesn't support or use UUID's @@ -291,6 +288,7 @@ class BlockDevice: pass log(f"uuid {uuid} or {partuuid} not found. Waiting {storage.get('DISK_TIMEOUTS', 1) * count}s for next attempt",level=logging.DEBUG) + self.flush_cache() time.sleep(storage.get('DISK_TIMEOUTS', 1) * count) log(f"Could not find {uuid}/{partuuid} in disk after 5 retries", level=logging.INFO) diff --git a/archinstall/lib/disk/btrfs/btrfspartition.py b/archinstall/lib/disk/btrfs/btrfspartition.py index a05f1527..d04c9b98 100644 --- a/archinstall/lib/disk/btrfs/btrfspartition.py +++ b/archinstall/lib/disk/btrfs/btrfspartition.py @@ -17,22 +17,11 @@ if TYPE_CHECKING: from ...installer import Installer from .btrfssubvolumeinfo import BtrfsSubvolumeInfo + class BTRFSPartition(Partition): def __init__(self, *args, **kwargs): Partition.__init__(self, *args, **kwargs) - def __repr__(self, *args :str, **kwargs :str) -> str: - mount_repr = '' - if self.mountpoint: - mount_repr = f", mounted={self.mountpoint}" - elif self.target_mountpoint: - mount_repr = f", rel_mountpoint={self.target_mountpoint}" - - if self._encrypted: - return f'BTRFSPartition(path={self.path}, size={self.size}, PARTUUID={self._safe_uuid}, parent={self.real_device}, fs={self.filesystem}{mount_repr})' - else: - return f'BTRFSPartition(path={self.path}, size={self.size}, PARTUUID={self._safe_uuid}, fs={self.filesystem}{mount_repr})' - @property def subvolumes(self): for filesystem in findmnt(pathlib.Path(self.path), recurse=True).get('filesystems', []): @@ -40,11 +29,11 @@ class BTRFSPartition(Partition): yield subvolume_info_from_path(filesystem['target']) def iterate_children(struct): - for child in struct.get('children', []): + for c in struct.get('children', []): if '[' in child.get('source', ''): - yield subvolume_info_from_path(child['target']) + yield subvolume_info_from_path(c['target']) - for sub_child in iterate_children(child): + for sub_child in iterate_children(c): yield sub_child for child in iterate_children(filesystem): diff --git a/archinstall/lib/disk/filesystem.py b/archinstall/lib/disk/filesystem.py index 1c7a801b..90656308 100644 --- a/archinstall/lib/disk/filesystem.py +++ b/archinstall/lib/disk/filesystem.py @@ -210,7 +210,14 @@ class Filesystem: # TODO: Implement this with declarative profiles instead. raise ValueError("Installation().use_entire_disk() has to be re-worked.") - def add_partition(self, partition_type :str, start :str, end :str, partition_format :Optional[str] = None, skip_mklabel :bool = False) -> Partition: + def add_partition( + self, + partition_type :str, + start :str, + end :str, + partition_format :Optional[str] = None, + skip_mklabel :bool = False + ) -> Partition: log(f'Adding partition to {self.blockdevice}, {start}->{end}', level=logging.INFO) if len(self.blockdevice.partitions) == 0 and skip_mklabel is False: @@ -232,6 +239,7 @@ class Filesystem: except DiskError: pass + # TODO this check should probably run in the setup process rather than during the installation if self.mode == MBR: if len(self.blockdevice.partitions) > 3: DiskError("Too many partitions on disk, MBR disks can only have 3 primary partitions") @@ -246,14 +254,9 @@ class Filesystem: if self.parted(parted_string): for count in range(storage.get('DISK_RETRY_ATTEMPTS', 3)): self.partprobe() + self.blockdevice.flush_cache() - new_partition_uuids = [] - for partition in self.blockdevice.partitions.values(): - try: - new_partition_uuids.append(partition.part_uuid) - except DiskError: - pass - + new_partition_uuids = [partition.part_uuid for partition in self.blockdevice.partitions.values()] new_partuuid_set = (set(previous_partuuids) ^ set(new_partition_uuids)) if len(new_partuuid_set) and (new_partuuid := new_partuuid_set.pop()): @@ -263,17 +266,20 @@ class Filesystem: log(f'Blockdevice: {self.blockdevice}', level=logging.ERROR, fg="red") log(f'Partitions: {self.blockdevice.partitions}', level=logging.ERROR, fg="red") log(f'Partition set: {new_partuuid_set}', level=logging.ERROR, fg="red") - log(f'New UUID: {[new_partuuid]}', level=logging.ERROR, fg="red") + log(f'New PARTUUID: {[new_partuuid]}', level=logging.ERROR, fg="red") log(f'get_partition(): {self.blockdevice.get_partition}', level=logging.ERROR, fg="red") raise err else: log(f"Could not get UUID for partition. Waiting {storage.get('DISK_TIMEOUTS', 1) * count}s before retrying.",level=logging.DEBUG) time.sleep(storage.get('DISK_TIMEOUTS', 1) * count) + total_partitions = set([partition.part_uuid for partition in self.blockdevice.partitions.values()]) + total_partitions.update(previous_partuuids) + # TODO: This should never be able to happen log(f"Could not find the new PARTUUID after adding the partition.", level=logging.ERROR, fg="red") log(f"Previous partitions: {previous_partuuids}", level=logging.ERROR, fg="red") - log(f"New partitions: {(previous_partuuids ^ {partition.part_uuid for partition in self.blockdevice.partitions.values()})}", level=logging.ERROR, fg="red") + log(f"New partitions: {total_partitions}", level=logging.ERROR, fg="red") raise DiskError(f"Could not add partition using: {parted_string}") def set_name(self, partition: int, name: str) -> bool: diff --git a/archinstall/lib/disk/helpers.py b/archinstall/lib/disk/helpers.py index 660594ed..c8ac564e 100644 --- a/archinstall/lib/disk/helpers.py +++ b/archinstall/lib/disk/helpers.py @@ -370,7 +370,7 @@ def get_all_targets(data :Dict[str, Any], filters :Dict[str, None] = {}) -> Dict return filters -def get_partitions_in_use(mountpoint :str) -> List[Partition]: +def get_partitions_in_use(mountpoint :str) -> Dict[str, Any]: from .partition import Partition try: @@ -393,8 +393,12 @@ def get_partitions_in_use(mountpoint :str) -> List[Partition]: if not type(blockdev) in (Partition, MapperDev): continue - for blockdev_mountpoint in blockdev.mount_information: - block_devices_mountpoints[blockdev_mountpoint['target']] = blockdev + if isinstance(blockdev, Partition): + for blockdev_mountpoint in blockdev.mountpoints: + block_devices_mountpoints[blockdev_mountpoint] = blockdev + else: + for blockdev_mountpoint in blockdev.mount_information: + block_devices_mountpoints[blockdev_mountpoint['target']] = blockdev log(f'Filtering available mounts {block_devices_mountpoints} to those under {mountpoint}', level=logging.DEBUG) diff --git a/archinstall/lib/disk/partition.py b/archinstall/lib/disk/partition.py index 17c24d57..4028f114 100644 --- a/archinstall/lib/disk/partition.py +++ b/archinstall/lib/disk/partition.py @@ -1,14 +1,16 @@ import glob -import pathlib import time import logging import json import os import hashlib +import typing +from dataclasses import dataclass +from pathlib import Path from typing import Optional, Dict, Any, List, Union, Iterator from .blockdevice import BlockDevice -from .helpers import find_mountpoint, get_filesystem_type, convert_size_to_gb, split_bind_name +from .helpers import get_filesystem_type, convert_size_to_gb, split_bind_name from ..storage import storage from ..exceptions import DiskError, SysCallError, UnknownFilesystemFormat from ..output import log @@ -16,6 +18,26 @@ from ..general import SysCommand from .btrfs.btrfs_helpers import subvolume_info_from_path from .btrfs.btrfssubvolumeinfo import BtrfsSubvolumeInfo + +@dataclass +class PartitionInfo: + pttype: str + partuuid: str + uuid: str + start: Optional[int] + end: Optional[int] + bootable: bool + size: float + sector_size: int + filesystem_type: str + mountpoints: List[Path] + + def get_first_mountpoint(self) -> Optional[Path]: + if len(self.mountpoints) > 0: + return self.mountpoints[0] + return None + + class Partition: def __init__( self, @@ -25,38 +47,37 @@ class Partition: filesystem :Optional[str] = None, mountpoint :Optional[str] = None, encrypted :bool = False, - autodetect_filesystem :bool = True + autodetect_filesystem :bool = True, ): - if not part_id: part_id = os.path.basename(path) - self.block_device = block_device - if type(self.block_device) is str: + if type(block_device) is str: raise ValueError(f"Partition()'s 'block_device' parameter has to be a archinstall.BlockDevice() instance!") - self.path = path - self.part_id = part_id - self.target_mountpoint = mountpoint - self.filesystem = filesystem + self.block_device = block_device + self._path = path + self._part_id = part_id + self._target_mountpoint = mountpoint self._encrypted = None - self.encrypted = encrypted - self.allow_formatting = False + self._encrypted = encrypted + self._wipe = False + self._type = 'primary' if mountpoint: self.mount(mountpoint) - try: - self.mount_information = list(find_mountpoint(self.path)) - except DiskError: - self.mount_information = [{}] + self._partition_info = self._fetch_information() - if not self.filesystem and autodetect_filesystem: - self.filesystem = get_filesystem_type(path) + if not autodetect_filesystem and filesystem: + self._partition_info.filesystem_type = filesystem - if self.filesystem == 'crypto_LUKS': - self.encrypted = True + if self._partition_info.filesystem_type == 'crypto_LUKS': + self._encrypted = True + # I hate doint this but I'm currently unsure where this + # is acutally used to be able to fix the typing issues properly + @typing.no_type_check def __lt__(self, left_comparitor :BlockDevice) -> bool: if type(left_comparitor) == Partition: left_comparitor = left_comparitor.path @@ -64,254 +85,191 @@ class Partition: left_comparitor = str(left_comparitor) # The goal is to check if /dev/nvme0n1p1 comes before /dev/nvme0n1p5 - return self.path < left_comparitor + return self._path < left_comparitor def __repr__(self, *args :str, **kwargs :str) -> str: mount_repr = '' - if self.mountpoint: - mount_repr = f", mounted={self.mountpoint}" - elif self.target_mountpoint: - mount_repr = f", rel_mountpoint={self.target_mountpoint}" + if mountpoint := self._partition_info.get_first_mountpoint(): + mount_repr = f", mounted={mountpoint}" + elif self._target_mountpoint: + mount_repr = f", rel_mountpoint={self._target_mountpoint}" + + classname = self.__class__.__name__ if self._encrypted: - return f'Partition(path={self.path}, size={self.size}, PARTUUID={self._safe_uuid}, parent={self.real_device}, fs={self.filesystem}{mount_repr})' + return f'{classname}(path={self._path}, size={self.size}, PARTUUID={self.part_uuid}, parent={self.real_device}, fs={self._partition_info.filesystem_type}{mount_repr})' else: - return f'Partition(path={self.path}, size={self.size}, PARTUUID={self._safe_uuid}, fs={self.filesystem}{mount_repr})' + return f'{classname}(path={self._path}, size={self.size}, PARTUUID={self.part_uuid}, fs={self._partition_info.filesystem_type}{mount_repr})' def as_json(self) -> Dict[str, Any]: """ this is used for the table representation of the partition (see FormattedOutput) """ partition_info = { - 'type': 'primary', - 'PARTUUID': self._safe_uuid, - 'wipe': self.allow_formatting, + 'type': self._type, + 'PARTUUID': self.part_uuid, + 'wipe': self._wipe, 'boot': self.boot, 'ESP': self.boot, - 'mountpoint': self.target_mountpoint, + 'mountpoint': self._target_mountpoint, 'encrypted': self._encrypted, 'start': self.start, 'size': self.end, - 'filesystem': self.filesystem_type + 'filesystem': self._partition_info.filesystem_type } return partition_info def __dump__(self) -> Dict[str, Any]: # TODO remove this in favour of as_json - - log(get_filesystem_type(self.path)) - return { - 'type': 'primary', - 'PARTUUID': self._safe_uuid, - 'wipe': self.allow_formatting, + 'type': self._type, + 'PARTUUID': self.part_uuid, + 'wipe': self._wipe, 'boot': self.boot, 'ESP': self.boot, - 'mountpoint': self.target_mountpoint, + 'mountpoint': self._target_mountpoint, 'encrypted': self._encrypted, 'start': self.start, 'size': self.end, 'filesystem': { - 'format': self.filesystem_type + 'format': self._partition_info.filesystem_type } } - @property - def filesystem_type(self) -> Optional[str]: - return get_filesystem_type(self.path) + def _call_lsblk(self) -> Dict[str, Any]: + self.partprobe() + output = SysCommand(f"lsblk --json -b -o+LOG-SEC,SIZE,PTTYPE,PARTUUID,UUID,FSTYPE {self.device_path}").decode('UTF-8') - @property - def mountpoint(self) -> Optional[str]: - try: - data = json.loads(SysCommand(f"findmnt --json -R {self.path}").decode()) - for filesystem in data['filesystems']: - return pathlib.Path(filesystem.get('target')) + if output: + lsblk_info = json.loads(output) + return lsblk_info - except SysCallError as error: - # Not mounted anywhere most likely - log(f"Could not locate mount information for {self.path}: {error}", level=logging.DEBUG, fg="grey") - pass - - return None + raise DiskError(f'Failed to read disk "{self.device_path}" with lsblk') - @property - def sector_size(self) -> Optional[int]: - output = json.loads(SysCommand(f"lsblk --json -o+LOG-SEC {self.device_path}").decode('UTF-8')) + def _call_sfdisk(self) -> Dict[str, Any]: + output = SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8') - for device in output['blockdevices']: - return device.get('log-sec', None) + if output: + sfdisk_info = json.loads(output) + partitions = sfdisk_info.get('partitiontable', {}).get('partitions', []) + node = list(filter(lambda x: x['node'] == self._path, partitions)) - @property - def start(self) -> Optional[str]: - output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8')) + if len(node) > 0: + return node[0] - for partition in output.get('partitiontable', {}).get('partitions', []): - if partition['node'] == self.path: - return partition['start'] # * self.sector_size + return {} - @property - def end(self) -> Optional[str]: - # TODO: actually this is size in sectors unit - # TODO: Verify that the logic holds up, that 'size' is the size without 'start' added to it. - output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8')) + raise DiskError(f'Failed to read disk "{self.block_device.path}" with sfdisk') - for partition in output.get('partitiontable', {}).get('partitions', []): - if partition['node'] == self.path: - return partition['size'] # * self.sector_size + def _fetch_information(self) -> PartitionInfo: + lsblk_info = self._call_lsblk() + sfdisk_info = self._call_sfdisk() + device = lsblk_info['blockdevices'][0] - @property - def end_sectors(self) -> Optional[str]: - output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8')) + mountpoints = [Path(mountpoint) for mountpoint in device['mountpoints'] if mountpoint] + bootable = sfdisk_info.get('bootable', False) or sfdisk_info.get('type', '') == 'C12A7328-F81F-11D2-BA4B-00A0C93EC93B' - for partition in output.get('partitiontable', {}).get('partitions', []): - if partition['node'] == self.path: - return partition['start'] + partition['size'] + return PartitionInfo( + pttype=device['pttype'], + partuuid=device['partuuid'], + uuid=device['uuid'], + sector_size=device['log-sec'], + size=convert_size_to_gb(device['size']), + start=sfdisk_info.get('start', None), + end=sfdisk_info.get('size', None), + bootable=bootable, + filesystem_type=device['fstype'], + mountpoints=mountpoints + ) @property - def size(self) -> Optional[float]: - for i in range(storage['DISK_RETRY_ATTEMPTS']): - self.partprobe() - time.sleep(max(0.1, storage['DISK_TIMEOUTS'] * i)) - - try: - lsblk = json.loads(SysCommand(f"lsblk --json -b -o+SIZE {self.device_path}").decode()) - - for device in lsblk['blockdevices']: - return convert_size_to_gb(device['size']) - except SysCallError as error: - if error.exit_code == 8192: - return None - else: - raise error + def target_mountpoint(self) -> Optional[str]: + return self._target_mountpoint @property - def boot(self) -> bool: - output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8')) - - for partition in output.get('partitiontable', {}).get('partitions', []): - if partition['node'] == self.path: - # first condition is for MBR disks, second for GPT disks - return partition.get('bootable', False) or partition.get('type','') == 'C12A7328-F81F-11D2-BA4B-00A0C93EC93B' - - return False + def path(self) -> str: + return self._path @property - def partition_type(self) -> Optional[str]: - lsblk = json.loads(SysCommand(f"lsblk --json -o+PTTYPE {self.device_path}").decode('UTF-8')) - - for device in lsblk['blockdevices']: - return device['pttype'] + def filesystem(self) -> str: + return self._partition_info.filesystem_type @property - def part_uuid(self) -> str: - """ - Returns the PARTUUID as returned by lsblk. - This is more reliable than relying on /dev/disk/by-partuuid as - it doesn't seam to be able to detect md raid partitions. - For bind mounts all the subvolumes share the same uuid - """ - for i in range(storage['DISK_RETRY_ATTEMPTS']): - if not self.partprobe(): - raise DiskError(f"Could not perform partprobe on {self.device_path}") - - time.sleep(max(0.1, storage['DISK_TIMEOUTS'] * i)) - - partuuid = self._safe_part_uuid - if partuuid: - return partuuid - - raise DiskError(f"Could not get PARTUUID for {self.path} using 'blkid -s PARTUUID -o value {self.path}'") + def mountpoint(self) -> Optional[Path]: + if len(self.mountpoints) > 0: + return self.mountpoints[0] + return None @property - def uuid(self) -> Optional[str]: - """ - Returns the UUID as returned by lsblk for the **partition**. - This is more reliable than relying on /dev/disk/by-uuid as - it doesn't seam to be able to detect md raid partitions. - For bind mounts all the subvolumes share the same uuid - """ - for i in range(storage['DISK_RETRY_ATTEMPTS']): - if not self.partprobe(): - raise DiskError(f"Could not perform partprobe on {self.device_path}") + def mountpoints(self) -> List[Path]: + return self._partition_info.mountpoints - time.sleep(storage.get('DISK_TIMEOUTS', 1) * i) - - partuuid = self._safe_uuid - if partuuid: - return partuuid - - raise DiskError(f"Could not get PARTUUID for {self.path} using 'blkid -s PARTUUID -o value {self.path}'") + @property + def sector_size(self) -> int: + return self._partition_info.sector_size @property - def _safe_uuid(self) -> Optional[str]: - """ - A near copy of self.uuid but without any delays. - This function should only be used where uuid is not crucial. - For instance when you want to get a __repr__ of the class. - """ - if not self.partprobe(): - if self.block_device.partition_type == 'iso9660': - return None + def start(self) -> Optional[int]: + return self._partition_info.start - log(f"Could not reliably refresh PARTUUID of partition {self.device_path} due to partprobe error.", level=logging.DEBUG) + @property + def end(self) -> Optional[int]: + return self._partition_info.end - try: - return SysCommand(f'blkid -s UUID -o value {self.device_path}').decode('UTF-8').strip() - except SysCallError as error: - if self.block_device.partition_type == 'iso9660': - # Parent device is a Optical Disk (.iso dd'ed onto a device for instance) - return None + @property + def end_sectors(self) -> Optional[int]: + start = self._partition_info.start + end = self._partition_info.end + if start and end: + return start + end + return None - log(f"Could not get PARTUUID of partition using 'blkid -s UUID -o value {self.device_path}': {error}") + @property + def size(self) -> Optional[float]: + return self._partition_info.size @property - def _safe_part_uuid(self) -> Optional[str]: - """ - A near copy of self.uuid but without any delays. - This function should only be used where uuid is not crucial. - For instance when you want to get a __repr__ of the class. - """ - if not self.partprobe(): - if self.block_device.partition_type == 'iso9660': - return None + def boot(self) -> bool: + return self._partition_info.bootable - log(f"Could not reliably refresh PARTUUID of partition {self.device_path} due to partprobe error.", level=logging.DEBUG) + @property + def partition_type(self) -> Optional[str]: + return self._partition_info.pttype - try: - return self.block_device.uuid - except SysCallError as error: - if self.block_device.partition_type == 'iso9660': - # Parent device is a Optical Disk (.iso dd'ed onto a device for instance) - return None + @property + def part_uuid(self) -> str: + return self._partition_info.partuuid - log(f"Could not get PARTUUID of partition using 'blkid -s PARTUUID -o value {self.device_path}': {error}") + @property + def uuid(self) -> Optional[str]: + return self._partition_info.uuid @property def encrypted(self) -> Union[bool, None]: return self._encrypted - @encrypted.setter - def encrypted(self, value: bool) -> None: - self._encrypted = value - @property def parent(self) -> str: return self.real_device @property def real_device(self) -> str: - for blockdevice in json.loads(SysCommand('lsblk -J').decode('UTF-8'))['blockdevices']: - if parent := self.find_parent_of(blockdevice, os.path.basename(self.device_path)): - return f"/dev/{parent}" - # raise DiskError(f'Could not find appropriate parent for encrypted partition {self}') - return self.path + output = SysCommand('lsblk -J').decode('UTF-8') + + if output: + for blockdevice in json.loads(output)['blockdevices']: + if parent := self.find_parent_of(blockdevice, os.path.basename(self.device_path)): + return f"/dev/{parent}" + return self._path + + raise DiskError('Unable to get disk information for command "lsblk -J"') @property def device_path(self) -> str: - """ for bind mounts returns the phisical path of the partition + """ for bind mounts returns the physical path of the partition """ - device_path, bind_name = split_bind_name(self.path) + device_path, bind_name = split_bind_name(self._path) return device_path @property @@ -319,7 +277,7 @@ class Partition: """ for bind mounts returns the bind name (subvolume path). Returns none if this property does not exist """ - device_path, bind_name = split_bind_name(self.path) + device_path, bind_name = split_bind_name(self._path) return bind_name @property @@ -330,29 +288,29 @@ class Partition: for child in information.get('children', []): if target := child.get('target'): if child.get('fstype') == 'btrfs': - if subvolume := subvolume_info_from_path(pathlib.Path(target)): + if subvolume := subvolume_info_from_path(Path(target)): yield subvolume if child.get('children'): for subchild in iterate_children_recursively(child): yield subchild - for mountpoint in self.mount_information: - if result := findmnt(pathlib.Path(mountpoint['target'])): - for filesystem in result.get('filesystems', []): - if mountpoint.get('fstype') == 'btrfs': - if subvolume := subvolume_info_from_path(pathlib.Path(mountpoint['target'])): + if self._partition_info.filesystem_type == 'btrfs': + for mountpoint in self._partition_info.mountpoints: + if result := findmnt(mountpoint): + for filesystem in result.get('filesystems', []): + if subvolume := subvolume_info_from_path(mountpoint): yield subvolume - for child in iterate_children_recursively(filesystem): - yield child + for child in iterate_children_recursively(filesystem): + yield child def partprobe(self) -> bool: try: if self.block_device: return 0 == SysCommand(f'partprobe {self.block_device.device}').exit_code except SysCallError as error: - log(f"Unreliable results might be given for {self.path} due to partprobe error: {error}", level=logging.DEBUG) + log(f"Unreliable results might be given for {self._path} due to partprobe error: {error}", level=logging.DEBUG) return False @@ -364,19 +322,20 @@ class Partition: with luks2(self, storage.get('ENC_IDENTIFIER', 'ai') + 'loop', password, auto_unmount=True) as unlocked_device: return unlocked_device.filesystem except SysCallError: - return None + pass + return None def has_content(self) -> bool: - fs_type = get_filesystem_type(self.path) + fs_type = self._partition_info.filesystem_type if not fs_type or "swap" in fs_type: return False temporary_mountpoint = '/tmp/' + hashlib.md5(bytes(f"{time.time()}", 'UTF-8') + os.urandom(12)).hexdigest() - temporary_path = pathlib.Path(temporary_mountpoint) + temporary_path = Path(temporary_mountpoint) temporary_path.mkdir(parents=True, exist_ok=True) - if (handle := SysCommand(f'/usr/bin/mount {self.path} {temporary_mountpoint}')).exit_code != 0: - raise DiskError(f'Could not mount and check for content on {self.path} because: {b"".join(handle)}') + if (handle := SysCommand(f'/usr/bin/mount {self._path} {temporary_mountpoint}')).exit_code != 0: + raise DiskError(f'Could not mount and check for content on {self._path} because: {handle}') files = len(glob.glob(f"{temporary_mountpoint}/*")) iterations = 0 @@ -387,14 +346,14 @@ class Partition: return True if files > 0 else False - def encrypt(self, *args :str, **kwargs :str) -> str: + def encrypt(self, password: Optional[str] = None) -> str: """ A wrapper function for luks2() instances and the .encrypt() method of that instance. """ from ..luks import luks2 handle = luks2(self, None, None) - return handle.encrypt(self, *args, **kwargs) + return handle.encrypt(self, password=password) def format(self, filesystem :Optional[str] = None, path :Optional[str] = None, log_formatting :bool = True, options :List[str] = [], retry :bool = True) -> bool: """ @@ -402,17 +361,17 @@ class Partition: the formatting functionality and in essence the support for the given filesystem. """ if filesystem is None: - filesystem = self.filesystem + filesystem = self._partition_info.filesystem_type if path is None: - path = self.path + path = self._path # This converts from fat32 -> vfat to unify filesystem names filesystem = get_mount_fs_type(filesystem) # To avoid "unable to open /dev/x: No such file or directory" start_wait = time.time() - while pathlib.Path(path).exists() is False and time.time() - start_wait < 10: + while Path(path).exists() is False and time.time() - start_wait < 10: time.sleep(0.025) if log_formatting: @@ -422,57 +381,57 @@ class Partition: if filesystem == 'btrfs': options = ['-f'] + options - if 'UUID:' not in (mkfs := SysCommand(f"/usr/bin/mkfs.btrfs {' '.join(options)} {path}").decode('UTF-8')): + mkfs = SysCommand(f"/usr/bin/mkfs.btrfs {' '.join(options)} {path}").decode('UTF-8') + if mkfs and 'UUID:' not in mkfs: raise DiskError(f'Could not format {path} with {filesystem} because: {mkfs}') - self.filesystem = filesystem + self._partition_info.filesystem_type = filesystem elif filesystem == 'vfat': options = ['-F32'] + options log(f"/usr/bin/mkfs.vfat {' '.join(options)} {path}") if (handle := SysCommand(f"/usr/bin/mkfs.vfat {' '.join(options)} {path}")).exit_code != 0: raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") - self.filesystem = filesystem + self._partition_info.filesystem_type = filesystem elif filesystem == 'ext4': options = ['-F'] + options if (handle := SysCommand(f"/usr/bin/mkfs.ext4 {' '.join(options)} {path}")).exit_code != 0: raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") - self.filesystem = filesystem + self._partition_info.filesystem_type = filesystem elif filesystem == 'ext2': options = ['-F'] + options if (handle := SysCommand(f"/usr/bin/mkfs.ext2 {' '.join(options)} {path}")).exit_code != 0: - raise DiskError(f'Could not format {path} with {filesystem} because: {b"".join(handle)}') - self.filesystem = 'ext2' - + raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") + self._partition_info.filesystem_type = 'ext2' elif filesystem == 'xfs': options = ['-f'] + options if (handle := SysCommand(f"/usr/bin/mkfs.xfs {' '.join(options)} {path}")).exit_code != 0: raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") - self.filesystem = filesystem + self._partition_info.filesystem_type = filesystem elif filesystem == 'f2fs': options = ['-f'] + options if (handle := SysCommand(f"/usr/bin/mkfs.f2fs {' '.join(options)} {path}")).exit_code != 0: raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") - self.filesystem = filesystem + self._partition_info.filesystem_type = filesystem elif filesystem == 'ntfs3': options = ['-f'] + options if (handle := SysCommand(f"/usr/bin/mkfs.ntfs -Q {' '.join(options)} {path}")).exit_code != 0: raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") - self.filesystem = filesystem + self._partition_info.filesystem_type = filesystem elif filesystem == 'crypto_LUKS': # from ..luks import luks2 # encrypted_partition = luks2(self, None, None) # encrypted_partition.format(path) - self.filesystem = filesystem + self._partition_info.filesystem_type = filesystem else: raise UnknownFilesystemFormat(f"Fileformat '{filesystem}' is not yet implemented.") @@ -485,9 +444,9 @@ class Partition: return self.format(filesystem, path, log_formatting, options, retry=False) if get_filesystem_type(path) == 'crypto_LUKS' or get_filesystem_type(self.real_device) == 'crypto_LUKS': - self.encrypted = True + self._encrypted = True else: - self.encrypted = False + self._encrypted = False return True @@ -499,18 +458,18 @@ class Partition: if parent := self.find_parent_of(child, name, parent=data['name']): return parent + return None + def mount(self, target :str, fs :Optional[str] = None, options :str = '') -> bool: - if not self.mountpoint: + if not self._partition_info.get_first_mountpoint(): log(f'Mounting {self} to {target}', level=logging.INFO) if not fs: - if not self.filesystem: - raise DiskError(f'Need to format (or define) the filesystem on {self} before mounting.') - fs = self.filesystem + fs = self._partition_info.filesystem_type fs_type = get_mount_fs_type(fs) - pathlib.Path(target).mkdir(parents=True, exist_ok=True) + Path(target).mkdir(parents=True, exist_ok=True) if self.bind_name: device_path = self.device_path @@ -520,7 +479,7 @@ class Partition: else: options = f"subvol={self.bind_name}" else: - device_path = self.path + device_path = self._path try: if options: mnt_handle = SysCommand(f"/usr/bin/mount -t {fs_type} -o {options} {device_path} {target}") @@ -529,7 +488,7 @@ class Partition: # TODO: Should be redundant to check for exit_code if mnt_handle.exit_code != 0: - raise DiskError(f"Could not mount {self.path} to {target} using options {options}") + raise DiskError(f"Could not mount {self._path} to {target} using options {options}") except SysCallError as err: raise err @@ -538,19 +497,17 @@ class Partition: return False def unmount(self) -> bool: - worker = SysCommand(f"/usr/bin/umount {self.path}") + worker = SysCommand(f"/usr/bin/umount {self._path}") + exit_code = worker.exit_code # Without to much research, it seams that low error codes are errors. # And above 8k is indicators such as "/dev/x not mounted.". # So anything in between 0 and 8k are errors (?). - if 0 < worker.exit_code < 8000: - raise SysCallError(f"Could not unmount {self.path} properly: {worker}", exit_code=worker.exit_code) + if exit_code and 0 < exit_code < 8000: + raise SysCallError(f"Could not unmount {self._path} properly: {worker}", exit_code=exit_code) return True - def umount(self) -> bool: - return self.unmount() - def filesystem_supported(self) -> bool: """ The support for a filesystem (this partition) is tested by calling @@ -559,7 +516,7 @@ class Partition: 2. UnknownFilesystemFormat that indicates that we don't support the given filesystem type """ try: - self.format(self.filesystem, '/dev/null', log_formatting=False, allow_formatting=True) + self.format(self._partition_info.filesystem_type, '/dev/null', log_formatting=False) except (SysCallError, DiskError): pass # We supported it, but /dev/null is not formattable as expected so the mkfs call exited with an error code except UnknownFilesystemFormat as err: diff --git a/archinstall/lib/luks.py b/archinstall/lib/luks.py index ac480b11..7e4534d8 100644 --- a/archinstall/lib/luks.py +++ b/archinstall/lib/luks.py @@ -22,9 +22,9 @@ from .disk.btrfs import BTRFSPartition class luks2: def __init__(self, - partition :Partition, - mountpoint :str, - password :str, + partition: Partition, + mountpoint: Optional[str], + password: Optional[str], key_file :Optional[str] = None, auto_unmount :bool = False, *args :str, diff --git a/archinstall/lib/user_interaction/partitioning_conf.py b/archinstall/lib/user_interaction/partitioning_conf.py index 8bf76121..f2e6b881 100644 --- a/archinstall/lib/user_interaction/partitioning_conf.py +++ b/archinstall/lib/user_interaction/partitioning_conf.py @@ -140,16 +140,6 @@ def get_default_partition_layout( return suggest_multi_disk_layout(block_devices, advanced_options=advanced_options) -def select_individual_blockdevice_usage(block_devices: list) -> Dict[str, Any]: - result = {} - - for device in block_devices: - layout = manage_new_and_existing_partitions(device) - result[device.path] = layout - - return result - - def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str, Any]: # noqa: max-complexity: 50 block_device_struct = {"partitions": [partition.__dump__() for partition in block_device.partitions.values()]} original_layout = copy.deepcopy(block_device_struct) -- cgit v1.2.3-70-g09d2