index : archinstall32 | |
Archlinux32 installer | gitolite user |
summaryrefslogtreecommitdiff |
-rw-r--r-- | archinstall/lib/disk/partition.py | 661 |
diff --git a/archinstall/lib/disk/partition.py b/archinstall/lib/disk/partition.py deleted file mode 100644 index 87eaa6a7..00000000 --- a/archinstall/lib/disk/partition.py +++ /dev/null @@ -1,661 +0,0 @@ -import glob -import time -import logging -import json -import os -import hashlib -import typing -from dataclasses import dataclass, field -from pathlib import Path -from typing import Optional, Dict, Any, List, Union, Iterator - -from .blockdevice import BlockDevice -from .helpers import get_filesystem_type, convert_size_to_gb, split_bind_name -from ..storage import storage -from ..exceptions import DiskError, SysCallError, UnknownFilesystemFormat -from ..output import log -from ..general import SysCommand -from .btrfs.btrfs_helpers import subvolume_info_from_path -from .btrfs.btrfssubvolumeinfo import BtrfsSubvolumeInfo - -@dataclass -class PartitionInfo: - partition_object: 'Partition' - device_path: str # This would be /dev/sda1 for instance - bootable: bool - size: float - sector_size: int - start: Optional[int] - end: Optional[int] - pttype: Optional[str] - filesystem_type: Optional[str] - partuuid: Optional[str] - uuid: Optional[str] - mountpoints: List[Path] = field(default_factory=list) - - def __post_init__(self): - if not all([self.partuuid, self.uuid]): - for i in range(storage['DISK_RETRY_ATTEMPTS']): - lsblk_info = SysCommand(f"lsblk --json -b -o+LOG-SEC,SIZE,PTTYPE,PARTUUID,UUID,FSTYPE {self.device_path}").decode('UTF-8') - try: - lsblk_info = json.loads(lsblk_info) - except json.decoder.JSONDecodeError: - log(f"Could not decode JSON: {lsblk_info}", fg="red", level=logging.ERROR) - raise DiskError(f'Failed to retrieve information for "{self.device_path}" with lsblk') - - if not (device := lsblk_info.get('blockdevices', [None])[0]): - raise DiskError(f'Failed to retrieve information for "{self.device_path}" with lsblk') - - self.partuuid = device.get('partuuid') - self.uuid = device.get('uuid') - - # Lets build a list of requirements that we would like - # to retry and build (stuff that can take time between partprobes) - requirements = [] - requirements.append(self.partuuid) - - # Unformatted partitions won't have a UUID - if lsblk_info.get('fstype') is not None: - requirements.append(self.uuid) - - if all(requirements): - break - - self.partition_object.partprobe() - time.sleep(max(0.1, storage['DISK_TIMEOUTS'] * i)) - - def get_first_mountpoint(self) -> Optional[Path]: - if len(self.mountpoints) > 0: - return self.mountpoints[0] - return None - - -class Partition: - def __init__( - self, - path: str, - block_device: BlockDevice, - part_id :Optional[str] = None, - filesystem :Optional[str] = None, - mountpoint :Optional[str] = None, - encrypted :bool = False, - autodetect_filesystem :bool = True, - ): - if not part_id: - part_id = os.path.basename(path) - - if type(block_device) is str: - raise ValueError(f"Partition()'s 'block_device' parameter has to be a archinstall.BlockDevice() instance!") - - self.block_device = block_device - self._path = path - self._part_id = part_id - self._target_mountpoint = mountpoint - self._encrypted = encrypted - self._wipe = False - self._type = 'primary' - - if mountpoint: - self.mount(mountpoint) - - try: - self._partition_info = self._fetch_information() - - if not autodetect_filesystem and filesystem: - self._partition_info.filesystem_type = filesystem - - if self._partition_info.filesystem_type == 'crypto_LUKS': - self._encrypted = True - except DiskError: - self._partition_info = None - - @typing.no_type_check # I hate doint this but I'm currently unsure where this is used. - def __lt__(self, left_comparitor :BlockDevice) -> bool: - if type(left_comparitor) == Partition: - left_comparitor = left_comparitor.path - else: - left_comparitor = str(left_comparitor) - - # The goal is to check if /dev/nvme0n1p1 comes before /dev/nvme0n1p5 - return self._path < left_comparitor - - def __repr__(self, *args :str, **kwargs :str) -> str: - mount_repr = '' - if self._partition_info: - if mountpoint := self._partition_info.get_first_mountpoint(): - mount_repr = f", mounted={mountpoint}" - elif self._target_mountpoint: - mount_repr = f", rel_mountpoint={self._target_mountpoint}" - - classname = self.__class__.__name__ - - if not self._partition_info: - return f'{classname}(path={self._path})' - elif self._encrypted: - return f'{classname}(path={self._path}, size={self.size}, PARTUUID={self.part_uuid}, parent={self.real_device}, fs={self._partition_info.filesystem_type}{mount_repr})' - else: - return f'{classname}(path={self._path}, size={self.size}, PARTUUID={self.part_uuid}, fs={self._partition_info.filesystem_type}{mount_repr})' - - def as_json(self) -> Dict[str, Any]: - """ - this is used for the table representation of the partition (see FormattedOutput) - """ - partition_info = { - 'type': self._type, - 'PARTUUID': self.part_uuid, - 'wipe': self._wipe, - 'boot': self.boot, - 'ESP': self.boot, - 'mountpoint': self._target_mountpoint, - 'encrypted': self._encrypted, - 'start': self.start, - 'size': self.end, - 'filesystem': self._partition_info.filesystem_type if self._partition_info else 'Unknown' - } - - return partition_info - - def __dump__(self) -> Dict[str, Any]: - # TODO remove this in favour of as_json - return { - 'type': self._type, - 'PARTUUID': self.part_uuid, - 'wipe': self._wipe, - 'boot': self.boot, - 'ESP': self.boot, - 'mountpoint': self._target_mountpoint, - 'encrypted': self._encrypted, - 'start': self.start, - 'size': self.end, - 'filesystem': { - 'format': self._partition_info.filesystem_type if self._partition_info else 'None' - } - } - - def _call_lsblk(self) -> Dict[str, Any]: - for retry_attempt in range(storage['DISK_RETRY_ATTEMPTS']): - self.partprobe() - time.sleep(max(0.1, storage['DISK_TIMEOUTS'] * retry_attempt)) # TODO: Remove, we should be relying on blkid instead of lsblk - # This sleep might be overkill, but lsblk is known to - # work against a chaotic cache that can change during call - # causing no information to be returned (blkid is better) - # time.sleep(1) - - # TODO: Maybe incorporate a re-try system here based on time.sleep(max(0.1, storage.get('DISK_TIMEOUTS', 1))) - - try: - output = SysCommand(f"lsblk --json -b -o+LOG-SEC,SIZE,PTTYPE,PARTUUID,UUID,FSTYPE {self.device_path}").decode('UTF-8') - except SysCallError as error: - # Get the output minus the message/info from lsblk if it returns a non-zero exit code. - output = error.worker.decode('UTF-8') - if '{' in output: - output = output[output.find('{'):] - - if output: - try: - lsblk_info = json.loads(output) - return lsblk_info - except json.decoder.JSONDecodeError: - log(f"Could not decode JSON: {output}", fg="red", level=logging.ERROR) - - raise DiskError(f'Failed to get partition information "{self.device_path}" with lsblk') - - def _call_sfdisk(self) -> Dict[str, Any]: - output = SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8') - - if output: - sfdisk_info = json.loads(output) - partitions = sfdisk_info.get('partitiontable', {}).get('partitions', []) - node = list(filter(lambda x: x['node'] == self._path, partitions)) - - if len(node) > 0: - return node[0] - - return {} - - raise DiskError(f'Failed to read disk "{self.block_device.path}" with sfdisk') - - def _fetch_information(self) -> PartitionInfo: - lsblk_info = self._call_lsblk() - sfdisk_info = self._call_sfdisk() - - if not (device := lsblk_info.get('blockdevices', [])): - raise DiskError(f'Failed to retrieve information for "{self.device_path}" with lsblk') - - # Grab the first (and only) block device in the list as we're targeting a specific partition - device = device[0] - - mountpoints = [Path(mountpoint) for mountpoint in device['mountpoints'] if mountpoint] - bootable = sfdisk_info.get('bootable', False) or sfdisk_info.get('type', '') == 'C12A7328-F81F-11D2-BA4B-00A0C93EC93B' - - return PartitionInfo( - partition_object=self, - device_path=self._path, - pttype=device['pttype'], - partuuid=device['partuuid'], - uuid=device['uuid'], - sector_size=device['log-sec'], - size=convert_size_to_gb(device['size']), - start=sfdisk_info.get('start', None), - end=sfdisk_info.get('size', None), - bootable=bootable, - filesystem_type=device['fstype'], - mountpoints=mountpoints - ) - - @property - def target_mountpoint(self) -> Optional[str]: - return self._target_mountpoint - - @property - def path(self) -> str: - return self._path - - @property - def filesystem(self) -> str: - if self._partition_info: - return self._partition_info.filesystem_type - - @property - def mountpoint(self) -> Optional[Path]: - if len(self.mountpoints) > 0: - return self.mountpoints[0] - return None - - @property - def mountpoints(self) -> List[Path]: - if self._partition_info: - return self._partition_info.mountpoints - - @property - def sector_size(self) -> int: - if self._partition_info: - return self._partition_info.sector_size - - @property - def start(self) -> Optional[int]: - if self._partition_info: - return self._partition_info.start - - @property - def end(self) -> Optional[int]: - if self._partition_info: - return self._partition_info.end - - @property - def end_sectors(self) -> Optional[int]: - if self._partition_info: - start = self._partition_info.start - end = self._partition_info.end - if start and end: - return start + end - - @property - def size(self) -> Optional[float]: - if self._partition_info: - return self._partition_info.size - - @property - def boot(self) -> bool: - if self._partition_info: - return self._partition_info.bootable - - @property - def partition_type(self) -> Optional[str]: - if self._partition_info: - return self._partition_info.pttype - - @property - def part_uuid(self) -> str: - if self._partition_info: - return self._partition_info.partuuid - - @property - def uuid(self) -> Optional[str]: - """ - Returns the UUID as returned by lsblk for the **partition**. - This is more reliable than relying on /dev/disk/by-uuid as - it doesn't seam to be able to detect md raid partitions. - For bind mounts all the subvolumes share the same uuid - """ - for i in range(storage['DISK_RETRY_ATTEMPTS']): - if not self.partprobe(): - raise DiskError(f"Could not perform partprobe on {self.device_path}") - - time.sleep(storage.get('DISK_TIMEOUTS', 1) * i) - - partuuid = self._safe_uuid - if partuuid: - return partuuid - - raise DiskError(f"Could not get PARTUUID for {self.path} using 'blkid -s PARTUUID -o value {self.path}'") - - @property - def _safe_uuid(self) -> Optional[str]: - """ - A near copy of self.uuid but without any delays. - This function should only be used where uuid is not crucial. - For instance when you want to get a __repr__ of the class. - """ - if not self.partprobe(): - if self.block_device.partition_type == 'iso9660': - return None - - log(f"Could not reliably refresh PARTUUID of partition {self.device_path} due to partprobe error.", level=logging.DEBUG) - - try: - return SysCommand(f'blkid -s UUID -o value {self.device_path}').decode('UTF-8').strip() - except SysCallError as error: - if self.block_device.partition_type == 'iso9660': - # Parent device is a Optical Disk (.iso dd'ed onto a device for instance) - return None - - log(f"Could not get PARTUUID of partition using 'blkid -s UUID -o value {self.device_path}': {error}") - - @property - def _safe_part_uuid(self) -> Optional[str]: - """ - A near copy of self.uuid but without any delays. - This function should only be used where uuid is not crucial. - For instance when you want to get a __repr__ of the class. - """ - if not self.partprobe(): - if self.block_device.partition_type == 'iso9660': - return None - - log(f"Could not reliably refresh PARTUUID of partition {self.device_path} due to partprobe error.", level=logging.DEBUG) - - try: - return SysCommand(f'blkid -s PARTUUID -o value {self.device_path}').decode('UTF-8').strip() - except SysCallError as error: - if self.block_device.partition_type == 'iso9660': - # Parent device is a Optical Disk (.iso dd'ed onto a device for instance) - return None - - log(f"Could not get PARTUUID of partition using 'blkid -s PARTUUID -o value {self.device_path}': {error}") - - if self._partition_info: - return self._partition_info.uuid - - @property - def encrypted(self) -> Union[bool, None]: - return self._encrypted - - @property - def parent(self) -> str: - return self.real_device - - @property - def real_device(self) -> str: - output = SysCommand('lsblk -J').decode('UTF-8') - - if output: - for blockdevice in json.loads(output)['blockdevices']: - if parent := self.find_parent_of(blockdevice, os.path.basename(self.device_path)): - return f"/dev/{parent}" - return self._path - - raise DiskError('Unable to get disk information for command "lsblk -J"') - - @property - def device_path(self) -> str: - """ for bind mounts returns the physical path of the partition - """ - device_path, bind_name = split_bind_name(self._path) - return device_path - - @property - def bind_name(self) -> str: - """ for bind mounts returns the bind name (subvolume path). - Returns none if this property does not exist - """ - device_path, bind_name = split_bind_name(self._path) - return bind_name - - @property - def subvolumes(self) -> Iterator[BtrfsSubvolumeInfo]: - from .helpers import findmnt - - def iterate_children_recursively(information): - for child in information.get('children', []): - if target := child.get('target'): - if child.get('fstype') == 'btrfs': - if subvolume := subvolume_info_from_path(Path(target)): - yield subvolume - - if child.get('children'): - for subchild in iterate_children_recursively(child): - yield subchild - - if self._partition_info.filesystem_type == 'btrfs': - for mountpoint in self._partition_info.mountpoints: - if result := findmnt(mountpoint): - for filesystem in result.get('filesystems', []): - if subvolume := subvolume_info_from_path(mountpoint): - yield subvolume - - for child in iterate_children_recursively(filesystem): - yield child - - def partprobe(self) -> bool: - try: - if self.block_device: - return 0 == SysCommand(f'partprobe {self.block_device.device}').exit_code - except SysCallError as error: - log(f"Unreliable results might be given for {self._path} due to partprobe error: {error}", level=logging.DEBUG) - - return False - - def detect_inner_filesystem(self, password :str) -> Optional[str]: - log(f'Trying to detect inner filesystem format on {self} (This might take a while)', level=logging.INFO) - from ..luks import luks2 - - try: - with luks2(self, storage.get('ENC_IDENTIFIER', 'ai') + 'loop', password, auto_unmount=True) as unlocked_device: - return unlocked_device.filesystem - except SysCallError: - pass - return None - - def has_content(self) -> bool: - fs_type = self._partition_info.filesystem_type - if not fs_type or "swap" in fs_type: - return False - - temporary_mountpoint = '/tmp/' + hashlib.md5(bytes(f"{time.time()}", 'UTF-8') + os.urandom(12)).hexdigest() - temporary_path = Path(temporary_mountpoint) - - temporary_path.mkdir(parents=True, exist_ok=True) - if (handle := SysCommand(f'/usr/bin/mount {self._path} {temporary_mountpoint}')).exit_code != 0: - raise DiskError(f'Could not mount and check for content on {self._path} because: {handle}') - - files = len(glob.glob(f"{temporary_mountpoint}/*")) - iterations = 0 - while SysCommand(f"/usr/bin/umount -R {temporary_mountpoint}").exit_code != 0 and (iterations := iterations + 1) < 10: - time.sleep(1) - - temporary_path.rmdir() - - return True if files > 0 else False - - def encrypt(self, password: Optional[str] = None) -> str: - """ - A wrapper function for luks2() instances and the .encrypt() method of that instance. - """ - from ..luks import luks2 - - handle = luks2(self, None, None) - return handle.encrypt(self, password=password) - - def format(self, filesystem :Optional[str] = None, path :Optional[str] = None, log_formatting :bool = True, options :List[str] = [], retry :bool = True) -> bool: - """ - Format can be given an overriding path, for instance /dev/null to test - the formatting functionality and in essence the support for the given filesystem. - """ - if filesystem is None: - filesystem = self._partition_info.filesystem_type - - if path is None: - path = self._path - - # This converts from fat32 -> vfat to unify filesystem names - filesystem = get_mount_fs_type(filesystem) - - # To avoid "unable to open /dev/x: No such file or directory" - start_wait = time.time() - while Path(path).exists() is False and time.time() - start_wait < 10: - time.sleep(0.025) - - if log_formatting: - log(f'Formatting {path} -> {filesystem}', level=logging.INFO) - - try: - if filesystem == 'btrfs': - options = ['-f'] + options - - mkfs = SysCommand(f"/usr/bin/mkfs.btrfs {' '.join(options)} {path}").decode('UTF-8') - if mkfs and 'UUID:' not in mkfs: - raise DiskError(f'Could not format {path} with {filesystem} because: {mkfs}') - self._partition_info.filesystem_type = filesystem - - elif filesystem == 'vfat': - options = ['-F32'] + options - log(f"/usr/bin/mkfs.vfat {' '.join(options)} {path}") - if (handle := SysCommand(f"/usr/bin/mkfs.vfat {' '.join(options)} {path}")).exit_code != 0: - raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") - self._partition_info.filesystem_type = filesystem - - elif filesystem == 'ext4': - options = ['-F'] + options - - if (handle := SysCommand(f"/usr/bin/mkfs.ext4 {' '.join(options)} {path}")).exit_code != 0: - raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") - self._partition_info.filesystem_type = filesystem - - elif filesystem == 'ext2': - options = ['-F'] + options - - if (handle := SysCommand(f"/usr/bin/mkfs.ext2 {' '.join(options)} {path}")).exit_code != 0: - raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") - self._partition_info.filesystem_type = 'ext2' - elif filesystem == 'xfs': - options = ['-f'] + options - - if (handle := SysCommand(f"/usr/bin/mkfs.xfs {' '.join(options)} {path}")).exit_code != 0: - raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") - self._partition_info.filesystem_type = filesystem - - elif filesystem == 'f2fs': - options = ['-f'] + options - - if (handle := SysCommand(f"/usr/bin/mkfs.f2fs {' '.join(options)} {path}")).exit_code != 0: - raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") - self._partition_info.filesystem_type = filesystem - - elif filesystem == 'ntfs3': - options = ['-f'] + options - - if (handle := SysCommand(f"/usr/bin/mkfs.ntfs -Q {' '.join(options)} {path}")).exit_code != 0: - raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") - self._partition_info.filesystem_type = filesystem - - elif filesystem == 'crypto_LUKS': - # from ..luks import luks2 - # encrypted_partition = luks2(self, None, None) - # encrypted_partition.format(path) - self._partition_info.filesystem_type = filesystem - - else: - raise UnknownFilesystemFormat(f"Fileformat '{filesystem}' is not yet implemented.") - except SysCallError as error: - log(f"Formatting ran in to an error: {error}", level=logging.WARNING, fg="orange") - if retry is True: - log(f"Retrying in {storage.get('DISK_TIMEOUTS', 1)} seconds.", level=logging.WARNING, fg="orange") - time.sleep(storage.get('DISK_TIMEOUTS', 1)) - - return self.format(filesystem, path, log_formatting, options, retry=False) - - if get_filesystem_type(path) == 'crypto_LUKS' or get_filesystem_type(self.real_device) == 'crypto_LUKS': - self._encrypted = True - else: - self._encrypted = False - - return True - - def find_parent_of(self, data :Dict[str, Any], name :str, parent :Optional[str] = None) -> Optional[str]: - if data['name'] == name: - return parent - elif 'children' in data: - for child in data['children']: - if parent := self.find_parent_of(child, name, parent=data['name']): - return parent - - return None - - def mount(self, target :str, fs :Optional[str] = None, options :str = '') -> bool: - if not self._partition_info.get_first_mountpoint(): - log(f'Mounting {self} to {target}', level=logging.INFO) - - if not fs: - fs = self._partition_info.filesystem_type - - fs_type = get_mount_fs_type(fs) - - Path(target).mkdir(parents=True, exist_ok=True) - - if self.bind_name: - device_path = self.device_path - # TODO options should be better be a list than a string - if options: - options = f"{options},subvol={self.bind_name}" - else: - options = f"subvol={self.bind_name}" - else: - device_path = self._path - try: - if options: - mnt_handle = SysCommand(f"/usr/bin/mount -t {fs_type} -o {options} {device_path} {target}") - else: - mnt_handle = SysCommand(f"/usr/bin/mount -t {fs_type} {device_path} {target}") - - # TODO: Should be redundant to check for exit_code - if mnt_handle.exit_code != 0: - raise DiskError(f"Could not mount {self._path} to {target} using options {options}") - except SysCallError as err: - raise err - - # Update the partition info since the mount info has changed after this call. - self._partition_info = self._fetch_information() - return True - - return False - - def unmount(self) -> bool: - SysCommand(f"/usr/bin/umount {self._path}") - - # Update the partition info since the mount info has changed after this call. - self._partition_info = self._fetch_information() - return True - - def filesystem_supported(self) -> bool: - """ - The support for a filesystem (this partition) is tested by calling - partition.format() with a path set to '/dev/null' which returns two exceptions: - 1. SysCallError saying that /dev/null is not formattable - but the filesystem is supported - 2. UnknownFilesystemFormat that indicates that we don't support the given filesystem type - """ - try: - self.format(self._partition_info.filesystem_type, '/dev/null', log_formatting=False) - except (SysCallError, DiskError): - pass # We supported it, but /dev/null is not formattable as expected so the mkfs call exited with an error code - except UnknownFilesystemFormat as err: - raise err - return True - - -def get_mount_fs_type(fs :str) -> str: - if fs == 'ntfs': - return 'ntfs3' # Needed to use the Paragon R/W NTFS driver - elif fs == 'fat32': - return 'vfat' # This is the actual type used for fat32 mounting - return fs |