index : archinstall32 | |
Archlinux32 installer | gitolite user |
summaryrefslogtreecommitdiff |
author | Andreas Baumann <mail@andreasbaumann.cc> | 2022-09-30 17:25:43 +0200 |
---|---|---|
committer | Andreas Baumann <mail@andreasbaumann.cc> | 2022-09-30 17:25:43 +0200 |
commit | 6408c9dce00aa70ad3c6614d2d793dba9a99aff6 (patch) | |
tree | bff1889dfebde0c74e30e9de427a2c122513684b /archinstall/lib/disk/helpers.py | |
parent | faf925de1882be722d2994d697a802918282e509 (diff) | |
parent | 53a2797af6ac0832bf7dd00dfe96b8ea1867db2e (diff) |
-rw-r--r-- | archinstall/lib/disk/helpers.py | 50 |
diff --git a/archinstall/lib/disk/helpers.py b/archinstall/lib/disk/helpers.py index 99856aad..f19125f4 100644 --- a/archinstall/lib/disk/helpers.py +++ b/archinstall/lib/disk/helpers.py @@ -8,6 +8,8 @@ import time import glob from typing import Union, List, Iterator, Dict, Optional, Any, TYPE_CHECKING # https://stackoverflow.com/a/39757388/929999 +from ..models.subvolume import Subvolume + if TYPE_CHECKING: from .partition import Partition @@ -112,7 +114,7 @@ def cleanup_bash_escapes(data :str) -> str: def blkid(cmd :str) -> Dict[str, Any]: if '-o' in cmd and '-o export' not in cmd: - raise ValueError(f"blkid() requires '-o export' to be used and can therefor not continue reliably.") + raise ValueError(f"blkid() requires '-o export' to be used and can therefore not continue reliably.") elif '-o' not in cmd: cmd += ' -o export' @@ -133,7 +135,7 @@ def blkid(cmd :str) -> Dict[str, Any]: key, val = line.split('=', 1) if key.lower() == 'devname': devname = val - # Lowercase for backwards compatability with all_disks() previous use cases + # Lowercase for backwards compatibility with all_disks() previous use cases result[devname] = { "path": devname, "PATH": devname @@ -218,7 +220,12 @@ def all_blockdevices(mappers=False, partitions=False, error=False) -> Dict[str, # we'll iterate the /sys/class definitions and find the information # from there. for block_device in glob.glob("/sys/class/block/*"): - device_path = f"/dev/{pathlib.Path(block_device).readlink().name}" + device_path = pathlib.Path(f"/dev/{pathlib.Path(block_device).readlink().name}") + + if device_path.exists() is False: + log(f"Unknown device found by '/sys/class/block/*', ignoring: {device_path}", level=logging.WARNING, fg="yellow") + continue + try: information = blkid(f'blkid -p -o export {device_path}') except SysCallError as ex: @@ -227,12 +234,17 @@ def all_blockdevices(mappers=False, partitions=False, error=False) -> Dict[str, try: information = get_loop_info(device_path) if not information: + print("Exit code for blkid -p -o export was:", ex.exit_code) raise SysCallError("Could not get loop information", exit_code=1) except SysCallError: + print("Not a loop device, trying uevent rules.") information = get_blockdevice_uevent(pathlib.Path(block_device).readlink().name) else: + # We could not reliably get any information, perhaps the disk is clean of information? + print("Raising ex because:", ex.exit_code) raise ex + # return instances information = enrich_blockdevice_information(information) @@ -244,7 +256,7 @@ def all_blockdevices(mappers=False, partitions=False, error=False) -> Dict[str, instances[path] = Partition(path, block_device=BlockDevice(get_parent_of_partition(pathlib.Path(path)))) elif path_info.get('PTTYPE', False) is not False or path_info.get('TYPE') == 'loop': instances[path] = BlockDevice(path, path_info) - elif path_info.get('TYPE') == 'squashfs': + elif path_info.get('TYPE') in ('squashfs', 'erofs'): # We can ignore squashfs devices (usually /dev/loop0 on Arch ISO) continue else: @@ -368,7 +380,7 @@ def get_all_targets(data :Dict[str, Any], filters :Dict[str, None] = {}) -> Dict return filters -def get_partitions_in_use(mountpoint :str) -> List[Partition]: +def get_partitions_in_use(mountpoint :str) -> Dict[str, Any]: from .partition import Partition try: @@ -391,12 +403,20 @@ def get_partitions_in_use(mountpoint :str) -> List[Partition]: if not type(blockdev) in (Partition, MapperDev): continue - for blockdev_mountpoint in blockdev.mount_information: - block_devices_mountpoints[blockdev_mountpoint['target']] = blockdev + if isinstance(blockdev, Partition): + for blockdev_mountpoint in blockdev.mountpoints: + block_devices_mountpoints[blockdev_mountpoint] = blockdev + else: + for blockdev_mountpoint in blockdev.mount_information: + block_devices_mountpoints[blockdev_mountpoint['target']] = blockdev log(f'Filtering available mounts {block_devices_mountpoints} to those under {mountpoint}', level=logging.DEBUG) for mountpoint in list(get_all_targets(output['filesystems']).keys()): + # Since all_blockdevices() returns PosixPath objects, we need to convert + # findmnt paths to pathlib.Path() first: + mountpoint = pathlib.Path(mountpoint) + if mountpoint in block_devices_mountpoints: if mountpoint not in mounts: mounts[mountpoint] = block_devices_mountpoints[mountpoint] @@ -433,9 +453,10 @@ def disk_layouts() -> Optional[Dict[str, Any]]: def encrypted_partitions(blockdevices :Dict[str, Any]) -> bool: - for partition in blockdevices.values(): - if partition.get('encrypted', False): - yield partition + for blockdevice in blockdevices.values(): + for partition in blockdevice.get('partitions', []): + if partition.get('encrypted', False): + yield partition def find_partition_by_mountpoint(block_devices :List[BlockDevice], relative_mountpoint :str) -> Partition: for device in block_devices: @@ -468,13 +489,14 @@ def convert_device_to_uuid(path :str) -> str: raise DiskError(f"Could not retrieve the UUID of {path} within a timely manner.") + def has_mountpoint(partition: Union[dict,Partition,MapperDev], target: str, strict: bool = True) -> bool: """ Determine if a certain partition is mounted (or has a mountpoint) as specific target (path) Coded for clarity rather than performance Input parms: :parm partition the partition we check - :type Either a Partition object or a dict with the contents of a partition definiton in the disk_layouts schema + :type Either a Partition object or a dict with the contents of a partition definition in the disk_layouts schema :parm target (a string representing a mount path we want to check for. :type str @@ -484,10 +506,12 @@ def has_mountpoint(partition: Union[dict,Partition,MapperDev], target: str, stri """ # we create the mountpoint list if isinstance(partition,dict): - subvols = partition.get('btrfs',{}).get('subvolumes',{}) - mountpoints = [partition.get('mountpoint'),] + [subvols[subvol] if isinstance(subvols[subvol],str) or not subvols[subvol] else subvols[subvol].get('mountpoint') for subvol in subvols] + subvolumes: List[Subvolume] = partition.get('btrfs',{}).get('subvolumes', []) + mountpoints = [partition.get('mountpoint')] + mountpoints += [volume.mountpoint for volume in subvolumes] else: mountpoints = [partition.mountpoint,] + [subvol.target for subvol in partition.subvolumes] + # we check if strict or target == '/': if target in mountpoints: |