From fd131c8fe956858fef802f8e53a530daa0b5df47 Mon Sep 17 00:00:00 2001 From: Daniel Girtler Date: Tue, 7 Jun 2022 19:00:48 +1000 Subject: Update blockdevice (#1289) * Update blockdevice class Co-authored-by: Daniel Girtler --- archinstall/lib/disk/blockdevice.py | 344 +++++++++++++++++------------------- 1 file changed, 164 insertions(+), 180 deletions(-) (limited to 'archinstall/lib/disk/blockdevice.py') diff --git a/archinstall/lib/disk/blockdevice.py b/archinstall/lib/disk/blockdevice.py index c7b69205..4e207bf4 100644 --- a/archinstall/lib/disk/blockdevice.py +++ b/archinstall/lib/disk/blockdevice.py @@ -1,13 +1,11 @@ from __future__ import annotations -import os import json import logging import time -from functools import cached_property -from typing import Optional, Dict, Any, Iterator, Tuple, List, TYPE_CHECKING -# https://stackoverflow.com/a/39757388/929999 -if TYPE_CHECKING: - from .partition import Partition + +from collections import OrderedDict +from dataclasses import dataclass +from typing import Optional, Dict, Any, Iterator, List, TYPE_CHECKING from ..exceptions import DiskError, SysCallError from ..output import log @@ -15,18 +13,44 @@ from ..general import SysCommand from ..storage import storage +if TYPE_CHECKING: + from .partition import Partition + _: Any + + +@dataclass +class BlockSizeInfo: + start: str + end: str + size: str + + +@dataclass +class BlockInfo: + pttype: str + ptuuid: str + size: int + tran: Optional[str] + rota: bool + free_space: Optional[List[BlockSizeInfo]] + + class BlockDevice: def __init__(self, path :str, info :Optional[Dict[str, Any]] = None): if not info: from .helpers import all_blockdevices # If we don't give any information, we need to auto-fill it. # Otherwise any subsequent usage will break. - info = all_blockdevices(partitions=False)[path].info + self.info = all_blockdevices(partitions=False)[path].info + else: + self.info = info - self.path = path - self.info = info + self._path = path self.keep_partitions = True - self.part_cache = {} + self._block_info = self._fetch_information() + self._partitions: Dict[str, 'Partition'] = {} + + self._load_partitions() # TODO: Currently disk encryption is a BIT misleading. # It's actually partition-encryption, but for future-proofing this @@ -35,70 +59,114 @@ class BlockDevice: def __repr__(self, *args :str, **kwargs :str) -> str: return self._str_repr - @cached_property + @property + def path(self) -> str: + return self._path + + @property def _str_repr(self) -> str: - return f"BlockDevice({self.device_or_backfile}, size={self._safe_size}GB, free_space={self._safe_free_space}, bus_type={self.bus_type})" - - @cached_property - def display_info(self) -> str: - columns = { - str(_('Device')): self.device_or_backfile, - str(_('Size')): f'{self._safe_size}GB', - str(_('Free space')): f'{self._safe_free_space}', + return f"BlockDevice({self._device_or_backfile}, size={self.size}GB, free_space={self._safe_free_space()}, bus_type={self.bus_type})" + + def as_json(self) -> Dict[str, Any]: + return { + str(_('Device')): self._device_or_backfile, + str(_('Size')): f'{self.size}GB', + str(_('Free space')): f'{self._safe_free_space()}', str(_('Bus-type')): f'{self.bus_type}' } - padding = max([len(k) for k in columns.keys()]) - - pretty = '' - for k, v in columns.items(): - k = k.ljust(padding, ' ') - pretty += f'{k} = {v}\n' - - return pretty.rstrip() - - def __iter__(self) -> Iterator[Partition]: + def __iter__(self) -> Iterator['Partition']: for partition in self.partitions: yield self.partitions[partition] def __getitem__(self, key :str, *args :str, **kwargs :str) -> Any: if hasattr(self, key): return getattr(self, key) - elif key not in self.info: - raise KeyError(f'{self} does not contain information: "{key}"') - return self.info[key] + + if self.info and key in self.info: + return self.info[key] + + raise KeyError(f'{self.info} does not contain information: "{key}"') def __len__(self) -> int: return len(self.partitions) def __lt__(self, left_comparitor :'BlockDevice') -> bool: - return self.path < left_comparitor.path + return self._path < left_comparitor.path def json(self) -> str: """ json() has precedence over __dump__, so this is a way to give less/partial information for user readability. """ - return self.path + return self._path def __dump__(self) -> Dict[str, Dict[str, Any]]: return { - self.path : { - 'partuuid' : self.uuid, - 'wipe' : self.info.get('wipe', None), - 'partitions' : [part.__dump__() for part in self.partitions.values()] + self._path: { + 'partuuid': self.uuid, + 'wipe': self.info.get('wipe', None), + 'partitions': [part.__dump__() for part in self.partitions.values()] } } - @property - def partition_type(self) -> str: - output = json.loads(SysCommand(f"lsblk --json -o+PTTYPE {self.path}").decode('UTF-8')) + def _call_lsblk(self, path: str) -> Dict[str, Any]: + output = SysCommand(f'lsblk --json -b -o+SIZE,PTTYPE,ROTA,TRAN,PTUUID {self._path}').decode('UTF-8') + if output: + lsblk_info = json.loads(output) + return lsblk_info + + raise DiskError(f'Failed to read disk "{self.path}" with lsblk') + + def _load_partitions(self): + from .partition import Partition - for device in output['blockdevices']: - return device['pttype'] + lsblk_info = self._call_lsblk(self._path) + device = lsblk_info['blockdevices'][0] + self._partitions.clear() - @cached_property - def device_or_backfile(self) -> str: + if children := device.get('children', None): + root = f'/dev/{device["name"]}' + for child in children: + part_id = child['name'].removeprefix(device['name']) + self._partitions[part_id] = Partition(root + part_id, block_device=self, part_id=part_id) + + def _get_free_space(self) -> Optional[List[BlockSizeInfo]]: + # NOTE: parted -s will default to `cancel` on prompt, skipping any partition + # that is "outside" the disk. in /dev/sr0 this is usually the case with Archiso, + # so the free will ignore the ESP partition and just give the "free" space. + # Doesn't harm us, but worth noting in case something weird happens. + try: + output = SysCommand(f"parted -s --machine {self._path} print free").decode('utf-8') + if output: + free_lines = [line for line in output.split('\n') if 'free' in line] + sizes = [] + for free_space in free_lines: + _, start, end, size, *_ = free_space.strip('\r\n;').split(':') + sizes.append(BlockSizeInfo(start, end, size)) + + return sizes + except SysCallError as error: + log(f"Could not get free space on {self._path}: {error}", level=logging.DEBUG) + + return None + + def _fetch_information(self) -> BlockInfo: + lsblk_info = self._call_lsblk(self._path) + device = lsblk_info['blockdevices'][0] + free_space = self._get_free_space() + + return BlockInfo( + pttype=device['pttype'], + ptuuid=device['ptuuid'], + size=device['size'], + tran=device['tran'], + rota=device['rota'], + free_space=free_space + ) + + @property + def _device_or_backfile(self) -> Optional[str]: """ Returns the actual device-endpoint of the BlockDevice. If it's a loop-back-device it returns the back-file, @@ -118,7 +186,7 @@ class BlockDevice: return None @property - def device(self) -> str: + def device(self) -> Optional[str]: """ Returns the device file of the BlockDevice. If it's a loop-back-device it returns the /dev/X device, @@ -126,168 +194,84 @@ class BlockDevice: And if it's a crypto-device it returns the parent device """ if "DEVTYPE" not in self.info: - raise DiskError(f'Could not locate backplane info for "{self.path}"') + raise DiskError(f'Could not locate backplane info for "{self._path}"') if self.info['DEVTYPE'] in ['disk','loop']: - return self.path + return self._path elif self.info['DEVTYPE'][:4] == 'raid': # This should catch /dev/md## raid devices - return self.path + return self._path elif self.info['DEVTYPE'] == 'crypt': if 'pkname' not in self.info: - raise DiskError(f'A crypt device ({self.path}) without a parent kernel device name.') + raise DiskError(f'A crypt device ({self._path}) without a parent kernel device name.') return f"/dev/{self.info['pkname']}" else: - log(f"Unknown blockdevice type for {self.path}: {self.info['DEVTYPE']}", level=logging.DEBUG) + log(f"Unknown blockdevice type for {self._path}: {self.info['DEVTYPE']}", level=logging.DEBUG) - # if not stat.S_ISBLK(os.stat(full_path).st_mode): - # raise DiskError(f'Selected disk "{full_path}" is not a block device.') - - @property - def partitions(self) -> Dict[str, Partition]: - from .filesystem import Partition - - self.partprobe() - result = SysCommand(['/usr/bin/lsblk', '-J', self.path]) - - if b'not a block device' in result: - raise DiskError(f'Can not read partitions off something that isn\'t a block device: {self.path}') - - if not result[:1] == b'{': - raise DiskError('Error getting JSON output from:', f'/usr/bin/lsblk -J {self.path}') - - r = json.loads(result.decode('UTF-8')) - if len(r['blockdevices']) and 'children' in r['blockdevices'][0]: - root_path = f"/dev/{r['blockdevices'][0]['name']}" - for part in r['blockdevices'][0]['children']: - part_id = part['name'][len(os.path.basename(self.path)):] - if part_id not in self.part_cache: - # TODO: Force over-write even if in cache? - if part_id not in self.part_cache or self.part_cache[part_id].size != part['size']: - self.part_cache[part_id] = Partition(root_path + part_id, block_device=self, part_id=part_id) - - return {k: self.part_cache[k] for k in sorted(self.part_cache)} + return None @property - def partition(self) -> Partition: - all_partitions = self.partitions - return [all_partitions[k] for k in all_partitions] + def partition_type(self) -> str: + return self._block_info.pttype @property - def partition_table_type(self) -> int: - # TODO: Don't hardcode :) - # Remove if we don't use this function anywhere - from .filesystem import GPT - return GPT - - @cached_property def uuid(self) -> str: - log('BlockDevice().uuid is untested!', level=logging.WARNING, fg='yellow') - """ - Returns the disk UUID as returned by lsblk. - This is more reliable than relying on /dev/disk/by-partuuid as - it doesn't seam to be able to detect md raid partitions. - """ - return SysCommand(f'blkid -s PTUUID -o value {self.path}').decode('UTF-8') - - @cached_property - def _safe_size(self) -> float: - from .helpers import convert_size_to_gb - - try: - output = json.loads(SysCommand(f"lsblk --json -b -o+SIZE {self.path}").decode('UTF-8')) - except SysCallError: - return -1.0 + return self._block_info.ptuuid - for device in output['blockdevices']: - return convert_size_to_gb(device['size']) - - @cached_property + @property def size(self) -> float: from .helpers import convert_size_to_gb + return convert_size_to_gb(self._block_info.size) - output = json.loads(SysCommand(f"lsblk --json -b -o+SIZE {self.path}").decode('UTF-8')) - - for device in output['blockdevices']: - return convert_size_to_gb(device['size']) - - @cached_property - def bus_type(self) -> str: - output = json.loads(SysCommand(f"lsblk --json -o+ROTA,TRAN {self.path}").decode('UTF-8')) - - for device in output['blockdevices']: - return device['tran'] + @property + def bus_type(self) -> Optional[str]: + return self._block_info.tran - @cached_property + @property def spinning(self) -> bool: - output = json.loads(SysCommand(f"lsblk --json -o+ROTA,TRAN {self.path}").decode('UTF-8')) + return self._block_info.rota - for device in output['blockdevices']: - return device['rota'] is True + @property + def partitions(self) -> Dict[str, 'Partition']: + self._partprobe() + self._load_partitions() + return OrderedDict(sorted(self._partitions.items())) - @cached_property - def _safe_free_space(self) -> Tuple[str, ...]: - try: - return '+'.join(part[2] for part in self.free_space) - except SysCallError: - return '?' + @property + def partition(self) -> List['Partition']: + return list(self.partitions.values()) - @cached_property - def free_space(self) -> Tuple[str, ...]: - # NOTE: parted -s will default to `cancel` on prompt, skipping any partition - # that is "outside" the disk. in /dev/sr0 this is usually the case with Archiso, - # so the free will ignore the ESP partition and just give the "free" space. - # Doesn't harm us, but worth noting in case something weird happens. - try: - for line in SysCommand(f"parted -s --machine {self.path} print free"): - if 'free' in (free_space := line.decode('UTF-8')): - _, start, end, size, *_ = free_space.strip('\r\n;').split(':') - yield (start, end, size) - except SysCallError as error: - log(f"Could not get free space on {self.path}: {error}", level=logging.DEBUG) - - @cached_property - def largest_free_space(self) -> List[str]: - info = [] - for space_info in self.free_space: - if not info: - info = space_info - else: - # [-1] = size - if space_info[-1] > info[-1]: - info = space_info - return info - - @cached_property + @property def first_free_sector(self) -> str: - if info := self.largest_free_space: - start = info[0] + if block_size := self._largest_free_space(): + return block_size.start else: - start = '512MB' - return start + return '512MB' - @cached_property + @property def first_end_sector(self) -> str: - if info := self.largest_free_space: - end = info[1] + if block_size := self._largest_free_space(): + return block_size.end else: - end = f"{self.size}GB" - return end - - def partprobe(self) -> bool: - return SysCommand(['partprobe', self.path]).exit_code == 0 + return f"{self.size}GB" + + def _safe_free_space(self) -> str: + if self._block_info.free_space: + sizes = [free_space.size for free_space in self._block_info.free_space] + return '+'.join(sizes) + return '?' + + def _largest_free_space(self) -> Optional[BlockSizeInfo]: + if self._block_info.free_space: + sorted_sizes = sorted(self._block_info.free_space, key=lambda x: x.size, reverse=True) + return sorted_sizes[0] + return None - def has_partitions(self) -> int: - return len(self.partitions) - - def has_mount_point(self, mountpoint :str) -> bool: - for partition in self.partitions: - if self.partitions[partition].mountpoint == mountpoint: - return True - return False + def _partprobe(self) -> bool: + return SysCommand(['partprobe', self._path]).exit_code == 0 def flush_cache(self) -> None: - self.part_cache = {} + self._load_partitions() def get_partition(self, uuid :Optional[str] = None, partuuid :Optional[str] = None) -> Partition: if not uuid and not partuuid: @@ -296,7 +280,7 @@ class BlockDevice: for count in range(storage.get('DISK_RETRY_ATTEMPTS', 5)): for partition_index, partition in self.partitions.items(): try: - if uuid and partition.uuid.lower() == uuid.lower(): + if uuid and partition.uuid and partition.uuid.lower() == uuid.lower(): return partition elif partuuid and partition.part_uuid.lower() == partuuid.lower(): return partition @@ -308,8 +292,8 @@ class BlockDevice: log(f"uuid {uuid} or {partuuid} not found. Waiting {storage.get('DISK_TIMEOUTS', 1) * count}s for next attempt",level=logging.DEBUG) time.sleep(storage.get('DISK_TIMEOUTS', 1) * count) - + log(f"Could not find {uuid}/{partuuid} in disk after 5 retries", level=logging.INFO) - log(f"Cache: {self.part_cache}") + log(f"Cache: {self._partitions}") log(f"Partitions: {self.partitions.items()}") raise DiskError(f"Partition {uuid}/{partuuid} was never found on {self} despite several attempts.") -- cgit v1.2.3-54-g00ecf