Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib/disk.py
diff options
context:
space:
mode:
Diffstat (limited to 'archinstall/lib/disk.py')
-rw-r--r--archinstall/lib/disk.py43
1 files changed, 24 insertions, 19 deletions
diff --git a/archinstall/lib/disk.py b/archinstall/lib/disk.py
index c099cca1..410bb481 100644
--- a/archinstall/lib/disk.py
+++ b/archinstall/lib/disk.py
@@ -2,6 +2,7 @@ import glob
import pathlib
import re
from collections import OrderedDict
+from typing import Optional
from .general import *
from .hardware import hasUEFI
@@ -17,7 +18,8 @@ MBR = 0b00000010
# libc = ctypes.CDLL(ctypes.util.find_library('c'), use_errno=True)
# libc.mount.argtypes = (ctypes.c_char_p, ctypes.c_char_p, ctypes.c_char_p, ctypes.c_ulong, ctypes.c_char_p)
-class BlockDevice():
+
+class BlockDevice:
def __init__(self, path, info=None):
if not info:
# If we don't give any information, we need to auto-fill it.
@@ -76,7 +78,8 @@ class BlockDevice():
if self.info['type'] == 'loop':
for drive in json.loads(b''.join(sys_command(['losetup', '--json'], hide_from_log=True)).decode('UTF_8'))['loopdevices']:
- if not drive['name'] == self.path: continue
+ if not drive['name'] == self.path:
+ continue
return drive['back-file']
elif self.info['type'] == 'disk':
@@ -91,8 +94,8 @@ class BlockDevice():
else:
log(f"Unknown blockdevice type for {self.path}: {self.info['type']}", level=logging.DEBUG)
- # if not stat.S_ISBLK(os.stat(full_path).st_mode):
- # raise DiskError(f'Selected disk "{full_path}" is not a block device.')
+ # if not stat.S_ISBLK(os.stat(full_path).st_mode):
+ # raise DiskError(f'Selected disk "{full_path}" is not a block device.')
@property
def partitions(self):
@@ -153,7 +156,7 @@ class BlockDevice():
self.part_cache = OrderedDict()
-class Partition():
+class Partition:
def __init__(self, path: str, block_device: BlockDevice, part_id=None, size=-1, filesystem=None, mountpoint=None, encrypted=False, autodetect_filesystem=True):
if not part_id:
part_id = os.path.basename(path)
@@ -177,11 +180,11 @@ class Partition():
if self.mountpoint != mount_information.get('target', None) and mountpoint:
raise DiskError(f"{self} was given a mountpoint but the actual mountpoint differs: {mount_information.get('target', None)}")
- if (target := mount_information.get('target', None)):
+ if target := mount_information.get('target', None):
self.mountpoint = target
if not self.filesystem and autodetect_filesystem:
- if (fstype := mount_information.get('fstype', get_filesystem_type(path))):
+ if fstype := mount_information.get('fstype', get_filesystem_type(path)):
self.filesystem = fstype
if self.filesystem == 'crypto_LUKS':
@@ -234,9 +237,9 @@ class Partition():
@property
def real_device(self):
for blockdevice in json.loads(b''.join(sys_command('lsblk -J')).decode('UTF-8'))['blockdevices']:
- if (parent := self.find_parent_of(blockdevice, os.path.basename(self.path))):
+ if parent := self.find_parent_of(blockdevice, os.path.basename(self.path)):
return f"/dev/{parent}"
- # raise DiskError(f'Could not find appropriate parent for encrypted partition {self}')
+ # raise DiskError(f'Could not find appropriate parent for encrypted partition {self}')
return self.path
def detect_inner_filesystem(self, password):
@@ -351,9 +354,9 @@ class Partition():
self.filesystem = 'f2fs'
elif filesystem == 'crypto_LUKS':
- # from .luks import luks2
- # encrypted_partition = luks2(self, None, None)
- # encrypted_partition.format(path)
+ # from .luks import luks2
+ # encrypted_partition = luks2(self, None, None)
+ # encrypted_partition.format(path)
self.filesystem = 'crypto_LUKS'
else:
@@ -371,14 +374,15 @@ class Partition():
return parent
elif 'children' in data:
for child in data['children']:
- if (parent := self.find_parent_of(child, name, parent=data['name'])):
+ if parent := self.find_parent_of(child, name, parent=data['name']):
return parent
def mount(self, target, fs=None, options=''):
if not self.mountpoint:
log(f'Mounting {self} to {target}', level=logging.INFO)
if not fs:
- if not self.filesystem: raise DiskError(f'Need to format (or define) the filesystem on {self} before mounting.')
+ if not self.filesystem:
+ raise DiskError(f'Need to format (or define) the filesystem on {self} before mounting.')
fs = self.filesystem
pathlib.Path(target).mkdir(parents=True, exist_ok=True)
@@ -403,7 +407,7 @@ class Partition():
# Without to much research, it seams that low error codes are errors.
# And above 8k is indicators such as "/dev/x not mounted.".
# So anything in between 0 and 8k are errors (?).
- if exit_code > 0 and exit_code < 8000:
+ if 0 < exit_code < 8000:
raise err
self.mountpoint = None
@@ -416,8 +420,8 @@ class Partition():
"""
The support for a filesystem (this partition) is tested by calling
partition.format() with a path set to '/dev/null' which returns two exceptions:
- 1. SysCallError saying that /dev/null is not formattable - but the filesystem is supported
- 2. UnknownFilesystemFormat that indicates that we don't support the given filesystem type
+ 1. SysCallError saying that /dev/null is not formattable - but the filesystem is supported
+ 2. UnknownFilesystemFormat that indicates that we don't support the given filesystem type
"""
try:
self.format(self.filesystem, '/dev/null', log_formatting=False, allow_formatting=True)
@@ -428,7 +432,7 @@ class Partition():
return True
-class Filesystem():
+class Filesystem:
# TODO:
# When instance of a HDD is selected, check all usages and gracefully unmount them
# as well as close any crypto handles.
@@ -569,7 +573,8 @@ def all_disks(*args, **kwargs):
drives = OrderedDict()
# for drive in json.loads(sys_command(f'losetup --json', *args, **lkwargs, hide_from_log=True)).decode('UTF_8')['loopdevices']:
for drive in json.loads(b''.join(sys_command('lsblk --json -l -n -o path,size,type,mountpoint,label,pkname,model', *args, **kwargs, hide_from_log=True)).decode('UTF_8'))['blockdevices']:
- if not kwargs['partitions'] and drive['type'] == 'part': continue
+ if not kwargs['partitions'] and drive['type'] == 'part':
+ continue
drives[drive['path']] = BlockDevice(drive['path'], drive)
return drives