Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib/disk
diff options
context:
space:
mode:
Diffstat (limited to 'archinstall/lib/disk')
-rw-r--r--archinstall/lib/disk/device_handler.py99
-rw-r--r--archinstall/lib/disk/device_model.py50
-rw-r--r--archinstall/lib/disk/encryption_menu.py2
-rw-r--r--archinstall/lib/disk/fido.py11
-rw-r--r--archinstall/lib/disk/filesystem.py11
-rw-r--r--archinstall/lib/disk/partitioning_menu.py9
6 files changed, 87 insertions, 95 deletions
diff --git a/archinstall/lib/disk/device_handler.py b/archinstall/lib/disk/device_handler.py
index 13bde77a..4341c53c 100644
--- a/archinstall/lib/disk/device_handler.py
+++ b/archinstall/lib/disk/device_handler.py
@@ -1,7 +1,6 @@
from __future__ import annotations
import json
-import logging
import os
import time
from pathlib import Path
@@ -24,7 +23,7 @@ from .device_model import (
from ..exceptions import DiskError, UnknownFilesystemFormat
from ..general import SysCommand, SysCallError, JSON
from ..luks import Luks2
-from ..output import log
+from ..output import debug, error, info, warn
from ..utils.util import is_subpath
if TYPE_CHECKING:
@@ -48,11 +47,11 @@ class DeviceHandler(object):
for device in getAllDevices():
try:
disk = Disk(device)
- except DiskLabelException as error:
- if 'unrecognised disk label' in getattr(error, 'message', str(error)):
+ except DiskLabelException as err:
+ if 'unrecognised disk label' in getattr(error, 'message', str(err)):
disk = freshDisk(device, PartitionTable.GPT.value)
else:
- log(f'Unable to get disk from device: {device}', level=logging.DEBUG)
+ debug(f'Unable to get disk from device: {device}')
continue
device_info = _DeviceInfo.from_disk(disk)
@@ -93,7 +92,7 @@ class DeviceHandler(object):
return FilesystemType(lsblk_info.fstype) if lsblk_info.fstype else None
return None
except ValueError:
- log(f'Could not determine the filesystem: {partition.fileSystem}', level=logging.DEBUG)
+ debug(f'Could not determine the filesystem: {partition.fileSystem}')
return None
@@ -137,7 +136,7 @@ class DeviceHandler(object):
try:
result = SysCommand(f'btrfs subvolume list {mountpoint}')
except SysCallError as err:
- log(f'Failed to read btrfs subvolume information: {err}', level=logging.DEBUG)
+ debug(f'Failed to read btrfs subvolume information: {err}')
return subvol_infos
try:
@@ -150,7 +149,7 @@ class DeviceHandler(object):
sub_vol_mountpoint = lsblk_info.btrfs_subvol_info.get(name, None)
subvol_infos.append(_BtrfsSubvolumeInfo(name, sub_vol_mountpoint))
except json.decoder.JSONDecodeError as err:
- log(f"Could not decode lsblk JSON: {result}", fg="red", level=logging.ERROR)
+ error(f"Could not decode lsblk JSON: {result}")
raise err
if not lsblk_info.mountpoint:
@@ -203,14 +202,14 @@ class DeviceHandler(object):
options += additional_parted_options
options_str = ' '.join(options)
- log(f'Formatting filesystem: /usr/bin/{command} {options_str} {path}')
+ info(f'Formatting filesystem: /usr/bin/{command} {options_str} {path}')
try:
SysCommand(f"/usr/bin/{command} {options_str} {path}")
- except SysCallError as error:
- msg = f'Could not format {path} with {fs_type.value}: {error.message}'
- log(msg, fg='red')
- raise DiskError(msg) from error
+ except SysCallError as err:
+ msg = f'Could not format {path} with {fs_type.value}: {err.message}'
+ error(msg)
+ raise DiskError(msg) from err
def _perform_enc_formatting(
self,
@@ -227,16 +226,16 @@ class DeviceHandler(object):
key_file = luks_handler.encrypt()
- log(f'Unlocking luks2 device: {dev_path}', level=logging.DEBUG)
+ debug(f'Unlocking luks2 device: {dev_path}')
luks_handler.unlock(key_file=key_file)
if not luks_handler.mapper_dev:
raise DiskError('Failed to unlock luks device')
- log(f'luks2 formatting mapper dev: {luks_handler.mapper_dev}', level=logging.INFO)
+ info(f'luks2 formatting mapper dev: {luks_handler.mapper_dev}')
self._perform_formatting(fs_type, luks_handler.mapper_dev)
- log(f'luks2 locking device: {dev_path}', level=logging.INFO)
+ info(f'luks2 locking device: {dev_path}')
luks_handler.lock()
def format(
@@ -285,7 +284,7 @@ class DeviceHandler(object):
# when we require a delete and the partition to be (re)created
# already exists then we have to delete it first
if requires_delete and part_mod.status in [ModificationStatus.Modify, ModificationStatus.Delete]:
- log(f'Delete existing partition: {part_mod.safe_dev_path}', level=logging.INFO)
+ info(f'Delete existing partition: {part_mod.safe_dev_path}')
part_info = self.find_partition(part_mod.safe_dev_path)
if not part_info:
@@ -325,9 +324,9 @@ class DeviceHandler(object):
for flag in part_mod.flags:
partition.setFlag(flag.value)
- log(f'\tType: {part_mod.type.value}', level=logging.DEBUG)
- log(f'\tFilesystem: {part_mod.fs_type.value}', level=logging.DEBUG)
- log(f'\tGeometry: {start_sector.value} start sector, {length_sector.value} length', level=logging.DEBUG)
+ debug(f'\tType: {part_mod.type.value}')
+ debug(f'\tFilesystem: {part_mod.fs_type.value}')
+ debug(f'\tGeometry: {start_sector.value} start sector, {length_sector.value} length')
try:
disk.addPartition(partition=partition, constraint=disk.device.optimalAlignedConstraint)
@@ -339,41 +338,41 @@ class DeviceHandler(object):
# the partition has a real path now as it was created
part_mod.dev_path = Path(partition.path)
- info = self._fetch_partuuid(part_mod.dev_path)
+ lsblk_info = self._fetch_partuuid(part_mod.dev_path)
- part_mod.partuuid = info.partuuid
- part_mod.uuid = info.uuid
+ part_mod.partuuid = lsblk_info.partuuid
+ part_mod.uuid = lsblk_info.uuid
except PartitionException as ex:
raise DiskError(f'Unable to add partition, most likely due to overlapping sectors: {ex}') from ex
def _fetch_partuuid(self, path: Path) -> LsblkInfo:
attempts = 3
- info: Optional[LsblkInfo] = None
+ lsblk_info: Optional[LsblkInfo] = None
self.partprobe(path)
for attempt_nr in range(attempts):
time.sleep(attempt_nr + 1)
- info = get_lsblk_info(path)
+ lsblk_info = get_lsblk_info(path)
- if info.partuuid:
+ if lsblk_info.partuuid:
break
self.partprobe(path)
- if not info or not info.partuuid:
- log(f'Unable to determine new partition uuid: {path}\n{info}', level=logging.DEBUG)
+ if not lsblk_info or not lsblk_info.partuuid:
+ debug(f'Unable to determine new partition uuid: {path}\n{lsblk_info}')
raise DiskError(f'Unable to determine new partition uuid: {path}')
- log(f'partuuid found: {info.json()}', level=logging.DEBUG)
+ debug(f'partuuid found: {lsblk_info.json()}')
- return info
+ return lsblk_info
def create_btrfs_volumes(
self,
part_mod: PartitionModification,
enc_conf: Optional['DiskEncryption'] = None
):
- log(f'Creating subvolumes: {part_mod.safe_dev_path}', level=logging.INFO)
+ info(f'Creating subvolumes: {part_mod.safe_dev_path}')
luks_handler = None
@@ -396,7 +395,7 @@ class DeviceHandler(object):
self.mount(part_mod.safe_dev_path, self._TMP_BTRFS_MOUNT, create_target_mountpoint=True)
for sub_vol in part_mod.btrfs_subvols:
- log(f'Creating subvolume: {sub_vol.name}', level=logging.DEBUG)
+ debug(f'Creating subvolume: {sub_vol.name}')
if luks_handler is not None:
subvol_path = self._TMP_BTRFS_MOUNT / sub_vol.name
@@ -408,14 +407,14 @@ class DeviceHandler(object):
if sub_vol.nodatacow:
try:
SysCommand(f'chattr +C {subvol_path}')
- except SysCallError as error:
- raise DiskError(f'Could not set nodatacow attribute at {subvol_path}: {error}')
+ except SysCallError as err:
+ raise DiskError(f'Could not set nodatacow attribute at {subvol_path}: {err}')
if sub_vol.compress:
try:
SysCommand(f'chattr +c {subvol_path}')
- except SysCallError as error:
- raise DiskError(f'Could not set compress attribute at {subvol_path}: {error}')
+ except SysCallError as err:
+ raise DiskError(f'Could not set compress attribute at {subvol_path}: {err}')
if luks_handler is not None and luks_handler.mapper_dev is not None:
self.umount(luks_handler.mapper_dev)
@@ -435,12 +434,12 @@ class DeviceHandler(object):
return luks_handler
def _umount_all_existing(self, modification: DeviceModification):
- log(f'Unmounting all partitions: {modification.device_path}', level=logging.INFO)
+ info(f'Unmounting all partitions: {modification.device_path}')
existing_partitions = self._devices[modification.device_path].partition_infos
for partition in existing_partitions:
- log(f'Unmounting: {partition.path}', level=logging.DEBUG)
+ debug(f'Unmounting: {partition.path}')
# un-mount for existing encrypted partitions
if partition.fs_type == FilesystemType.Crypto_luks:
@@ -472,10 +471,10 @@ class DeviceHandler(object):
part_table = partition_table.value if partition_table else None
disk = freshDisk(modification.device.disk.device, part_table)
else:
- log(f'Use existing device: {modification.device_path}')
+ info(f'Use existing device: {modification.device_path}')
disk = modification.device.disk
- log(f'Creating partitions: {modification.device_path}')
+ info(f'Creating partitions: {modification.device_path}')
# TODO sort by delete first
@@ -507,7 +506,7 @@ class DeviceHandler(object):
lsblk_info = get_lsblk_info(dev_path)
if target_mountpoint in lsblk_info.mountpoints:
- log(f'Device already mounted at {target_mountpoint}')
+ info(f'Device already mounted at {target_mountpoint}')
return
str_options = ','.join(options)
@@ -517,7 +516,7 @@ class DeviceHandler(object):
command = f'mount {mount_fs} {str_options} {dev_path} {target_mountpoint}'
- log(f'Mounting {dev_path}: command', level=logging.DEBUG)
+ debug(f'Mounting {dev_path}: command')
try:
SysCommand(command)
@@ -536,10 +535,10 @@ class DeviceHandler(object):
raise ex
if len(lsblk_info.mountpoints) > 0:
- log(f'Partition {mountpoint} is currently mounted at: {[str(m) for m in lsblk_info.mountpoints]}', level=logging.DEBUG)
+ debug(f'Partition {mountpoint} is currently mounted at: {[str(m) for m in lsblk_info.mountpoints]}')
for mountpoint in lsblk_info.mountpoints:
- log(f'Unmounting mountpoint: {mountpoint}', level=logging.DEBUG)
+ debug(f'Unmounting mountpoint: {mountpoint}')
command = 'umount'
@@ -574,10 +573,10 @@ class DeviceHandler(object):
command = 'partprobe'
try:
- log(f'Calling partprobe: {command}', level=logging.DEBUG)
+ debug(f'Calling partprobe: {command}')
SysCommand(command)
- except SysCallError as error:
- log(f'"{command}" failed to run: {error}', level=logging.DEBUG)
+ except SysCallError as err:
+ error(f'"{command}" failed to run: {err}')
def _wipe(self, dev_path: Path):
"""
@@ -594,7 +593,7 @@ class DeviceHandler(object):
This is not intended to be secure, but rather to ensure that
auto-discovery tools don't recognize anything here.
"""
- log(f'Wiping partitions and metadata: {block_device.device_info.path}')
+ info(f'Wiping partitions and metadata: {block_device.device_info.path}')
for partition in block_device.partition_infos:
self._wipe(partition.path)
@@ -609,8 +608,8 @@ def disk_layouts() -> str:
lsblk_info = get_all_lsblk_info()
return json.dumps(lsblk_info, indent=4, sort_keys=True, cls=JSON)
except SysCallError as err:
- log(f"Could not return disk layouts: {err}", level=logging.WARNING, fg="yellow")
+ warn(f"Could not return disk layouts: {err}")
return ''
except json.decoder.JSONDecodeError as err:
- log(f"Could not return disk layouts: {err}", level=logging.WARNING, fg="yellow")
+ warn(f"Could not return disk layouts: {err}")
return ''
diff --git a/archinstall/lib/disk/device_model.py b/archinstall/lib/disk/device_model.py
index d57347b7..36dd0c4f 100644
--- a/archinstall/lib/disk/device_model.py
+++ b/archinstall/lib/disk/device_model.py
@@ -2,7 +2,6 @@ from __future__ import annotations
import dataclasses
import json
-import logging
import math
import time
import uuid
@@ -18,7 +17,7 @@ from parted import Disk, Geometry, Partition
from ..exceptions import DiskError, SysCallError
from ..general import SysCommand
-from ..output import log
+from ..output import debug, error
from ..storage import storage
if TYPE_CHECKING:
@@ -282,7 +281,7 @@ class _PartitionInfo:
btrfs_subvol_infos: List[_BtrfsSubvolumeInfo] = field(default_factory=list)
def as_json(self) -> Dict[str, Any]:
- info = {
+ part_info = {
'Name': self.name,
'Type': self.type.value,
'Filesystem': self.fs_type.value if self.fs_type else str(_('Unknown')),
@@ -293,9 +292,9 @@ class _PartitionInfo:
}
if self.btrfs_subvol_infos:
- info['Btrfs vol.'] = f'{len(self.btrfs_subvol_infos)} subvolumes'
+ part_info['Btrfs vol.'] = f'{len(self.btrfs_subvol_infos)} subvolumes'
- return info
+ return part_info
@classmethod
def from_partition(
@@ -392,7 +391,7 @@ class SubvolumeModification:
mods = []
for entry in subvol_args:
if not entry.get('name', None) or not entry.get('mountpoint', None):
- log(f'Subvolume arg is missing name: {entry}', level=logging.DEBUG)
+ debug(f'Subvolume arg is missing name: {entry}')
continue
mountpoint = Path(entry['mountpoint']) if entry['mountpoint'] else None
@@ -705,7 +704,7 @@ class PartitionModification:
"""
Called for displaying data in table format
"""
- info = {
+ part_mod = {
'Status': self.status.value,
'Device': str(self.dev_path) if self.dev_path else '',
'Type': self.type.value,
@@ -718,9 +717,9 @@ class PartitionModification:
}
if self.btrfs_subvols:
- info['Btrfs vol.'] = f'{len(self.btrfs_subvols)} subvolumes'
+ part_mod['Btrfs vol.'] = f'{len(self.btrfs_subvols)} subvolumes'
- return info
+ return part_mod
@dataclass
@@ -916,36 +915,36 @@ class LsblkInfo:
@classmethod
def from_json(cls, blockdevice: Dict[str, Any]) -> LsblkInfo:
- info = cls()
+ lsblk_info = cls()
for f in cls.fields():
lsblk_field = _clean_field(f, CleanType.Blockdevice)
data_field = _clean_field(f, CleanType.Dataclass)
val: Any = None
- if isinstance(getattr(info, data_field), Path):
+ if isinstance(getattr(lsblk_info, data_field), Path):
val = Path(blockdevice[lsblk_field])
- elif isinstance(getattr(info, data_field), Size):
+ elif isinstance(getattr(lsblk_info, data_field), Size):
val = Size(blockdevice[lsblk_field], Unit.B)
else:
val = blockdevice[lsblk_field]
- setattr(info, data_field, val)
+ setattr(lsblk_info, data_field, val)
- info.children = [LsblkInfo.from_json(child) for child in blockdevice.get('children', [])]
+ lsblk_info.children = [LsblkInfo.from_json(child) for child in blockdevice.get('children', [])]
# sometimes lsblk returns 'mountpoints': [null]
- info.mountpoints = [Path(mnt) for mnt in info.mountpoints if mnt]
+ lsblk_info.mountpoints = [Path(mnt) for mnt in lsblk_info.mountpoints if mnt]
fs_roots = []
- for r in info.fsroots:
+ for r in lsblk_info.fsroots:
if r:
path = Path(r)
# store the fsroot entries without the leading /
fs_roots.append(path.relative_to(path.anchor))
- info.fsroots = fs_roots
+ lsblk_info.fsroots = fs_roots
- return info
+ return lsblk_info
class CleanType(Enum):
@@ -978,16 +977,16 @@ def _fetch_lsblk_info(dev_path: Optional[Union[Path, str]] = None, retry: int =
try:
result = SysCommand(f'lsblk --json -b -o+{lsblk_fields} {dev_path}')
break
- except SysCallError as error:
+ except SysCallError as err:
# Get the output minus the message/info from lsblk if it returns a non-zero exit code.
- if error.worker:
- err = error.worker.decode('UTF-8')
- log(f'Error calling lsblk: {err}', level=logging.DEBUG)
+ if err.worker:
+ err_str = err.worker.decode('UTF-8')
+ debug(f'Error calling lsblk: {err_str}')
else:
- raise error
+ raise err
if retry_attempt == retry - 1:
- raise error
+ raise err
time.sleep(1)
@@ -997,11 +996,12 @@ def _fetch_lsblk_info(dev_path: Optional[Union[Path, str]] = None, retry: int =
blockdevices = block_devices['blockdevices']
return [LsblkInfo.from_json(device) for device in blockdevices]
except json.decoder.JSONDecodeError as err:
- log(f"Could not decode lsblk JSON: {result}", fg="red", level=logging.ERROR)
+ error(f"Could not decode lsblk JSON: {result}")
raise err
raise DiskError(f'Failed to read disk "{dev_path}" with lsblk')
+
def get_lsblk_info(dev_path: Union[Path, str]) -> LsblkInfo:
if infos := _fetch_lsblk_info(dev_path):
return infos[0]
diff --git a/archinstall/lib/disk/encryption_menu.py b/archinstall/lib/disk/encryption_menu.py
index 285270fb..8c64e65e 100644
--- a/archinstall/lib/disk/encryption_menu.py
+++ b/archinstall/lib/disk/encryption_menu.py
@@ -13,7 +13,7 @@ from ..menu import (
MenuSelectionType,
TableMenu
)
-from ..user_interaction.utils import get_password
+from ..interactions.utils import get_password
from ..menu import Menu
from ..general import secret
from .fido import Fido2Device, Fido2
diff --git a/archinstall/lib/disk/fido.py b/archinstall/lib/disk/fido.py
index 2a53b551..97c38d84 100644
--- a/archinstall/lib/disk/fido.py
+++ b/archinstall/lib/disk/fido.py
@@ -1,13 +1,12 @@
from __future__ import annotations
import getpass
-import logging
from pathlib import Path
from typing import List, Optional
from .device_model import PartitionModification, Fido2Device
from ..general import SysCommand, SysCommandWorker, clear_vt100_escape_codes
-from ..output import log
+from ..output import error, info
class Fido2:
@@ -39,7 +38,7 @@ class Fido2:
if not cls._loaded or reload:
ret: Optional[str] = SysCommand(f"systemd-cryptenroll --fido2-device=list").decode('UTF-8')
if not ret:
- log('Unable to retrieve fido2 devices', level=logging.ERROR)
+ error('Unable to retrieve fido2 devices')
return []
fido_devices: str = clear_vt100_escape_codes(ret) # type: ignore
@@ -88,8 +87,4 @@ class Fido2:
worker.write(bytes(getpass.getpass(" "), 'UTF-8'))
pin_inputted = True
- log(
- f"You might need to touch the FIDO2 device to unlock it if no prompt comes up after 3 seconds.",
- level=logging.INFO,
- fg="yellow"
- )
+ info('You might need to touch the FIDO2 device to unlock it if no prompt comes up after 3 seconds')
diff --git a/archinstall/lib/disk/filesystem.py b/archinstall/lib/disk/filesystem.py
index 6ea99340..dc99afce 100644
--- a/archinstall/lib/disk/filesystem.py
+++ b/archinstall/lib/disk/filesystem.py
@@ -1,6 +1,5 @@
from __future__ import annotations
-import logging
import signal
import sys
import time
@@ -8,8 +7,8 @@ from typing import Any, Optional, TYPE_CHECKING
from .device_model import DiskLayoutConfiguration, DiskLayoutType, PartitionTable, FilesystemType, DiskEncryption
from .device_handler import device_handler
-from ..hardware import has_uefi
-from ..output import log
+from ..hardware import SysInfo
+from ..output import debug
from ..menu import Menu
if TYPE_CHECKING:
@@ -27,13 +26,13 @@ class FilesystemHandler:
def perform_filesystem_operations(self, show_countdown: bool = True):
if self._disk_config.config_type == DiskLayoutType.Pre_mount:
- log('Disk layout configuration is set to pre-mount, not performing any operations', level=logging.DEBUG)
+ debug('Disk layout configuration is set to pre-mount, not performing any operations')
return
device_mods = list(filter(lambda x: len(x.partitions) > 0, self._disk_config.device_modifications))
if not device_mods:
- log('No modifications required', level=logging.DEBUG)
+ debug('No modifications required')
return
device_paths = ', '.join([str(mod.device.device_info.path) for mod in device_mods])
@@ -48,7 +47,7 @@ class FilesystemHandler:
# Setup the blockdevice, filesystem (and optionally encryption).
# Once that's done, we'll hand over to perform_installation()
partition_table = PartitionTable.GPT
- if has_uefi() is False:
+ if SysInfo.has_uefi() is False:
partition_table = PartitionTable.MBR
for mod in device_mods:
diff --git a/archinstall/lib/disk/partitioning_menu.py b/archinstall/lib/disk/partitioning_menu.py
index 686e8c29..89cf6293 100644
--- a/archinstall/lib/disk/partitioning_menu.py
+++ b/archinstall/lib/disk/partitioning_menu.py
@@ -1,13 +1,12 @@
from __future__ import annotations
-import logging
from pathlib import Path
from typing import Any, Dict, TYPE_CHECKING, List, Optional, Tuple
from .device_model import PartitionModification, FilesystemType, BDevice, Size, Unit, PartitionType, PartitionFlag, \
ModificationStatus
from ..menu import Menu, ListManager, MenuSelection, TextInput
-from ..output import FormattedOutput, log
+from ..output import FormattedOutput, warn
from .subvolume_menu import SubvolumeMenu
if TYPE_CHECKING:
@@ -229,7 +228,7 @@ class PartitioningList(ListManager):
if not start_sector or self._validate_sector(start_sector):
break
- log(f'Invalid start sector entered: {start_sector}', fg='red', level=logging.INFO)
+ warn(f'Invalid start sector entered: {start_sector}')
if not start_sector:
start_sector = str(largest_free_area.start)
@@ -245,7 +244,7 @@ class PartitioningList(ListManager):
if not end_value or self._validate_sector(start_sector, end_value):
break
- log(f'Invalid end sector entered: {start_sector}', fg='red', level=logging.INFO)
+ warn(f'Invalid end sector entered: {start_sector}')
# override the default value with the user value
if end_value:
@@ -300,7 +299,7 @@ class PartitioningList(ListManager):
if choice.value == Menu.no():
return []
- from ..user_interaction.disk_conf import suggest_single_disk_layout
+ from ..interactions.disk_conf import suggest_single_disk_layout
device_modification = suggest_single_disk_layout(self._device)
return device_modification.partitions