Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib/disk/helpers.py
diff options
context:
space:
mode:
authorAnton Hvornum <anton@hvornum.se>2022-02-08 23:21:20 +0100
committerGitHub <noreply@github.com>2022-02-08 23:21:20 +0100
commitd3b6832345c9b5cad54951d4f976ebdbfe59a086 (patch)
treefd611b06cd1aa7709fb3b5128969a673e0fdca80 /archinstall/lib/disk/helpers.py
parentfeffa69042ec537eab6d78597c134693546a2b93 (diff)
Change usage `lsblk` to `blkid` where possible. And general improvements to disk handling. (#949)
* Renamed all_disks() to all_blockdevices() as it's technically a bit more correct. As it would return partitions as well. And it now distinguishes between BlockDevice() and Partition() when returning values. Also lsblk has been replaced with blkid and glob.glob() on /sys/class/block/ * Added handling of loop devices * Added device information enrichment * Removed old code * Updated the usage of blockdevice.info['type'] to 'DEVTYPE' as per returned by blkid instead of lsblk * Created a MapperDev() and DMCryptDev() 'disk' classes. This should help differentiate between BlockDevice(), Partition() and crypt-specific devices. Due to some new helper functions (mentioned later) BlockDevice().device_or_backfile has been simplified thanks to the information being pre-existing instead of having to call losetup. BlockDevice().mountpoint has been added as a dummy function. archinstall.enrich_blockdevice_information() has been added to enrich information extracted from blkid. archinstall.uevent() has been created to parse blkdid data when in -o export format, also eats up /sys/class/x/y/uevent data which is of the same format. all_blockdevices() now returns mapper devices found under /dev/mapper/ and properly returns the different types, not just BlockDevice() for everything. archinstall.get_parent_of_partition() has been added, which can translate /dev/sda1 to /dev/sda via strings by using /sys/class/block/ - note here tho that it has to be a blockdevice. Other parents won't return properly. archinstall.find_mountpoint() was created to be able to find mountpoints for a device, rather than the other way around which get_mount_info() handles. find_mountpoint() will convert /dev/sda1 -> /boot for instance, whereas get_mount_info('/boot') will convert /boot -> /dev/sda1. archinstall.get_partitions_in_use() will now properly traverse the entire structure, not just one level deep. This called for the creation of get_all_targets() which will take a dictionary structure and look for 'target': '/path' and return all '/path' values, recursively without being trapped in get_partitions_in_use() which has recursive targets. get_partitions_in_use() now returns a dictionary instead of a list, as it also returns the mountpoint and associated device. It will also prioritize MapperDev over DMCryptDev (as MapperDev is a child of DMCryptDev and can be resolved, other way around is 'trickier'). * Reverted Installer().partitions to return only the partitions. It's a slight speed defecit by not returning the mountpoints directly as {mntpoint: partition}, because the partition.mountpoint is a bit slow. But it avoids breaking legacy code :) * Fixed a few imports, and added MapperDev.filesystem * Fixed so that Installer().detect_encryption() uses the new separated logic of MapperDev/DMCryptDev/Partition * Fixing a translation issue on hostname * Added DMCryptDev.filesystem * Added back archinstall.all_disks() for legacy reasons. * Added a deprecation warning to archinstall.all_disks() * Added a enrichment check for dm-crypt specific devices, which in turn sets DMCRYPT_NAME on the device information, making it easier to detect and target DMCryptDev when needed. This should avoid issues with loopdevices being detected as DMCryptDev. Also some minor tweaks to variable names on unencrypted setups * Made debug output hidden without --verbose. Also added get_subvolume_info() which takes a path and returns any 'btrfs subvol show' information on the path * Partition().subvolumes has been added. Which generates an iterator over potential subvolumes based on mountpoint. Partition().mount_information is now a list of mount-destinations, as one partition should be allowed to be mounted to several places (more commonly used in btrfs subvolumes). BtrfsSubvolume() has been added as a 'device type', which should make it more easy to recognize a BtrfsSubvolume object right out of the gate. Only found from Partition().subvolumes currently. * Parameter fix for --verbose in log() * Made sure loopdev devices have a PATH key as well, to make dealing with them as blockdevice as seamless as possible. * Added backup information grabbing on uninitated devices * Tweaked backup option if losetup cannot find the blockdev * looks like losetup doesn't exit with a bad code even when it should. Raising ourselves when information is empty. * Fixed structure returned from get_blockdevice_uevent() * Made sure that fallback to blkid and loopdev info returns a PTTYPE (Partition Table Type) that is None, since we can't locate it yet (empty drive). But it's not False either, which is what logic after is looking for * Deprecated split_bind_name() and added more debugging * get_partitions_in_use() now only iterates over Partition() objects. And properly checks all mountpoints, not only the first. * Flipped SysCallError to show the last 500 bytes if data rather than the first, to catch the actual errors. * Removed anonymous parameters to all_blockdevices() * Added .mount_information to MapperDev * Added typing annotations * Partition().mountpoint is now a @property that will live-report the current mountpoint. * Removed the need for setting Partition().mountpoint = X when calling .mount() and .unmount(), as it's live-reported instead. * Added .subvolumes to MapperDev() * Added debug information * Muting F841 in helpers.py * Tweaked debug information in get_mount_info() * Minimized log output by moving Partition().mountpoint to DEBUG, as partitions are allowed to not be mounted * Simplified Installer().add_bootloader() to act as a router to the new split function, one for each bootloader type. This since flake8 complained about complexity and I agree that it became to wild.
Diffstat (limited to 'archinstall/lib/disk/helpers.py')
-rw-r--r--archinstall/lib/disk/helpers.py241
1 files changed, 206 insertions, 35 deletions
diff --git a/archinstall/lib/disk/helpers.py b/archinstall/lib/disk/helpers.py
index b04e2740..afaf9e5e 100644
--- a/archinstall/lib/disk/helpers.py
+++ b/archinstall/lib/disk/helpers.py
@@ -5,12 +5,15 @@ import os
import pathlib
import re
import time
+import glob
from typing import Union, List, Iterator, Dict, Optional, Any, TYPE_CHECKING
# https://stackoverflow.com/a/39757388/929999
if TYPE_CHECKING:
from .partition import Partition
from .blockdevice import BlockDevice
+from .dmcryptdev import DMCryptDev
+from .mapperdev import MapperDev
from ..exceptions import SysCallError, DiskError
from ..general import SysCommand
from ..output import log
@@ -103,23 +106,167 @@ def device_state(name :str, *args :str, **kwargs :str) -> Optional[bool]:
return
return True
-# lsblk --json -l -n -o path
-def all_disks(*args :str, **kwargs :str) -> List[BlockDevice]:
- kwargs.setdefault("partitions", False)
- drives = {}
- lsblk = json.loads(SysCommand('lsblk --json -l -n -o path,size,type,mountpoint,label,pkname,model').decode('UTF_8'))
- for drive in lsblk['blockdevices']:
- if not kwargs['partitions'] and drive['type'] == 'part':
+def cleanup_bash_escapes(data :str) -> str:
+ return data.replace(r'\ ', ' ')
+
+def blkid(cmd :str) -> Dict[str, Any]:
+ if '-o' in cmd and '-o export' not in cmd:
+ raise ValueError(f"blkid() requires '-o export' to be used and can therefor not continue reliably.")
+ elif '-o' not in cmd:
+ cmd += ' -o export'
+
+ try:
+ raw_data = SysCommand(cmd).decode()
+ except SysCallError as error:
+ log(f"Could not get block device information using blkid() using command {cmd}", level=logging.DEBUG)
+ raise error
+
+ result = {}
+ # Process the raw result
+ devname = None
+ for line in raw_data.split('\r\n'):
+ if not len(line):
+ devname = None
+ continue
+
+ key, val = line.split('=', 1)
+ if key.lower() == 'devname':
+ devname = val
+ # Lowercase for backwards compatability with all_disks() previous use cases
+ result[devname] = {
+ "path": devname,
+ "PATH": devname
+ }
+ continue
+
+ result[devname][key] = cleanup_bash_escapes(val)
+
+ return result
+
+def get_loop_info(path :str) -> Dict[str, Any]:
+ for drive in json.loads(SysCommand(['losetup', '--json']).decode('UTF_8'))['loopdevices']:
+ if not drive['name'] == path:
continue
- drives[drive['path']] = BlockDevice(drive['path'], drive)
+ return {
+ path: {
+ **drive,
+ 'type' : 'loop',
+ 'TYPE' : 'loop',
+ 'DEVTYPE' : 'loop',
+ 'PATH' : drive['name'],
+ 'path' : drive['name']
+ }
+ }
+
+ return {}
+
+def enrich_blockdevice_information(information :Dict[str, Any]) -> Dict[str, Any]:
+ result = {}
+ for device_path, device_information in information.items():
+ dev_name = pathlib.Path(device_information['PATH']).name
+ if not device_information.get('TYPE') or not device_information.get('DEVTYPE'):
+ with open(f"/sys/class/block/{dev_name}/uevent") as fh:
+ device_information.update(uevent(fh.read()))
+
+ if (dmcrypt_name := pathlib.Path(f"/sys/class/block/{dev_name}/dm/name")).exists():
+ with dmcrypt_name.open('r') as fh:
+ device_information['DMCRYPT_NAME'] = fh.read().strip()
+
+ result[device_path] = device_information
+
+ return result
+
+def uevent(data :str) -> Dict[str, Any]:
+ information = {}
+
+ for line in data.replace('\r\n', '\n').split('\n'):
+ if len((line := line.strip())):
+ key, val = line.split('=', 1)
+ information[key] = val
+
+ return information
+
+def get_blockdevice_uevent(dev_name :str) -> Dict[str, Any]:
+ device_information = {}
+ with open(f"/sys/class/block/{dev_name}/uevent") as fh:
+ device_information.update(uevent(fh.read()))
+
+ return {
+ f"/dev/{dev_name}" : {
+ **device_information,
+ 'path' : f'/dev/{dev_name}',
+ 'PATH' : f'/dev/{dev_name}',
+ 'PTTYPE' : None
+ }
+ }
+
+def all_disks() -> List[BlockDevice]:
+ log(f"[Deprecated] archinstall.all_disks() is deprecated. Use archinstall.all_blockdevices() with the appropriate filters instead.", level=logging.WARNING, fg="yellow")
+ return all_blockdevices(partitions=False, mappers=False)
+
+def all_blockdevices(mappers=False, partitions=False, error=False) -> List[BlockDevice, Partition]:
+ """
+ Returns BlockDevice() and Partition() objects for all available devices.
+ """
+ from .partition import Partition
- return drives
+ instances = {}
+ # Due to lsblk being highly unreliable for this use case,
+ # we'll iterate the /sys/class definitions and find the information
+ # from there.
+ for block_device in glob.glob("/sys/class/block/*"):
+ device_path = f"/dev/{pathlib.Path(block_device).readlink().name}"
+ try:
+ information = blkid(f'blkid -p -o export {device_path}')
+
+ # TODO: No idea why F841 is raised here:
+ except SysCallError as error: # noqa: F841
+ if error.exit_code in (512, 2):
+ # Assume that it's a loop device, and try to get info on it
+ try:
+ information = get_loop_info(device_path)
+ if not information:
+ raise SysCallError("Could not get loop information", exit_code=1)
+
+ except SysCallError:
+ information = get_blockdevice_uevent(pathlib.Path(block_device).readlink().name)
+ else:
+ raise error
+
+ information = enrich_blockdevice_information(information)
+
+ for path, path_info in information.items():
+ if path_info.get('DMCRYPT_NAME'):
+ instances[path] = DMCryptDev(dev_path=path)
+ elif path_info.get('PARTUUID') or path_info.get('PART_ENTRY_NUMBER'):
+ if partitions:
+ instances[path] = Partition(path, BlockDevice(get_parent_of_partition(pathlib.Path(path))))
+ elif path_info.get('PTTYPE', False) is not False or path_info.get('TYPE') == 'loop':
+ instances[path] = BlockDevice(path, path_info)
+ elif path_info.get('TYPE') == 'squashfs':
+ # We can ignore squashfs devices (usually /dev/loop0 on Arch ISO)
+ continue
+ else:
+ log(f"Unknown device found by all_blockdevices(), ignoring: {information}", level=logging.WARNING, fg="yellow")
+
+ if mappers:
+ for block_device in glob.glob("/dev/mapper/*"):
+ if (pathobj := pathlib.Path(block_device)).is_symlink():
+ instances[f"/dev/mapper/{pathobj.name}"] = MapperDev(mappername=pathobj.name)
+
+ return instances
+
+
+def get_parent_of_partition(path :pathlib.Path) -> pathlib.Path:
+ partition_name = path.name
+ pci_device = (pathlib.Path("/sys/class/block") / partition_name).resolve()
+ return f"/dev/{pci_device.parent.name}"
def harddrive(size :Optional[float] = None, model :Optional[str] = None, fuzzy :bool = False) -> Optional[BlockDevice]:
- collection = all_disks()
+ collection = all_blockdevices(partitions=False)
for drive in collection:
if size and convert_to_gigabytes(collection[drive]['size']) != size:
continue
@@ -129,6 +276,7 @@ def harddrive(size :Optional[float] = None, model :Optional[str] = None, fuzzy :
return collection[drive]
def split_bind_name(path :Union[pathlib.Path, str]) -> list:
+ # log(f"[Deprecated] Partition().subvolumes now contain the split bind name via it's subvolume.name instead.", level=logging.WARNING, fg="yellow")
# we check for the bind notation. if exist we'll only use the "true" device path
if '[' in str(path) : # is a bind path (btrfs subvolume path)
device_path, bind_path = str(path).split('[')
@@ -138,32 +286,43 @@ def split_bind_name(path :Union[pathlib.Path, str]) -> list:
bind_path = None
return device_path,bind_path
+def find_mountpoint(device_path :str) -> Dict[str, Any]:
+ try:
+ for filesystem in json.loads(SysCommand(f'/usr/bin/findmnt -R --json {device_path}').decode())['filesystems']:
+ yield filesystem
+ except SysCallError:
+ return {}
+
def get_mount_info(path :Union[pathlib.Path, str], traverse :bool = False, return_real_path :bool = False) -> Dict[str, Any]:
- device_path,bind_path = split_bind_name(path)
+ device_path, bind_path = split_bind_name(path)
output = {}
for traversal in list(map(str, [str(device_path)] + list(pathlib.Path(str(device_path)).parents))):
try:
- log(f"Getting mount information for device path {traversal}", level=logging.INFO)
+ log(f"Getting mount information for device path {traversal}", level=logging.DEBUG)
if (output := SysCommand(f'/usr/bin/findmnt --json {traversal}').decode('UTF-8')):
break
- except SysCallError:
+
+ except SysCallError as error:
+ print('ERROR:', error)
pass
if not traverse:
break
if not output:
- raise DiskError(f"Could not get mount information for device path {path}")
+ raise DiskError(f"Could not get mount information for device path {device_path}")
output = json.loads(output)
+
# for btrfs partitions we redice the filesystem list to the one with the source equals to the parameter
# i.e. the subvolume filesystem we're searching for
if 'filesystems' in output and len(output['filesystems']) > 1 and bind_path is not None:
output['filesystems'] = [entry for entry in output['filesystems'] if entry['source'] == str(path)]
+
if 'filesystems' in output:
if len(output['filesystems']) > 1:
- raise DiskError(f"Path '{path}' contains multiple mountpoints: {output['filesystems']}")
+ raise DiskError(f"Path '{device_path}' contains multiple mountpoints: {output['filesystems']}")
if return_real_path:
return output['filesystems'][0], traversal
@@ -176,41 +335,53 @@ def get_mount_info(path :Union[pathlib.Path, str], traverse :bool = False, retur
return {}
+def get_all_targets(data :Dict[str, Any], filters :Dict[str, None] = {}) -> Dict[str, None]:
+ for info in data:
+ if info.get('target') not in filters:
+ filters[info.get('target')] = None
+
+ filters.update(get_all_targets(info.get('children', [])))
+
+ return filters
+
def get_partitions_in_use(mountpoint :str) -> List[Partition]:
from .partition import Partition
try:
output = SysCommand(f"/usr/bin/findmnt --json -R {mountpoint}").decode('UTF-8')
except SysCallError:
- return []
-
- mounts = []
+ return {}
if not output:
- return []
+ return {}
output = json.loads(output)
- for target in output.get('filesystems', []):
- # We need to create a BlockDevice() instead of 'None' here when creaiting Partition()
- # Otherwise subsequent calls to .size etc will fail due to BlockDevice being None.
+ # print(output)
- # So first, we create the partition without a BlockDevice and carefully only use it to get .real_device
- # Note: doing print(partition) here will break because the above mentioned issue.
- partition = Partition(target['source'], None, filesystem=target.get('fstype', None), mountpoint=target['target'])
- partition = Partition(target['source'], partition.real_device, filesystem=target.get('fstype', None), mountpoint=target['target'])
+ mounts = {}
+
+ block_devices_available = all_blockdevices(mappers=True, partitions=True, error=True)
+
+ block_devices_mountpoints = {}
+ for blockdev in block_devices_available.values():
+ if not type(blockdev) in (Partition, MapperDev):
+ continue
- # Once we have the real device (for instance /dev/nvme0n1p5) we can find the parent block device using
- # (lsblk pkname lists both the partition and blockdevice, BD being the last entry)
- result = SysCommand(f'lsblk -no pkname {partition.real_device}').decode().rstrip('\r\n').split('\r\n')[-1]
- block_device = BlockDevice(f"/dev/{result}")
+ for blockdev_mountpoint in blockdev.mount_information:
+ block_devices_mountpoints[blockdev_mountpoint['target']] = blockdev
- # Once we figured the block device out, we can properly create the partition object
- partition = Partition(target['source'], block_device, filesystem=target.get('fstype', None), mountpoint=target['target'])
+ log(f'Filtering available mounts {block_devices_mountpoints} to those under {mountpoint}', level=logging.DEBUG)
- mounts.append(partition)
+ for mountpoint in list(get_all_targets(output['filesystems']).keys()):
+ if mountpoint in block_devices_mountpoints:
+ if mountpoint not in mounts:
+ mounts[mountpoint] = block_devices_mountpoints[mountpoint]
+ # If the already defined mountpoint is a DMCryptDev, and the newly found
+ # mountpoint is a MapperDev, it has precedence and replaces the old mountpoint definition.
+ elif type(mounts[mountpoint]) == DMCryptDev and type(block_devices_mountpoints[mountpoint]) == MapperDev:
+ mounts[mountpoint] = block_devices_mountpoints[mountpoint]
- for child in target.get('children', []):
- mounts.append(Partition(child['source'], block_device, filesystem=child.get('fstype', None), mountpoint=child['target']))
+ log(f"Available partitions: {mounts}", level=logging.DEBUG)
return mounts