index : archinstall32 | |
Archlinux32 installer | gitolite user |
summaryrefslogtreecommitdiff |
author | Anton Hvornum <anton@hvornum.se> | 2022-02-08 23:21:20 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-02-08 23:21:20 +0100 |
commit | d3b6832345c9b5cad54951d4f976ebdbfe59a086 (patch) | |
tree | fd611b06cd1aa7709fb3b5128969a673e0fdca80 /archinstall/lib/disk | |
parent | feffa69042ec537eab6d78597c134693546a2b93 (diff) |
-rw-r--r-- | archinstall/lib/disk/blockdevice.py | 32 | ||||
-rw-r--r-- | archinstall/lib/disk/btrfs.py | 56 | ||||
-rw-r--r-- | archinstall/lib/disk/dmcryptdev.py | 48 | ||||
-rw-r--r-- | archinstall/lib/disk/helpers.py | 241 | ||||
-rw-r--r-- | archinstall/lib/disk/mapperdev.py | 83 | ||||
-rw-r--r-- | archinstall/lib/disk/partition.py | 67 |
diff --git a/archinstall/lib/disk/blockdevice.py b/archinstall/lib/disk/blockdevice.py index fac258ef..ff741f18 100644 --- a/archinstall/lib/disk/blockdevice.py +++ b/archinstall/lib/disk/blockdevice.py @@ -16,10 +16,10 @@ from ..storage import storage class BlockDevice: def __init__(self, path :str, info :Optional[Dict[str, Any]] = None): if not info: - from .helpers import all_disks + from .helpers import all_blockdevices # If we don't give any information, we need to auto-fill it. # Otherwise any subsequent usage will break. - info = all_disks()[path].info + info = all_blockdevices(partitions=False)[path].info self.path = path self.info = info @@ -78,16 +78,20 @@ class BlockDevice: If it's a loop-back-device it returns the back-file, For other types it return self.device """ - if self.info['type'] == 'loop': - for drive in json.loads(SysCommand(['losetup', '--json']).decode('UTF_8'))['loopdevices']: - if not drive['name'] == self.path: - continue - - return drive['back-file'] + if self.info.get('type') == 'loop': + return self.info['back-file'] else: return self.device @property + def mountpoint(self) -> None: + """ + A dummy function to enable transparent comparisons of mountpoints. + As blockdevices can't be mounted directly, this will always be None + """ + return None + + @property def device(self) -> str: """ Returns the device file of the BlockDevice. @@ -95,20 +99,20 @@ class BlockDevice: If it's a ATA-drive it returns the /dev/X device And if it's a crypto-device it returns the parent device """ - if "type" not in self.info: + if "DEVTYPE" not in self.info: raise DiskError(f'Could not locate backplane info for "{self.path}"') - if self.info['type'] in ['disk','loop']: + if self.info['DEVTYPE'] in ['disk','loop']: return self.path - elif self.info['type'][:4] == 'raid': + elif self.info['DEVTYPE'][:4] == 'raid': # This should catch /dev/md## raid devices return self.path - elif self.info['type'] == 'crypt': + elif self.info['DEVTYPE'] == 'crypt': if 'pkname' not in self.info: raise DiskError(f'A crypt device ({self.path}) without a parent kernel device name.') return f"/dev/{self.info['pkname']}" else: - log(f"Unknown blockdevice type for {self.path}: {self.info['type']}", level=logging.DEBUG) + log(f"Unknown blockdevice type for {self.path}: {self.info['DEVTYPE']}", level=logging.DEBUG) # if not stat.S_ISBLK(os.stat(full_path).st_mode): # raise DiskError(f'Selected disk "{full_path}" is not a block device.') @@ -195,7 +199,7 @@ class BlockDevice: _, start, end, size, *_ = free_space.strip('\r\n;').split(':') yield (start, end, size) except SysCallError as error: - log(f"Could not get free space on {self.path}: {error}", level=logging.INFO) + log(f"Could not get free space on {self.path}: {error}", level=logging.DEBUG) @property def largest_free_space(self) -> List[str]: diff --git a/archinstall/lib/disk/btrfs.py b/archinstall/lib/disk/btrfs.py index ad8d0a52..f2da957f 100644 --- a/archinstall/lib/disk/btrfs.py +++ b/archinstall/lib/disk/btrfs.py @@ -2,7 +2,9 @@ from __future__ import annotations import pathlib import glob import logging -from typing import Union, Dict, TYPE_CHECKING +import re +from typing import Union, Dict, TYPE_CHECKING, Any, Iterator +from dataclasses import dataclass # https://stackoverflow.com/a/39757388/929999 if TYPE_CHECKING: @@ -11,7 +13,49 @@ from .helpers import get_mount_info from ..exceptions import DiskError from ..general import SysCommand from ..output import log - +from ..exceptions import SysCallError + +@dataclass +class BtrfsSubvolume: + target :str + source :str + fstype :str + name :str + options :str + root :bool = False + +def get_subvolumes_from_findmnt(struct :Dict[str, Any], index=0) -> Iterator[BtrfsSubvolume]: + if '@' in struct['source']: + subvolume = re.findall(r'\[.*?\]', struct['source'])[0][1:-1] + struct['source'] = struct['source'].replace(f"[{subvolume}]", "") + yield BtrfsSubvolume( + target=struct['target'], + source=struct['source'], + fstype=struct['fstype'], + name=subvolume, + options=struct['options'], + root=index == 0 + ) + index += 1 + + for child in struct.get('children', []): + for item in get_subvolumes_from_findmnt(child, index=index): + yield item + index += 1 + +def get_subvolume_info(path :pathlib.Path) -> Dict[str, Any]: + try: + output = SysCommand(f"btrfs subvol show {path}").decode() + except SysCallError as error: + print('Error:', error) + + result = {} + for line in output.replace('\r\n', '\n').split('\n'): + if ':' in line: + key, val = line.replace('\t', '').split(':', 1) + result[key.strip().lower().replace(' ', '_')] = val.strip() + + return result def mount_subvolume(installation :Installer, subvolume_location :Union[pathlib.Path, str], force=False) -> bool: """ @@ -24,7 +68,7 @@ def mount_subvolume(installation :Installer, subvolume_location :Union[pathlib.P This function is DEPRECATED. you can get the same result creating a partition dict like any other partition, and using the standard mount procedure. Only change partition['device_instance'].path with the apropriate bind name: real_partition_path[/subvolume_name] """ - log("function btrfs.mount_subvolume DEPRECATED. See code for alternatives",fg="yellow",level=logging.WARNING) + log("[Deprecated] function btrfs.mount_subvolume is deprecated. See code for alternatives",fg="yellow",level=logging.WARNING) installation_mountpoint = installation.target if type(installation_mountpoint) == str: installation_mountpoint = pathlib.Path(installation_mountpoint) @@ -179,11 +223,7 @@ def manage_btrfs_subvolumes(installation :Installer, # As the rest will query there the path of the "partition" to be mounted, we feed it with the bind name needed to mount subvolumes # As we made a deepcopy we have a fresh instance of this object we can manipulate problemless fake_partition['device_instance'].path = f"{partition['device_instance'].path}[/{name}]" - # we reset this attribute, which holds where the partition is actually mounted. Remember, the physical partition is mounted at this moment and therefore has the value '/'. - # If i don't reset it, process will abort as "already mounted' . - # TODO It works for this purpose, but the fact that this bevahiour can happed, should make think twice - fake_partition['device_instance'].mountpoint = None - # + # Well, now that this "fake partition" is ready, we add it to the list of the ones which are to be mounted, # as "normal" ones mountpoints.append(fake_partition) diff --git a/archinstall/lib/disk/dmcryptdev.py b/archinstall/lib/disk/dmcryptdev.py new file mode 100644 index 00000000..63392ffb --- /dev/null +++ b/archinstall/lib/disk/dmcryptdev.py @@ -0,0 +1,48 @@ +import pathlib +import logging +import json +from dataclasses import dataclass +from typing import Optional +from ..exceptions import SysCallError +from ..general import SysCommand +from ..output import log +from .mapperdev import MapperDev + +@dataclass +class DMCryptDev: + dev_path :pathlib.Path + + @property + def name(self): + with open(f"/sys/devices/virtual/block/{pathlib.Path(self.path).name}/dm/name", "r") as fh: + return fh.read().strip() + + @property + def path(self): + return f"/dev/mapper/{self.dev_path}" + + @property + def blockdev(self): + pass + + @property + def MapperDev(self): + return MapperDev(mappername=self.name) + + @property + def mountpoint(self) -> Optional[str]: + try: + data = json.loads(SysCommand(f"findmnt --json -R {self.dev_path}").decode()) + for filesystem in data['filesystems']: + return filesystem.get('target') + + except SysCallError as error: + # Not mounted anywhere most likely + log(f"Could not locate mount information for {self.dev_path}: {error}", level=logging.WARNING, fg="yellow") + pass + + return None + + @property + def filesystem(self) -> Optional[str]: + return self.MapperDev.filesystem
\ No newline at end of file diff --git a/archinstall/lib/disk/helpers.py b/archinstall/lib/disk/helpers.py index b04e2740..afaf9e5e 100644 --- a/archinstall/lib/disk/helpers.py +++ b/archinstall/lib/disk/helpers.py @@ -5,12 +5,15 @@ import os import pathlib import re import time +import glob from typing import Union, List, Iterator, Dict, Optional, Any, TYPE_CHECKING # https://stackoverflow.com/a/39757388/929999 if TYPE_CHECKING: from .partition import Partition from .blockdevice import BlockDevice +from .dmcryptdev import DMCryptDev +from .mapperdev import MapperDev from ..exceptions import SysCallError, DiskError from ..general import SysCommand from ..output import log @@ -103,23 +106,167 @@ def device_state(name :str, *args :str, **kwargs :str) -> Optional[bool]: return return True -# lsblk --json -l -n -o path -def all_disks(*args :str, **kwargs :str) -> List[BlockDevice]: - kwargs.setdefault("partitions", False) - drives = {} - lsblk = json.loads(SysCommand('lsblk --json -l -n -o path,size,type,mountpoint,label,pkname,model').decode('UTF_8')) - for drive in lsblk['blockdevices']: - if not kwargs['partitions'] and drive['type'] == 'part': +def cleanup_bash_escapes(data :str) -> str: + return data.replace(r'\ ', ' ') + +def blkid(cmd :str) -> Dict[str, Any]: + if '-o' in cmd and '-o export' not in cmd: + raise ValueError(f"blkid() requires '-o export' to be used and can therefor not continue reliably.") + elif '-o' not in cmd: + cmd += ' -o export' + + try: + raw_data = SysCommand(cmd).decode() + except SysCallError as error: + log(f"Could not get block device information using blkid() using command {cmd}", level=logging.DEBUG) + raise error + + result = {} + # Process the raw result + devname = None + for line in raw_data.split('\r\n'): + if not len(line): + devname = None + continue + + key, val = line.split('=', 1) + if key.lower() == 'devname': + devname = val + # Lowercase for backwards compatability with all_disks() previous use cases + result[devname] = { + "path": devname, + "PATH": devname + } + continue + + result[devname][key] = cleanup_bash_escapes(val) + + return result + +def get_loop_info(path :str) -> Dict[str, Any]: + for drive in json.loads(SysCommand(['losetup', '--json']).decode('UTF_8'))['loopdevices']: + if not drive['name'] == path: continue - drives[drive['path']] = BlockDevice(drive['path'], drive) + return { + path: { + **drive, + 'type' : 'loop', + 'TYPE' : 'loop', + 'DEVTYPE' : 'loop', + 'PATH' : drive['name'], + 'path' : drive['name'] + } + } + + return {} + +def enrich_blockdevice_information(information :Dict[str, Any]) -> Dict[str, Any]: + result = {} + for device_path, device_information in information.items(): + dev_name = pathlib.Path(device_information['PATH']).name + if not device_information.get('TYPE') or not device_information.get('DEVTYPE'): + with open(f"/sys/class/block/{dev_name}/uevent") as fh: + device_information.update(uevent(fh.read())) + + if (dmcrypt_name := pathlib.Path(f"/sys/class/block/{dev_name}/dm/name")).exists(): + with dmcrypt_name.open('r') as fh: + device_information['DMCRYPT_NAME'] = fh.read().strip() + + result[device_path] = device_information + + return result + +def uevent(data :str) -> Dict[str, Any]: + information = {} + + for line in data.replace('\r\n', '\n').split('\n'): + if len((line := line.strip())): + key, val = line.split('=', 1) + information[key] = val + + return information + +def get_blockdevice_uevent(dev_name :str) -> Dict[str, Any]: + device_information = {} + with open(f"/sys/class/block/{dev_name}/uevent") as fh: + device_information.update(uevent(fh.read())) + + return { + f"/dev/{dev_name}" : { + **device_information, + 'path' : f'/dev/{dev_name}', + 'PATH' : f'/dev/{dev_name}', + 'PTTYPE' : None + } + } + +def all_disks() -> List[BlockDevice]: + log(f"[Deprecated] archinstall.all_disks() is deprecated. Use archinstall.all_blockdevices() with the appropriate filters instead.", level=logging.WARNING, fg="yellow") + return all_blockdevices(partitions=False, mappers=False) + +def all_blockdevices(mappers=False, partitions=False, error=False) -> List[BlockDevice, Partition]: + """ + Returns BlockDevice() and Partition() objects for all available devices. + """ + from .partition import Partition - return drives + instances = {} + # Due to lsblk being highly unreliable for this use case, + # we'll iterate the /sys/class definitions and find the information + # from there. + for block_device in glob.glob("/sys/class/block/*"): + device_path = f"/dev/{pathlib.Path(block_device).readlink().name}" + try: + information = blkid(f'blkid -p -o export {device_path}') + + # TODO: No idea why F841 is raised here: + except SysCallError as error: # noqa: F841 + if error.exit_code in (512, 2): + # Assume that it's a loop device, and try to get info on it + try: + information = get_loop_info(device_path) + if not information: + raise SysCallError("Could not get loop information", exit_code=1) + + except SysCallError: + information = get_blockdevice_uevent(pathlib.Path(block_device).readlink().name) + else: + raise error + + information = enrich_blockdevice_information(information) + + for path, path_info in information.items(): + if path_info.get('DMCRYPT_NAME'): + instances[path] = DMCryptDev(dev_path=path) + elif path_info.get('PARTUUID') or path_info.get('PART_ENTRY_NUMBER'): + if partitions: + instances[path] = Partition(path, BlockDevice(get_parent_of_partition(pathlib.Path(path)))) + elif path_info.get('PTTYPE', False) is not False or path_info.get('TYPE') == 'loop': + instances[path] = BlockDevice(path, path_info) + elif path_info.get('TYPE') == 'squashfs': + # We can ignore squashfs devices (usually /dev/loop0 on Arch ISO) + continue + else: + log(f"Unknown device found by all_blockdevices(), ignoring: {information}", level=logging.WARNING, fg="yellow") + + if mappers: + for block_device in glob.glob("/dev/mapper/*"): + if (pathobj := pathlib.Path(block_device)).is_symlink(): + instances[f"/dev/mapper/{pathobj.name}"] = MapperDev(mappername=pathobj.name) + + return instances + + +def get_parent_of_partition(path :pathlib.Path) -> pathlib.Path: + partition_name = path.name + pci_device = (pathlib.Path("/sys/class/block") / partition_name).resolve() + return f"/dev/{pci_device.parent.name}" def harddrive(size :Optional[float] = None, model :Optional[str] = None, fuzzy :bool = False) -> Optional[BlockDevice]: - collection = all_disks() + collection = all_blockdevices(partitions=False) for drive in collection: if size and convert_to_gigabytes(collection[drive]['size']) != size: continue @@ -129,6 +276,7 @@ def harddrive(size :Optional[float] = None, model :Optional[str] = None, fuzzy : return collection[drive] def split_bind_name(path :Union[pathlib.Path, str]) -> list: + # log(f"[Deprecated] Partition().subvolumes now contain the split bind name via it's subvolume.name instead.", level=logging.WARNING, fg="yellow") # we check for the bind notation. if exist we'll only use the "true" device path if '[' in str(path) : # is a bind path (btrfs subvolume path) device_path, bind_path = str(path).split('[') @@ -138,32 +286,43 @@ def split_bind_name(path :Union[pathlib.Path, str]) -> list: bind_path = None return device_path,bind_path +def find_mountpoint(device_path :str) -> Dict[str, Any]: + try: + for filesystem in json.loads(SysCommand(f'/usr/bin/findmnt -R --json {device_path}').decode())['filesystems']: + yield filesystem + except SysCallError: + return {} + def get_mount_info(path :Union[pathlib.Path, str], traverse :bool = False, return_real_path :bool = False) -> Dict[str, Any]: - device_path,bind_path = split_bind_name(path) + device_path, bind_path = split_bind_name(path) output = {} for traversal in list(map(str, [str(device_path)] + list(pathlib.Path(str(device_path)).parents))): try: - log(f"Getting mount information for device path {traversal}", level=logging.INFO) + log(f"Getting mount information for device path {traversal}", level=logging.DEBUG) if (output := SysCommand(f'/usr/bin/findmnt --json {traversal}').decode('UTF-8')): break - except SysCallError: + + except SysCallError as error: + print('ERROR:', error) pass if not traverse: break if not output: - raise DiskError(f"Could not get mount information for device path {path}") + raise DiskError(f"Could not get mount information for device path {device_path}") output = json.loads(output) + # for btrfs partitions we redice the filesystem list to the one with the source equals to the parameter # i.e. the subvolume filesystem we're searching for if 'filesystems' in output and len(output['filesystems']) > 1 and bind_path is not None: output['filesystems'] = [entry for entry in output['filesystems'] if entry['source'] == str(path)] + if 'filesystems' in output: if len(output['filesystems']) > 1: - raise DiskError(f"Path '{path}' contains multiple mountpoints: {output['filesystems']}") + raise DiskError(f"Path '{device_path}' contains multiple mountpoints: {output['filesystems']}") if return_real_path: return output['filesystems'][0], traversal @@ -176,41 +335,53 @@ def get_mount_info(path :Union[pathlib.Path, str], traverse :bool = False, retur return {} +def get_all_targets(data :Dict[str, Any], filters :Dict[str, None] = {}) -> Dict[str, None]: + for info in data: + if info.get('target') not in filters: + filters[info.get('target')] = None + + filters.update(get_all_targets(info.get('children', []))) + + return filters + def get_partitions_in_use(mountpoint :str) -> List[Partition]: from .partition import Partition try: output = SysCommand(f"/usr/bin/findmnt --json -R {mountpoint}").decode('UTF-8') except SysCallError: - return [] - - mounts = [] + return {} if not output: - return [] + return {} output = json.loads(output) - for target in output.get('filesystems', []): - # We need to create a BlockDevice() instead of 'None' here when creaiting Partition() - # Otherwise subsequent calls to .size etc will fail due to BlockDevice being None. + # print(output) - # So first, we create the partition without a BlockDevice and carefully only use it to get .real_device - # Note: doing print(partition) here will break because the above mentioned issue. - partition = Partition(target['source'], None, filesystem=target.get('fstype', None), mountpoint=target['target']) - partition = Partition(target['source'], partition.real_device, filesystem=target.get('fstype', None), mountpoint=target['target']) + mounts = {} + + block_devices_available = all_blockdevices(mappers=True, partitions=True, error=True) + + block_devices_mountpoints = {} + for blockdev in block_devices_available.values(): + if not type(blockdev) in (Partition, MapperDev): + continue - # Once we have the real device (for instance /dev/nvme0n1p5) we can find the parent block device using - # (lsblk pkname lists both the partition and blockdevice, BD being the last entry) - result = SysCommand(f'lsblk -no pkname {partition.real_device}').decode().rstrip('\r\n').split('\r\n')[-1] - block_device = BlockDevice(f"/dev/{result}") + for blockdev_mountpoint in blockdev.mount_information: + block_devices_mountpoints[blockdev_mountpoint['target']] = blockdev - # Once we figured the block device out, we can properly create the partition object - partition = Partition(target['source'], block_device, filesystem=target.get('fstype', None), mountpoint=target['target']) + log(f'Filtering available mounts {block_devices_mountpoints} to those under {mountpoint}', level=logging.DEBUG) - mounts.append(partition) + for mountpoint in list(get_all_targets(output['filesystems']).keys()): + if mountpoint in block_devices_mountpoints: + if mountpoint not in mounts: + mounts[mountpoint] = block_devices_mountpoints[mountpoint] + # If the already defined mountpoint is a DMCryptDev, and the newly found + # mountpoint is a MapperDev, it has precedence and replaces the old mountpoint definition. + elif type(mounts[mountpoint]) == DMCryptDev and type(block_devices_mountpoints[mountpoint]) == MapperDev: + mounts[mountpoint] = block_devices_mountpoints[mountpoint] - for child in target.get('children', []): - mounts.append(Partition(child['source'], block_device, filesystem=child.get('fstype', None), mountpoint=child['target'])) + log(f"Available partitions: {mounts}", level=logging.DEBUG) return mounts diff --git a/archinstall/lib/disk/mapperdev.py b/archinstall/lib/disk/mapperdev.py new file mode 100644 index 00000000..91ec6d25 --- /dev/null +++ b/archinstall/lib/disk/mapperdev.py @@ -0,0 +1,83 @@ +import glob +import pathlib +import logging +import json +from dataclasses import dataclass +from typing import Optional, List, Dict, Any, Iterator, TYPE_CHECKING + +from ..exceptions import SysCallError +from ..general import SysCommand +from ..output import log + +if TYPE_CHECKING: + from .btrfs import BtrfsSubvolume + +@dataclass +class MapperDev: + mappername :str + + @property + def name(self): + return self.mappername + + @property + def path(self): + return f"/dev/mapper/{self.mappername}" + + @property + def partition(self): + from .helpers import uevent, get_parent_of_partition + from .partition import Partition + from .blockdevice import BlockDevice + + for mapper in glob.glob('/dev/mapper/*'): + path_obj = pathlib.Path(mapper) + if path_obj.name == self.mappername and pathlib.Path(mapper).is_symlink(): + dm_device = (pathlib.Path("/dev/mapper/") / path_obj.readlink()).resolve() + + for slave in glob.glob(f"/sys/class/block/{dm_device.name}/slaves/*"): + partition_belonging_to_dmcrypt_device = pathlib.Path(slave).name + + try: + uevent_data = SysCommand(f"blkid -o export /dev/{partition_belonging_to_dmcrypt_device}").decode() + except SysCallError as error: + log(f"Could not get information on device /dev/{partition_belonging_to_dmcrypt_device}: {error}", level=logging.ERROR, fg="red") + + information = uevent(uevent_data) + block_device = BlockDevice(get_parent_of_partition('/dev/' / pathlib.Path(information['DEVNAME']))) + + return Partition(information['DEVNAME'], block_device) + + raise ValueError(f"Could not convert {self.mappername} to a real dm-crypt device") + + @property + def mountpoint(self) -> Optional[str]: + try: + data = json.loads(SysCommand(f"findmnt --json -R {self.path}").decode()) + for filesystem in data['filesystems']: + return filesystem.get('target') + + except SysCallError as error: + # Not mounted anywhere most likely + log(f"Could not locate mount information for {self.path}: {error}", level=logging.WARNING, fg="yellow") + pass + + return None + + @property + def mount_information(self) -> List[Dict[str, Any]]: + from .helpers import find_mountpoint + return list(find_mountpoint(self.path)) + + @property + def filesystem(self) -> Optional[str]: + from .helpers import get_filesystem_type + return get_filesystem_type(self.path) + + @property + def subvolumes(self) -> Iterator['BtrfsSubvolume']: + from .btrfs import get_subvolumes_from_findmnt + + for mountpoint in self.mount_information: + for result in get_subvolumes_from_findmnt(mountpoint): + yield result
\ No newline at end of file diff --git a/archinstall/lib/disk/partition.py b/archinstall/lib/disk/partition.py index 7bfde64c..708edd29 100644 --- a/archinstall/lib/disk/partition.py +++ b/archinstall/lib/disk/partition.py @@ -5,15 +5,15 @@ import logging import json import os import hashlib -from typing import Optional, Dict, Any, List, Union +from typing import Optional, Dict, Any, List, Union, Iterator from .blockdevice import BlockDevice -from .helpers import get_mount_info, get_filesystem_type, convert_size_to_gb, split_bind_name +from .helpers import find_mountpoint, get_filesystem_type, convert_size_to_gb, split_bind_name from ..storage import storage from ..exceptions import DiskError, SysCallError, UnknownFilesystemFormat from ..output import log from ..general import SysCommand - +from .btrfs import get_subvolumes_from_findmnt, BtrfsSubvolume class Partition: def __init__(self, @@ -29,9 +29,11 @@ class Partition: part_id = os.path.basename(path) self.block_device = block_device + if type(self.block_device) is str: + raise ValueError(f"Partition()'s 'block_device' parameter has to be a archinstall.BlockDevice() instance!") + self.path = path self.part_id = part_id - self.mountpoint = mountpoint self.target_mountpoint = mountpoint self.filesystem = filesystem self._encrypted = None @@ -42,20 +44,12 @@ class Partition: self.mount(mountpoint) try: - mount_information = get_mount_info(self.path) + self.mount_information = list(find_mountpoint(self.path)) except DiskError: - mount_information = {} - - if mount_information.get('target', None): - if self.mountpoint != mount_information.get('target', None) and mountpoint: - raise DiskError(f"{self} was given a mountpoint but the actual mountpoint differs: {mount_information.get('target', None)}") - - if target := mount_information.get('target', None): - self.mountpoint = target + self.mount_information = [{}] if not self.filesystem and autodetect_filesystem: - if fstype := mount_information.get('fstype', get_filesystem_type(path)): - self.filesystem = fstype + self.filesystem = get_filesystem_type(path) if self.filesystem == 'crypto_LUKS': self.encrypted = True @@ -98,6 +92,20 @@ class Partition: } @property + def mountpoint(self) -> Optional[str]: + try: + data = json.loads(SysCommand(f"findmnt --json -R {self.path}").decode()) + for filesystem in data['filesystems']: + return filesystem.get('target') + + except SysCallError as error: + # Not mounted anywhere most likely + log(f"Could not locate mount information for {self.path}: {error}", level=logging.DEBUG) + pass + + return None + + @property def sector_size(self) -> Optional[int]: output = json.loads(SysCommand(f"lsblk --json -o+LOG-SEC {self.device_path}").decode('UTF-8')) @@ -135,14 +143,16 @@ class Partition: for i in range(storage['DISK_RETRY_ATTEMPTS']): self.partprobe() - if (handle := SysCommand(f"lsblk --json -b -o+SIZE {self.device_path}")).exit_code == 0: - lsblk = json.loads(handle.decode('UTF-8')) + try: + lsblk = json.loads(SysCommand(f"lsblk --json -b -o+SIZE {self.device_path}").decode()) for device in lsblk['blockdevices']: return convert_size_to_gb(device['size']) - elif handle.exit_code == 8192: - # Device is not a block device - return None + except SysCallError as error: + if error.exit_code == 8192: + return None + else: + raise error time.sleep(storage['DISK_TIMEOUTS']) @@ -200,7 +210,14 @@ class Partition: For instance when you want to get a __repr__ of the class. """ self.partprobe() - return SysCommand(f'blkid -s PARTUUID -o value {self.device_path}').decode('UTF-8').strip() + try: + return SysCommand(f'blkid -s PARTUUID -o value {self.device_path}').decode('UTF-8').strip() + except SysCallError as error: + if self.block_device.info.get('TYPE') == 'iso9660': + # Parent device is a Optical Disk (.iso dd'ed onto a device for instance) + return None + + raise DiskError(f"Could not get PARTUUID of partition {self}: {error}") @property def encrypted(self) -> Union[bool, None]: @@ -237,6 +254,12 @@ class Partition: device_path, bind_name = split_bind_name(self.path) return bind_name + @property + def subvolumes(self) -> Iterator[BtrfsSubvolume]: + for mountpoint in self.mount_information: + for result in get_subvolumes_from_findmnt(mountpoint): + yield result + def partprobe(self) -> bool: if self.block_device and SysCommand(f'partprobe {self.block_device.device}').exit_code == 0: time.sleep(1) @@ -411,7 +434,6 @@ class Partition: except SysCallError as err: raise err - self.mountpoint = target return True return False @@ -425,7 +447,6 @@ class Partition: if 0 < worker.exit_code < 8000: raise SysCallError(f"Could not unmount {self.path} properly: {worker}", exit_code=worker.exit_code) - self.mountpoint = None return True def umount(self) -> bool: |