from __future__ import annotations import json import logging import os # type: ignore import pathlib import re import time import glob from typing import Union, List, Iterator, Dict, Optional, Any, TYPE_CHECKING # https://stackoverflow.com/a/39757388/929999 if TYPE_CHECKING: from .partition import Partition from .blockdevice import BlockDevice from .dmcryptdev import DMCryptDev from .mapperdev import MapperDev from ..exceptions import SysCallError, DiskError from ..general import SysCommand from ..output import log from ..storage import storage ROOT_DIR_PATTERN = re.compile('^.*?/devices') GIGA = 2 ** 30 def convert_size_to_gb(size :Union[int, float]) -> float: return round(size / GIGA,1) def sort_block_devices_based_on_performance(block_devices :List[BlockDevice]) -> Dict[BlockDevice, int]: result = {device: 0 for device in block_devices} for device, weight in result.items(): if device.spinning: weight -= 10 else: weight += 5 if device.bus_type == 'nvme': weight += 20 elif device.bus_type == 'sata': weight += 10 result[device] = weight return result def filter_disks_below_size_in_gb(devices :List[BlockDevice], gigabytes :int) -> Iterator[BlockDevice]: for disk in devices: if disk.size >= gigabytes: yield disk def select_largest_device(devices :List[BlockDevice], gigabytes :int, filter_out :Optional[List[BlockDevice]] = None) -> BlockDevice: if not filter_out: filter_out = [] copy_devices = [*devices] for filter_device in filter_out: if filter_device in copy_devices: copy_devices.pop(copy_devices.index(filter_device)) copy_devices = list(filter_disks_below_size_in_gb(copy_devices, gigabytes)) if not len(copy_devices): return None return max(copy_devices, key=(lambda device : device.size)) def select_disk_larger_than_or_close_to(devices :List[BlockDevice], gigabytes :int, filter_out :Optional[List[BlockDevice]] = None) -> BlockDevice: if not filter_out: filter_out = [] copy_devices = [*devices] for filter_device in filter_out: if filter_device in copy_devices: copy_devices.pop(copy_devices.index(filter_device)) if not len(copy_devices): return None return min(copy_devices, key=(lambda device : abs(device.size - gigabytes))) def convert_to_gigabytes(string :str) -> float: unit = string.strip()[-1] size = float(string.strip()[:-1]) if unit == 'M': size = size / 1024 elif unit == 'T': size = size * 1024 return size def device_state(name :str, *args :str, **kwargs :str) -> Optional[bool]: # Based out of: https://askubuntu.com/questions/528690/how-to-get-list-of-all-non-removable-disk-device-names-ssd-hdd-and-sata-ide-onl/528709#528709 if os.path.isfile('/sys/block/{}/device/block/{}/removable'.format(name, name)): with open('/sys/block/{}/device/block/{}/removable'.format(name, name)) as f: if f.read(1) == '1': return path = ROOT_DIR_PATTERN.sub('', os.readlink('/sys/block/{}'.format(name))) hotplug_buses = ("usb", "ieee1394", "mmc", "pcmcia", "firewire") for bus in hotplug_buses: if os.path.exists('/sys/bus/{}'.format(bus)): for device_bus in os.listdir('/sys/bus/{}/devices'.format(bus)): device_link = ROOT_DIR_PATTERN.sub('', os.readlink('/sys/bus/{}/devices/{}'.format(bus, device_bus))) if re.search(device_link, path): return return True def cleanup_bash_escapes(data :str) -> str: return data.replace(r'\ ', ' ') def blkid(cmd :str) -> Dict[str, Any]: if '-o' in cmd and '-o export' not in cmd: raise ValueError(f"blkid() requires '-o export' to be used and can therefor not continue reliably.") elif '-o' not in cmd: cmd += ' -o export' try: raw_data = SysCommand(cmd).decode() except SysCallError as error: log(f"Could not get block device information using blkid() using command {cmd}", level=logging.DEBUG) raise error result = {} # Process the raw result devname = None for line in raw_data.split('\r\n'): if not len(line): devname = None continue key, val = line.split('=', 1) if key.lower() == 'devname': devname = val # Lowercase for backwards compatability with all_disks() previous use cases result[devname] = { "path": devname, "PATH": devname } continue result[devname][key] = cleanup_bash_escapes(val) return result def get_loop_info(path :str) -> Dict[str, Any]: for drive in json.loads(SysCommand(['losetup', '--json']).decode('UTF_8'))['loopdevices']: if not drive['name'] == path: continue return { path: { **drive, 'type' : 'loop', 'TYPE' : 'loop', 'DEVTYPE' : 'loop', 'PATH' : drive['name'], 'path' : drive['name'] } } return {} def enrich_blockdevice_information(information :Dict[str, Any]) -> Dict[str, Any]: result = {} for device_path, device_information in information.items(): dev_name = pathlib.Path(device_information['PATH']).name if not device_information.get('TYPE') or not device_information.get('DEVTYPE'): with open(f"/sys/class/block/{dev_name}/uevent") as fh: device_information.update(uevent(fh.read())) if (dmcrypt_name := pathlib.Path(f"/sys/class/block/{dev_name}/dm/name")).exists(): with dmcrypt_name.open('r') as fh: device_information['DMCRYPT_NAME'] = fh.read().strip() result[device_path] = device_information return result def uevent(data :str) -> Dict[str, Any]: information = {} for line in data.replace('\r\n', '\n').split('\n'): if len((line := line.strip())): key, val = line.split('=', 1) information[key] = val return information def get_blockdevice_uevent(dev_name :str) -> Dict[str, Any]: device_information = {} with open(f"/sys/class/block/{dev_name}/uevent") as fh: device_information.update(uevent(fh.read())) return { f"/dev/{dev_name}" : { **device_information, 'path' : f'/dev/{dev_name}', 'PATH' : f'/dev/{dev_name}', 'PTTYPE' : None } } def all_disks() -> List[BlockDevice]: log(f"[Deprecated] archinstall.all_disks() is deprecated. Use archinstall.all_blockdevices() with the appropriate filters instead.", level=logging.WARNING, fg="yellow") return all_blockdevices(partitions=False, mappers=False) def all_blockdevices(mappers=False, partitions=False, error=False) -> Dict[str, Any]: """ Returns BlockDevice() and Partition() objects for all available devices. """ from .partition import Partition instances = {} # Due to lsblk being highly unreliable for this use case, # we'll iterate the /sys/class definitions and find the information # from there. for block_device in glob.glob("/sys/class/block/*"): device_path = f"/dev/{pathlib.Path(block_device).readlink().name}" try: information = blkid(f'blkid -p -o export {device_path}') # TODO: No idea why F841 is raised here: except SysCallError as error: # noqa: F841 if error.exit_code in (512, 2): # Assume that it's a loop device, and try to get info on it try: information = get_loop_info(device_path) if not information: raise SysCallError("Could not get loop information", exit_code=1) except SysCallError: information = get_blockdevice_uevent(pathlib.Path(block_device).readlink().name) else: raise error information = enrich_blockdevice_information(information) for path, path_info in information.items(): if path_info.get('DMCRYPT_NAME'): instances[path] = DMCryptDev(dev_path=path) elif path_info.get('PARTUUID') or path_info.get('PART_ENTRY_NUMBER'): if partitions: instances[path] = Partition(path, BlockDevice(get_parent_of_partition(pathlib.Path(path)))) elif path_info.get('PTTYPE', False) is not False or path_info.get('TYPE') == 'loop': instances[path] = BlockDevice(path, path_info) elif path_info.get('TYPE') == 'squashfs': # We can ignore squashfs devices (usually /dev/loop0 on Arch ISO) continue else: log(f"Unknown device found by all_blockdevices(), ignoring: {information}", level=logging.WARNING, fg="yellow") if mappers: for block_device in glob.glob("/dev/mapper/*"): if (pathobj := pathlib.Path(block_device)).is_symlink(): instances[f"/dev/mapper/{pathobj.name}"] = MapperDev(mappername=pathobj.name) return instances def get_parent_of_partition(path :pathlib.Path) -> pathlib.Path: partition_name = path.name pci_device = (pathlib.Path("/sys/class/block") / partition_name).resolve() return f"/dev/{pci_device.parent.name}" def harddrive(size :Optional[float] = None, model :Optional[str] = None, fuzzy :bool = False) -> Optional[BlockDevice]: collection = all_blockdevices(partitions=False) for drive in collection: if size and convert_to_gigabytes(collection[drive]['size']) != size: continue if model and (collection[drive]['model'] is None or collection[drive]['model'].lower() != model.lower()): continue return collection[drive] def split_bind_name(path :Union[pathlib.Path, str]) -> list: # log(f"[Deprecated] Partition().subvolumes now contain the split bind name via it's subvolume.name instead.", level=logging.WARNING, fg="yellow") # we check for the bind notation. if exist we'll only use the "true" device path if '[' in str(path) : # is a bind path (btrfs subvolume path) device_path, bind_path = str(path).split('[') bind_path = bind_path[:-1].strip() # remove the ] else: device_path = path bind_path = None return device_path,bind_path def find_mountpoint(device_path :str) -> Dict[str, Any]: try: for filesystem in json.loads(SysCommand(f'/usr/bin/findmnt -R --json {device_path}').decode())['filesystems']: yield filesystem except SysCallError: return {} def get_mount_info(path :Union[pathlib.Path, str], traverse :bool = False, return_real_path :bool = False) -> Dict[str, Any]: device_path, bind_path = split_bind_name(path) output = {} for traversal in list(map(str, [str(device_path)] + list(pathlib.Path(str(device_path)).parents))): try: log(f"Getting mount information for device path {traversal}", level=logging.DEBUG) if (output := SysCommand(f'/usr/bin/findmnt --json {traversal}').decode('UTF-8')): break except SysCallError as error: print('ERROR:', error) pass if not traverse: break if not output: raise DiskError(f"Could not get mount information for device path {device_path}") output = json.loads(output) # for btrfs partitions we redice the filesystem list to the one with the source equals to the parameter # i.e. the subvolume filesystem we're searching for if 'filesystems' in output and len(output['filesystems']) > 1 and bind_path is not None: output['filesystems'] = [entry for entry in output['filesystems'] if entry['source'] == str(path)] if 'filesystems' in output: if len(output['filesystems']) > 1: raise DiskError(f"Path '{device_path}' contains multiple mountpoints: {output['filesystems']}") if return_real_path: return output['filesystems'][0], traversal else: return output['filesystems'][0] if return_real_path: return {}, traversal else: return {} def get_all_targets(data :Dict[str, Any], filters :Dict[str, None] = {}) -> Dict[str, None]: for info in data: if info.get('target') not in filters: filters[info.get('target')] = None filters.update(get_all_targets(info.get('children', []))) return filters def get_partitions_in_use(mountpoint :str) -> List[Partition]: from .partition import Partition try: output = SysCommand(f"/usr/bin/findmnt --json -R {mountpoint}").decode('UTF-8') except SysCallError: return {} if not output: return {} output = json.loads(output) # print(output) mounts = {} block_devices_available = all_blockdevices(mappers=True, partitions=True, error=True) block_devices_mountpoints = {} for blockdev in block_devices_available.values(): if not type(blockdev) in (Partition, MapperDev): continue for blockdev_mountpoint in blockdev.mount_information: block_devices_mountpoints[blockdev_mountpoint['target']] = blockdev log(f'Filtering available mounts {block_devices_mountpoints} to those under {mountpoint}', level=logging.DEBUG) for mountpoint in list(get_all_targets(output['filesystems']).keys()): if mountpoint in block_devices_mountpoints: if mountpoint not in mounts: mounts[mountpoint] = block_devices_mountpoints[mountpoint] # If the already defined mountpoint is a DMCryptDev, and the newly found # mountpoint is a MapperDev, it has precedence and replaces the old mountpoint definition. elif type(mounts[mountpoint]) == DMCryptDev and type(block_devices_mountpoints[mountpoint]) == MapperDev: mounts[mountpoint] = block_devices_mountpoints[mountpoint] log(f"Available partitions: {mounts}", level=logging.DEBUG) return mounts def get_filesystem_type(path :str) -> Optional[str]: device_name, bind_name = split_bind_name(path) try: return SysCommand(f"blkid -o value -s TYPE {device_name}").decode('UTF-8').strip() except SysCallError: return None def disk_layouts() -> Optional[Dict[str, Any]]: try: if (handle := SysCommand("lsblk -f -o+TYPE,SIZE -J")).exit_code == 0: return {str(key): val for key, val in json.loads(handle.decode('UTF-8')).items()} else: log(f"Could not return disk layouts: {handle}", level=logging.WARNING, fg="yellow") return None except SysCallError as err: log(f"Could not return disk layouts: {err}", level=logging.WARNING, fg="yellow") return None except json.decoder.JSONDecodeError as err: log(f"Could not return disk layouts: {err}", level=logging.WARNING, fg="yellow") return None def encrypted_partitions(blockdevices :Dict[str, Any]) -> bool: for partition in blockdevices.values(): if partition.get('encrypted', False): yield partition def find_partition_by_mountpoint(block_devices :List[BlockDevice], relative_mountpoint :str) -> Partition: for device in block_devices: for partition in block_devices[device]['partitions']: if partition.get('mountpoint', None) == relative_mountpoint: return partition def partprobe() -> bool: if SysCommand(f'bash -c "partprobe"').exit_code == 0: time.sleep(5) # TODO: Remove, we should be relying on blkid instead of lsblk return True return False def convert_device_to_uuid(path :str) -> str: device_name, bind_name = split_bind_name(path) for i in range(storage['DISK_RETRY_ATTEMPTS']): partprobe() # TODO: Convert lsblk to blkid # (lsblk supports BlockDev and Partition UUID grabbing, blkid requires you to pick PTUUID and PARTUUID) output = json.loads(SysCommand(f"lsblk --json -o+UUID {device_name}").decode('UTF-8')) for device in output['blockdevices']: if (dev_uuid := device.get('uuid', None)): return dev_uuid time.sleep(storage['DISK_TIMEOUTS']) raise DiskError(f"Could not retrieve the UUID of {path} within a timely manner.")