Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib/disk/device_model.py
diff options
context:
space:
mode:
Diffstat (limited to 'archinstall/lib/disk/device_model.py')
-rw-r--r--archinstall/lib/disk/device_model.py50
1 files changed, 25 insertions, 25 deletions
diff --git a/archinstall/lib/disk/device_model.py b/archinstall/lib/disk/device_model.py
index d57347b7..36dd0c4f 100644
--- a/archinstall/lib/disk/device_model.py
+++ b/archinstall/lib/disk/device_model.py
@@ -2,7 +2,6 @@ from __future__ import annotations
import dataclasses
import json
-import logging
import math
import time
import uuid
@@ -18,7 +17,7 @@ from parted import Disk, Geometry, Partition
from ..exceptions import DiskError, SysCallError
from ..general import SysCommand
-from ..output import log
+from ..output import debug, error
from ..storage import storage
if TYPE_CHECKING:
@@ -282,7 +281,7 @@ class _PartitionInfo:
btrfs_subvol_infos: List[_BtrfsSubvolumeInfo] = field(default_factory=list)
def as_json(self) -> Dict[str, Any]:
- info = {
+ part_info = {
'Name': self.name,
'Type': self.type.value,
'Filesystem': self.fs_type.value if self.fs_type else str(_('Unknown')),
@@ -293,9 +292,9 @@ class _PartitionInfo:
}
if self.btrfs_subvol_infos:
- info['Btrfs vol.'] = f'{len(self.btrfs_subvol_infos)} subvolumes'
+ part_info['Btrfs vol.'] = f'{len(self.btrfs_subvol_infos)} subvolumes'
- return info
+ return part_info
@classmethod
def from_partition(
@@ -392,7 +391,7 @@ class SubvolumeModification:
mods = []
for entry in subvol_args:
if not entry.get('name', None) or not entry.get('mountpoint', None):
- log(f'Subvolume arg is missing name: {entry}', level=logging.DEBUG)
+ debug(f'Subvolume arg is missing name: {entry}')
continue
mountpoint = Path(entry['mountpoint']) if entry['mountpoint'] else None
@@ -705,7 +704,7 @@ class PartitionModification:
"""
Called for displaying data in table format
"""
- info = {
+ part_mod = {
'Status': self.status.value,
'Device': str(self.dev_path) if self.dev_path else '',
'Type': self.type.value,
@@ -718,9 +717,9 @@ class PartitionModification:
}
if self.btrfs_subvols:
- info['Btrfs vol.'] = f'{len(self.btrfs_subvols)} subvolumes'
+ part_mod['Btrfs vol.'] = f'{len(self.btrfs_subvols)} subvolumes'
- return info
+ return part_mod
@dataclass
@@ -916,36 +915,36 @@ class LsblkInfo:
@classmethod
def from_json(cls, blockdevice: Dict[str, Any]) -> LsblkInfo:
- info = cls()
+ lsblk_info = cls()
for f in cls.fields():
lsblk_field = _clean_field(f, CleanType.Blockdevice)
data_field = _clean_field(f, CleanType.Dataclass)
val: Any = None
- if isinstance(getattr(info, data_field), Path):
+ if isinstance(getattr(lsblk_info, data_field), Path):
val = Path(blockdevice[lsblk_field])
- elif isinstance(getattr(info, data_field), Size):
+ elif isinstance(getattr(lsblk_info, data_field), Size):
val = Size(blockdevice[lsblk_field], Unit.B)
else:
val = blockdevice[lsblk_field]
- setattr(info, data_field, val)
+ setattr(lsblk_info, data_field, val)
- info.children = [LsblkInfo.from_json(child) for child in blockdevice.get('children', [])]
+ lsblk_info.children = [LsblkInfo.from_json(child) for child in blockdevice.get('children', [])]
# sometimes lsblk returns 'mountpoints': [null]
- info.mountpoints = [Path(mnt) for mnt in info.mountpoints if mnt]
+ lsblk_info.mountpoints = [Path(mnt) for mnt in lsblk_info.mountpoints if mnt]
fs_roots = []
- for r in info.fsroots:
+ for r in lsblk_info.fsroots:
if r:
path = Path(r)
# store the fsroot entries without the leading /
fs_roots.append(path.relative_to(path.anchor))
- info.fsroots = fs_roots
+ lsblk_info.fsroots = fs_roots
- return info
+ return lsblk_info
class CleanType(Enum):
@@ -978,16 +977,16 @@ def _fetch_lsblk_info(dev_path: Optional[Union[Path, str]] = None, retry: int =
try:
result = SysCommand(f'lsblk --json -b -o+{lsblk_fields} {dev_path}')
break
- except SysCallError as error:
+ except SysCallError as err:
# Get the output minus the message/info from lsblk if it returns a non-zero exit code.
- if error.worker:
- err = error.worker.decode('UTF-8')
- log(f'Error calling lsblk: {err}', level=logging.DEBUG)
+ if err.worker:
+ err_str = err.worker.decode('UTF-8')
+ debug(f'Error calling lsblk: {err_str}')
else:
- raise error
+ raise err
if retry_attempt == retry - 1:
- raise error
+ raise err
time.sleep(1)
@@ -997,11 +996,12 @@ def _fetch_lsblk_info(dev_path: Optional[Union[Path, str]] = None, retry: int =
blockdevices = block_devices['blockdevices']
return [LsblkInfo.from_json(device) for device in blockdevices]
except json.decoder.JSONDecodeError as err:
- log(f"Could not decode lsblk JSON: {result}", fg="red", level=logging.ERROR)
+ error(f"Could not decode lsblk JSON: {result}")
raise err
raise DiskError(f'Failed to read disk "{dev_path}" with lsblk')
+
def get_lsblk_info(dev_path: Union[Path, str]) -> LsblkInfo:
if infos := _fetch_lsblk_info(dev_path):
return infos[0]