From 89cefb9a1c7d4c4968e7d8645149078e601c9d1c Mon Sep 17 00:00:00 2001 From: Daniel Girtler Date: Fri, 12 May 2023 02:30:09 +1000 Subject: Cleanup imports and unused code (#1801) * Cleanup imports and unused code * Update build check * Keep deprecation exception * Simplify logging --------- Co-authored-by: Daniel Girtler --- archinstall/lib/disk/device_handler.py | 99 +++++++++++++++---------------- archinstall/lib/disk/device_model.py | 50 ++++++++-------- archinstall/lib/disk/encryption_menu.py | 2 +- archinstall/lib/disk/fido.py | 11 +--- archinstall/lib/disk/filesystem.py | 11 ++-- archinstall/lib/disk/partitioning_menu.py | 9 ++- 6 files changed, 87 insertions(+), 95 deletions(-) (limited to 'archinstall/lib/disk') diff --git a/archinstall/lib/disk/device_handler.py b/archinstall/lib/disk/device_handler.py index 13bde77a..4341c53c 100644 --- a/archinstall/lib/disk/device_handler.py +++ b/archinstall/lib/disk/device_handler.py @@ -1,7 +1,6 @@ from __future__ import annotations import json -import logging import os import time from pathlib import Path @@ -24,7 +23,7 @@ from .device_model import ( from ..exceptions import DiskError, UnknownFilesystemFormat from ..general import SysCommand, SysCallError, JSON from ..luks import Luks2 -from ..output import log +from ..output import debug, error, info, warn from ..utils.util import is_subpath if TYPE_CHECKING: @@ -48,11 +47,11 @@ class DeviceHandler(object): for device in getAllDevices(): try: disk = Disk(device) - except DiskLabelException as error: - if 'unrecognised disk label' in getattr(error, 'message', str(error)): + except DiskLabelException as err: + if 'unrecognised disk label' in getattr(error, 'message', str(err)): disk = freshDisk(device, PartitionTable.GPT.value) else: - log(f'Unable to get disk from device: {device}', level=logging.DEBUG) + debug(f'Unable to get disk from device: {device}') continue device_info = _DeviceInfo.from_disk(disk) @@ -93,7 +92,7 @@ class DeviceHandler(object): return FilesystemType(lsblk_info.fstype) if lsblk_info.fstype else None return None except ValueError: - log(f'Could not determine the filesystem: {partition.fileSystem}', level=logging.DEBUG) + debug(f'Could not determine the filesystem: {partition.fileSystem}') return None @@ -137,7 +136,7 @@ class DeviceHandler(object): try: result = SysCommand(f'btrfs subvolume list {mountpoint}') except SysCallError as err: - log(f'Failed to read btrfs subvolume information: {err}', level=logging.DEBUG) + debug(f'Failed to read btrfs subvolume information: {err}') return subvol_infos try: @@ -150,7 +149,7 @@ class DeviceHandler(object): sub_vol_mountpoint = lsblk_info.btrfs_subvol_info.get(name, None) subvol_infos.append(_BtrfsSubvolumeInfo(name, sub_vol_mountpoint)) except json.decoder.JSONDecodeError as err: - log(f"Could not decode lsblk JSON: {result}", fg="red", level=logging.ERROR) + error(f"Could not decode lsblk JSON: {result}") raise err if not lsblk_info.mountpoint: @@ -203,14 +202,14 @@ class DeviceHandler(object): options += additional_parted_options options_str = ' '.join(options) - log(f'Formatting filesystem: /usr/bin/{command} {options_str} {path}') + info(f'Formatting filesystem: /usr/bin/{command} {options_str} {path}') try: SysCommand(f"/usr/bin/{command} {options_str} {path}") - except SysCallError as error: - msg = f'Could not format {path} with {fs_type.value}: {error.message}' - log(msg, fg='red') - raise DiskError(msg) from error + except SysCallError as err: + msg = f'Could not format {path} with {fs_type.value}: {err.message}' + error(msg) + raise DiskError(msg) from err def _perform_enc_formatting( self, @@ -227,16 +226,16 @@ class DeviceHandler(object): key_file = luks_handler.encrypt() - log(f'Unlocking luks2 device: {dev_path}', level=logging.DEBUG) + debug(f'Unlocking luks2 device: {dev_path}') luks_handler.unlock(key_file=key_file) if not luks_handler.mapper_dev: raise DiskError('Failed to unlock luks device') - log(f'luks2 formatting mapper dev: {luks_handler.mapper_dev}', level=logging.INFO) + info(f'luks2 formatting mapper dev: {luks_handler.mapper_dev}') self._perform_formatting(fs_type, luks_handler.mapper_dev) - log(f'luks2 locking device: {dev_path}', level=logging.INFO) + info(f'luks2 locking device: {dev_path}') luks_handler.lock() def format( @@ -285,7 +284,7 @@ class DeviceHandler(object): # when we require a delete and the partition to be (re)created # already exists then we have to delete it first if requires_delete and part_mod.status in [ModificationStatus.Modify, ModificationStatus.Delete]: - log(f'Delete existing partition: {part_mod.safe_dev_path}', level=logging.INFO) + info(f'Delete existing partition: {part_mod.safe_dev_path}') part_info = self.find_partition(part_mod.safe_dev_path) if not part_info: @@ -325,9 +324,9 @@ class DeviceHandler(object): for flag in part_mod.flags: partition.setFlag(flag.value) - log(f'\tType: {part_mod.type.value}', level=logging.DEBUG) - log(f'\tFilesystem: {part_mod.fs_type.value}', level=logging.DEBUG) - log(f'\tGeometry: {start_sector.value} start sector, {length_sector.value} length', level=logging.DEBUG) + debug(f'\tType: {part_mod.type.value}') + debug(f'\tFilesystem: {part_mod.fs_type.value}') + debug(f'\tGeometry: {start_sector.value} start sector, {length_sector.value} length') try: disk.addPartition(partition=partition, constraint=disk.device.optimalAlignedConstraint) @@ -339,41 +338,41 @@ class DeviceHandler(object): # the partition has a real path now as it was created part_mod.dev_path = Path(partition.path) - info = self._fetch_partuuid(part_mod.dev_path) + lsblk_info = self._fetch_partuuid(part_mod.dev_path) - part_mod.partuuid = info.partuuid - part_mod.uuid = info.uuid + part_mod.partuuid = lsblk_info.partuuid + part_mod.uuid = lsblk_info.uuid except PartitionException as ex: raise DiskError(f'Unable to add partition, most likely due to overlapping sectors: {ex}') from ex def _fetch_partuuid(self, path: Path) -> LsblkInfo: attempts = 3 - info: Optional[LsblkInfo] = None + lsblk_info: Optional[LsblkInfo] = None self.partprobe(path) for attempt_nr in range(attempts): time.sleep(attempt_nr + 1) - info = get_lsblk_info(path) + lsblk_info = get_lsblk_info(path) - if info.partuuid: + if lsblk_info.partuuid: break self.partprobe(path) - if not info or not info.partuuid: - log(f'Unable to determine new partition uuid: {path}\n{info}', level=logging.DEBUG) + if not lsblk_info or not lsblk_info.partuuid: + debug(f'Unable to determine new partition uuid: {path}\n{lsblk_info}') raise DiskError(f'Unable to determine new partition uuid: {path}') - log(f'partuuid found: {info.json()}', level=logging.DEBUG) + debug(f'partuuid found: {lsblk_info.json()}') - return info + return lsblk_info def create_btrfs_volumes( self, part_mod: PartitionModification, enc_conf: Optional['DiskEncryption'] = None ): - log(f'Creating subvolumes: {part_mod.safe_dev_path}', level=logging.INFO) + info(f'Creating subvolumes: {part_mod.safe_dev_path}') luks_handler = None @@ -396,7 +395,7 @@ class DeviceHandler(object): self.mount(part_mod.safe_dev_path, self._TMP_BTRFS_MOUNT, create_target_mountpoint=True) for sub_vol in part_mod.btrfs_subvols: - log(f'Creating subvolume: {sub_vol.name}', level=logging.DEBUG) + debug(f'Creating subvolume: {sub_vol.name}') if luks_handler is not None: subvol_path = self._TMP_BTRFS_MOUNT / sub_vol.name @@ -408,14 +407,14 @@ class DeviceHandler(object): if sub_vol.nodatacow: try: SysCommand(f'chattr +C {subvol_path}') - except SysCallError as error: - raise DiskError(f'Could not set nodatacow attribute at {subvol_path}: {error}') + except SysCallError as err: + raise DiskError(f'Could not set nodatacow attribute at {subvol_path}: {err}') if sub_vol.compress: try: SysCommand(f'chattr +c {subvol_path}') - except SysCallError as error: - raise DiskError(f'Could not set compress attribute at {subvol_path}: {error}') + except SysCallError as err: + raise DiskError(f'Could not set compress attribute at {subvol_path}: {err}') if luks_handler is not None and luks_handler.mapper_dev is not None: self.umount(luks_handler.mapper_dev) @@ -435,12 +434,12 @@ class DeviceHandler(object): return luks_handler def _umount_all_existing(self, modification: DeviceModification): - log(f'Unmounting all partitions: {modification.device_path}', level=logging.INFO) + info(f'Unmounting all partitions: {modification.device_path}') existing_partitions = self._devices[modification.device_path].partition_infos for partition in existing_partitions: - log(f'Unmounting: {partition.path}', level=logging.DEBUG) + debug(f'Unmounting: {partition.path}') # un-mount for existing encrypted partitions if partition.fs_type == FilesystemType.Crypto_luks: @@ -472,10 +471,10 @@ class DeviceHandler(object): part_table = partition_table.value if partition_table else None disk = freshDisk(modification.device.disk.device, part_table) else: - log(f'Use existing device: {modification.device_path}') + info(f'Use existing device: {modification.device_path}') disk = modification.device.disk - log(f'Creating partitions: {modification.device_path}') + info(f'Creating partitions: {modification.device_path}') # TODO sort by delete first @@ -507,7 +506,7 @@ class DeviceHandler(object): lsblk_info = get_lsblk_info(dev_path) if target_mountpoint in lsblk_info.mountpoints: - log(f'Device already mounted at {target_mountpoint}') + info(f'Device already mounted at {target_mountpoint}') return str_options = ','.join(options) @@ -517,7 +516,7 @@ class DeviceHandler(object): command = f'mount {mount_fs} {str_options} {dev_path} {target_mountpoint}' - log(f'Mounting {dev_path}: command', level=logging.DEBUG) + debug(f'Mounting {dev_path}: command') try: SysCommand(command) @@ -536,10 +535,10 @@ class DeviceHandler(object): raise ex if len(lsblk_info.mountpoints) > 0: - log(f'Partition {mountpoint} is currently mounted at: {[str(m) for m in lsblk_info.mountpoints]}', level=logging.DEBUG) + debug(f'Partition {mountpoint} is currently mounted at: {[str(m) for m in lsblk_info.mountpoints]}') for mountpoint in lsblk_info.mountpoints: - log(f'Unmounting mountpoint: {mountpoint}', level=logging.DEBUG) + debug(f'Unmounting mountpoint: {mountpoint}') command = 'umount' @@ -574,10 +573,10 @@ class DeviceHandler(object): command = 'partprobe' try: - log(f'Calling partprobe: {command}', level=logging.DEBUG) + debug(f'Calling partprobe: {command}') SysCommand(command) - except SysCallError as error: - log(f'"{command}" failed to run: {error}', level=logging.DEBUG) + except SysCallError as err: + error(f'"{command}" failed to run: {err}') def _wipe(self, dev_path: Path): """ @@ -594,7 +593,7 @@ class DeviceHandler(object): This is not intended to be secure, but rather to ensure that auto-discovery tools don't recognize anything here. """ - log(f'Wiping partitions and metadata: {block_device.device_info.path}') + info(f'Wiping partitions and metadata: {block_device.device_info.path}') for partition in block_device.partition_infos: self._wipe(partition.path) @@ -609,8 +608,8 @@ def disk_layouts() -> str: lsblk_info = get_all_lsblk_info() return json.dumps(lsblk_info, indent=4, sort_keys=True, cls=JSON) except SysCallError as err: - log(f"Could not return disk layouts: {err}", level=logging.WARNING, fg="yellow") + warn(f"Could not return disk layouts: {err}") return '' except json.decoder.JSONDecodeError as err: - log(f"Could not return disk layouts: {err}", level=logging.WARNING, fg="yellow") + warn(f"Could not return disk layouts: {err}") return '' diff --git a/archinstall/lib/disk/device_model.py b/archinstall/lib/disk/device_model.py index d57347b7..36dd0c4f 100644 --- a/archinstall/lib/disk/device_model.py +++ b/archinstall/lib/disk/device_model.py @@ -2,7 +2,6 @@ from __future__ import annotations import dataclasses import json -import logging import math import time import uuid @@ -18,7 +17,7 @@ from parted import Disk, Geometry, Partition from ..exceptions import DiskError, SysCallError from ..general import SysCommand -from ..output import log +from ..output import debug, error from ..storage import storage if TYPE_CHECKING: @@ -282,7 +281,7 @@ class _PartitionInfo: btrfs_subvol_infos: List[_BtrfsSubvolumeInfo] = field(default_factory=list) def as_json(self) -> Dict[str, Any]: - info = { + part_info = { 'Name': self.name, 'Type': self.type.value, 'Filesystem': self.fs_type.value if self.fs_type else str(_('Unknown')), @@ -293,9 +292,9 @@ class _PartitionInfo: } if self.btrfs_subvol_infos: - info['Btrfs vol.'] = f'{len(self.btrfs_subvol_infos)} subvolumes' + part_info['Btrfs vol.'] = f'{len(self.btrfs_subvol_infos)} subvolumes' - return info + return part_info @classmethod def from_partition( @@ -392,7 +391,7 @@ class SubvolumeModification: mods = [] for entry in subvol_args: if not entry.get('name', None) or not entry.get('mountpoint', None): - log(f'Subvolume arg is missing name: {entry}', level=logging.DEBUG) + debug(f'Subvolume arg is missing name: {entry}') continue mountpoint = Path(entry['mountpoint']) if entry['mountpoint'] else None @@ -705,7 +704,7 @@ class PartitionModification: """ Called for displaying data in table format """ - info = { + part_mod = { 'Status': self.status.value, 'Device': str(self.dev_path) if self.dev_path else '', 'Type': self.type.value, @@ -718,9 +717,9 @@ class PartitionModification: } if self.btrfs_subvols: - info['Btrfs vol.'] = f'{len(self.btrfs_subvols)} subvolumes' + part_mod['Btrfs vol.'] = f'{len(self.btrfs_subvols)} subvolumes' - return info + return part_mod @dataclass @@ -916,36 +915,36 @@ class LsblkInfo: @classmethod def from_json(cls, blockdevice: Dict[str, Any]) -> LsblkInfo: - info = cls() + lsblk_info = cls() for f in cls.fields(): lsblk_field = _clean_field(f, CleanType.Blockdevice) data_field = _clean_field(f, CleanType.Dataclass) val: Any = None - if isinstance(getattr(info, data_field), Path): + if isinstance(getattr(lsblk_info, data_field), Path): val = Path(blockdevice[lsblk_field]) - elif isinstance(getattr(info, data_field), Size): + elif isinstance(getattr(lsblk_info, data_field), Size): val = Size(blockdevice[lsblk_field], Unit.B) else: val = blockdevice[lsblk_field] - setattr(info, data_field, val) + setattr(lsblk_info, data_field, val) - info.children = [LsblkInfo.from_json(child) for child in blockdevice.get('children', [])] + lsblk_info.children = [LsblkInfo.from_json(child) for child in blockdevice.get('children', [])] # sometimes lsblk returns 'mountpoints': [null] - info.mountpoints = [Path(mnt) for mnt in info.mountpoints if mnt] + lsblk_info.mountpoints = [Path(mnt) for mnt in lsblk_info.mountpoints if mnt] fs_roots = [] - for r in info.fsroots: + for r in lsblk_info.fsroots: if r: path = Path(r) # store the fsroot entries without the leading / fs_roots.append(path.relative_to(path.anchor)) - info.fsroots = fs_roots + lsblk_info.fsroots = fs_roots - return info + return lsblk_info class CleanType(Enum): @@ -978,16 +977,16 @@ def _fetch_lsblk_info(dev_path: Optional[Union[Path, str]] = None, retry: int = try: result = SysCommand(f'lsblk --json -b -o+{lsblk_fields} {dev_path}') break - except SysCallError as error: + except SysCallError as err: # Get the output minus the message/info from lsblk if it returns a non-zero exit code. - if error.worker: - err = error.worker.decode('UTF-8') - log(f'Error calling lsblk: {err}', level=logging.DEBUG) + if err.worker: + err_str = err.worker.decode('UTF-8') + debug(f'Error calling lsblk: {err_str}') else: - raise error + raise err if retry_attempt == retry - 1: - raise error + raise err time.sleep(1) @@ -997,11 +996,12 @@ def _fetch_lsblk_info(dev_path: Optional[Union[Path, str]] = None, retry: int = blockdevices = block_devices['blockdevices'] return [LsblkInfo.from_json(device) for device in blockdevices] except json.decoder.JSONDecodeError as err: - log(f"Could not decode lsblk JSON: {result}", fg="red", level=logging.ERROR) + error(f"Could not decode lsblk JSON: {result}") raise err raise DiskError(f'Failed to read disk "{dev_path}" with lsblk') + def get_lsblk_info(dev_path: Union[Path, str]) -> LsblkInfo: if infos := _fetch_lsblk_info(dev_path): return infos[0] diff --git a/archinstall/lib/disk/encryption_menu.py b/archinstall/lib/disk/encryption_menu.py index 285270fb..8c64e65e 100644 --- a/archinstall/lib/disk/encryption_menu.py +++ b/archinstall/lib/disk/encryption_menu.py @@ -13,7 +13,7 @@ from ..menu import ( MenuSelectionType, TableMenu ) -from ..user_interaction.utils import get_password +from ..interactions.utils import get_password from ..menu import Menu from ..general import secret from .fido import Fido2Device, Fido2 diff --git a/archinstall/lib/disk/fido.py b/archinstall/lib/disk/fido.py index 2a53b551..97c38d84 100644 --- a/archinstall/lib/disk/fido.py +++ b/archinstall/lib/disk/fido.py @@ -1,13 +1,12 @@ from __future__ import annotations import getpass -import logging from pathlib import Path from typing import List, Optional from .device_model import PartitionModification, Fido2Device from ..general import SysCommand, SysCommandWorker, clear_vt100_escape_codes -from ..output import log +from ..output import error, info class Fido2: @@ -39,7 +38,7 @@ class Fido2: if not cls._loaded or reload: ret: Optional[str] = SysCommand(f"systemd-cryptenroll --fido2-device=list").decode('UTF-8') if not ret: - log('Unable to retrieve fido2 devices', level=logging.ERROR) + error('Unable to retrieve fido2 devices') return [] fido_devices: str = clear_vt100_escape_codes(ret) # type: ignore @@ -88,8 +87,4 @@ class Fido2: worker.write(bytes(getpass.getpass(" "), 'UTF-8')) pin_inputted = True - log( - f"You might need to touch the FIDO2 device to unlock it if no prompt comes up after 3 seconds.", - level=logging.INFO, - fg="yellow" - ) + info('You might need to touch the FIDO2 device to unlock it if no prompt comes up after 3 seconds') diff --git a/archinstall/lib/disk/filesystem.py b/archinstall/lib/disk/filesystem.py index 6ea99340..dc99afce 100644 --- a/archinstall/lib/disk/filesystem.py +++ b/archinstall/lib/disk/filesystem.py @@ -1,6 +1,5 @@ from __future__ import annotations -import logging import signal import sys import time @@ -8,8 +7,8 @@ from typing import Any, Optional, TYPE_CHECKING from .device_model import DiskLayoutConfiguration, DiskLayoutType, PartitionTable, FilesystemType, DiskEncryption from .device_handler import device_handler -from ..hardware import has_uefi -from ..output import log +from ..hardware import SysInfo +from ..output import debug from ..menu import Menu if TYPE_CHECKING: @@ -27,13 +26,13 @@ class FilesystemHandler: def perform_filesystem_operations(self, show_countdown: bool = True): if self._disk_config.config_type == DiskLayoutType.Pre_mount: - log('Disk layout configuration is set to pre-mount, not performing any operations', level=logging.DEBUG) + debug('Disk layout configuration is set to pre-mount, not performing any operations') return device_mods = list(filter(lambda x: len(x.partitions) > 0, self._disk_config.device_modifications)) if not device_mods: - log('No modifications required', level=logging.DEBUG) + debug('No modifications required') return device_paths = ', '.join([str(mod.device.device_info.path) for mod in device_mods]) @@ -48,7 +47,7 @@ class FilesystemHandler: # Setup the blockdevice, filesystem (and optionally encryption). # Once that's done, we'll hand over to perform_installation() partition_table = PartitionTable.GPT - if has_uefi() is False: + if SysInfo.has_uefi() is False: partition_table = PartitionTable.MBR for mod in device_mods: diff --git a/archinstall/lib/disk/partitioning_menu.py b/archinstall/lib/disk/partitioning_menu.py index 686e8c29..89cf6293 100644 --- a/archinstall/lib/disk/partitioning_menu.py +++ b/archinstall/lib/disk/partitioning_menu.py @@ -1,13 +1,12 @@ from __future__ import annotations -import logging from pathlib import Path from typing import Any, Dict, TYPE_CHECKING, List, Optional, Tuple from .device_model import PartitionModification, FilesystemType, BDevice, Size, Unit, PartitionType, PartitionFlag, \ ModificationStatus from ..menu import Menu, ListManager, MenuSelection, TextInput -from ..output import FormattedOutput, log +from ..output import FormattedOutput, warn from .subvolume_menu import SubvolumeMenu if TYPE_CHECKING: @@ -229,7 +228,7 @@ class PartitioningList(ListManager): if not start_sector or self._validate_sector(start_sector): break - log(f'Invalid start sector entered: {start_sector}', fg='red', level=logging.INFO) + warn(f'Invalid start sector entered: {start_sector}') if not start_sector: start_sector = str(largest_free_area.start) @@ -245,7 +244,7 @@ class PartitioningList(ListManager): if not end_value or self._validate_sector(start_sector, end_value): break - log(f'Invalid end sector entered: {start_sector}', fg='red', level=logging.INFO) + warn(f'Invalid end sector entered: {start_sector}') # override the default value with the user value if end_value: @@ -300,7 +299,7 @@ class PartitioningList(ListManager): if choice.value == Menu.no(): return [] - from ..user_interaction.disk_conf import suggest_single_disk_layout + from ..interactions.disk_conf import suggest_single_disk_layout device_modification = suggest_single_disk_layout(self._device) return device_modification.partitions -- cgit v1.2.3-70-g09d2