Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib
diff options
context:
space:
mode:
Diffstat (limited to 'archinstall/lib')
-rw-r--r--archinstall/lib/disk/blockdevice.py64
-rw-r--r--archinstall/lib/disk/btrfs.py18
-rw-r--r--archinstall/lib/disk/filesystem.py44
-rw-r--r--archinstall/lib/disk/helpers.py47
-rw-r--r--archinstall/lib/disk/partition.py94
-rw-r--r--archinstall/lib/disk/user_guides.py16
-rw-r--r--archinstall/lib/disk/validators.py6
-rw-r--r--archinstall/lib/exceptions.py2
-rw-r--r--archinstall/lib/general.py45
-rw-r--r--archinstall/lib/installer.py139
-rw-r--r--archinstall/lib/locale_helpers.py15
-rw-r--r--archinstall/lib/luks.py41
-rw-r--r--archinstall/lib/mirrors.py15
-rw-r--r--archinstall/lib/networking.py27
-rw-r--r--archinstall/lib/packages.py9
-rw-r--r--archinstall/lib/plugins.py7
-rw-r--r--archinstall/lib/profiles.py69
-rw-r--r--archinstall/lib/services.py2
-rw-r--r--archinstall/lib/systemd.py19
-rw-r--r--archinstall/lib/user_interaction.py106
20 files changed, 459 insertions, 326 deletions
diff --git a/archinstall/lib/disk/blockdevice.py b/archinstall/lib/disk/blockdevice.py
index 2be31375..5288f92b 100644
--- a/archinstall/lib/disk/blockdevice.py
+++ b/archinstall/lib/disk/blockdevice.py
@@ -1,14 +1,20 @@
+from __future__ import annotations
import os
import json
import logging
import time
+from typing import Optional, Dict, Any, Iterator, Tuple, List, TYPE_CHECKING
+# https://stackoverflow.com/a/39757388/929999
+if TYPE_CHECKING:
+ from .partition import Partition
+
from ..exceptions import DiskError
from ..output import log
from ..general import SysCommand
from ..storage import storage
class BlockDevice:
- def __init__(self, path, info=None):
+ def __init__(self, path :str, info :Optional[Dict[str, Any]] = None):
if not info:
from .helpers import all_disks
# If we don't give any information, we need to auto-fill it.
@@ -24,32 +30,32 @@ class BlockDevice:
# It's actually partition-encryption, but for future-proofing this
# I'm placing the encryption password on a BlockDevice level.
- def __repr__(self, *args, **kwargs):
+ def __repr__(self, *args :str, **kwargs :str) -> str:
return f"BlockDevice({self.device_or_backfile}, size={self.size}GB, free_space={'+'.join(part[2] for part in self.free_space)}, bus_type={self.bus_type})"
- def __iter__(self):
+ def __iter__(self) -> Iterator[Partition]:
for partition in self.partitions:
yield self.partitions[partition]
- def __getitem__(self, key, *args, **kwargs):
+ def __getitem__(self, key :str, *args :str, **kwargs :str) -> Any:
if key not in self.info:
raise KeyError(f'{self} does not contain information: "{key}"')
return self.info[key]
- def __len__(self):
+ def __len__(self) -> int:
return len(self.partitions)
- def __lt__(self, left_comparitor):
+ def __lt__(self, left_comparitor :'BlockDevice') -> bool:
return self.path < left_comparitor.path
- def json(self):
+ def json(self) -> str:
"""
json() has precedence over __dump__, so this is a way
to give less/partial information for user readability.
"""
return self.path
- def __dump__(self):
+ def __dump__(self) -> Dict[str, Dict[str, Any]]:
return {
self.path : {
'partuuid' : self.uuid,
@@ -59,14 +65,14 @@ class BlockDevice:
}
@property
- def partition_type(self):
+ def partition_type(self) -> str:
output = json.loads(SysCommand(f"lsblk --json -o+PTTYPE {self.path}").decode('UTF-8'))
for device in output['blockdevices']:
return device['pttype']
@property
- def device_or_backfile(self):
+ def device_or_backfile(self) -> str:
"""
Returns the actual device-endpoint of the BlockDevice.
If it's a loop-back-device it returns the back-file,
@@ -82,7 +88,7 @@ class BlockDevice:
return self.device
@property
- def device(self):
+ def device(self) -> str:
"""
Returns the device file of the BlockDevice.
If it's a loop-back-device it returns the /dev/X device,
@@ -108,7 +114,7 @@ class BlockDevice:
# raise DiskError(f'Selected disk "{full_path}" is not a block device.')
@property
- def partitions(self):
+ def partitions(self) -> Dict[str, Partition]:
from .filesystem import Partition
self.partprobe()
@@ -133,17 +139,19 @@ class BlockDevice:
return {k: self.part_cache[k] for k in sorted(self.part_cache)}
@property
- def partition(self):
+ def partition(self) -> Partition:
all_partitions = self.partitions
return [all_partitions[k] for k in all_partitions]
@property
- def partition_table_type(self):
+ def partition_table_type(self) -> int:
+ # TODO: Don't hardcode :)
+ # Remove if we don't use this function anywhere
from .filesystem import GPT
return GPT
@property
- def uuid(self):
+ def uuid(self) -> str:
log('BlockDevice().uuid is untested!', level=logging.WARNING, fg='yellow')
"""
Returns the disk UUID as returned by lsblk.
@@ -153,7 +161,7 @@ class BlockDevice:
return SysCommand(f'blkid -s PTUUID -o value {self.path}').decode('UTF-8')
@property
- def size(self):
+ def size(self) -> float:
from .helpers import convert_size_to_gb
output = json.loads(SysCommand(f"lsblk --json -b -o+SIZE {self.path}").decode('UTF-8'))
@@ -162,21 +170,21 @@ class BlockDevice:
return convert_size_to_gb(device['size'])
@property
- def bus_type(self):
+ def bus_type(self) -> str:
output = json.loads(SysCommand(f"lsblk --json -o+ROTA,TRAN {self.path}").decode('UTF-8'))
for device in output['blockdevices']:
return device['tran']
@property
- def spinning(self):
+ def spinning(self) -> bool:
output = json.loads(SysCommand(f"lsblk --json -o+ROTA,TRAN {self.path}").decode('UTF-8'))
for device in output['blockdevices']:
return device['rota'] is True
@property
- def free_space(self):
+ def free_space(self) -> Tuple[str, str, str]:
# NOTE: parted -s will default to `cancel` on prompt, skipping any partition
# that is "outside" the disk. in /dev/sr0 this is usually the case with Archiso,
# so the free will ignore the ESP partition and just give the "free" space.
@@ -187,7 +195,7 @@ class BlockDevice:
yield (start, end, size)
@property
- def largest_free_space(self):
+ def largest_free_space(self) -> List[str]:
info = []
for space_info in self.free_space:
if not info:
@@ -199,7 +207,7 @@ class BlockDevice:
return info
@property
- def first_free_sector(self):
+ def first_free_sector(self) -> str:
if info := self.largest_free_space:
start = info[0]
else:
@@ -207,29 +215,29 @@ class BlockDevice:
return start
@property
- def first_end_sector(self):
+ def first_end_sector(self) -> str:
if info := self.largest_free_space:
end = info[1]
else:
end = f"{self.size}GB"
return end
- def partprobe(self):
- SysCommand(['partprobe', self.path])
+ def partprobe(self) -> bool:
+ return SysCommand(['partprobe', self.path]).exit_code == 0
- def has_partitions(self):
+ def has_partitions(self) -> int:
return len(self.partitions)
- def has_mount_point(self, mountpoint):
+ def has_mount_point(self, mountpoint :str) -> bool:
for partition in self.partitions:
if self.partitions[partition].mountpoint == mountpoint:
return True
return False
- def flush_cache(self):
+ def flush_cache(self) -> None:
self.part_cache = {}
- def get_partition(self, uuid):
+ def get_partition(self, uuid :str) -> Partition:
count = 0
while count < 5:
for partition_uuid, partition in self.partitions.items():
diff --git a/archinstall/lib/disk/btrfs.py b/archinstall/lib/disk/btrfs.py
index fb9712f8..084e85d2 100644
--- a/archinstall/lib/disk/btrfs.py
+++ b/archinstall/lib/disk/btrfs.py
@@ -1,7 +1,12 @@
+from __future__ import annotations
import pathlib
import glob
import logging
-from typing import Union
+from typing import Union, Dict, TYPE_CHECKING
+
+# https://stackoverflow.com/a/39757388/929999
+if TYPE_CHECKING:
+ from ..installer import Installer
from .helpers import get_mount_info
from ..exceptions import DiskError
from ..general import SysCommand
@@ -9,7 +14,7 @@ from ..output import log
from .partition import Partition
-def mount_subvolume(installation, subvolume_location :Union[pathlib.Path, str], force=False) -> bool:
+def mount_subvolume(installation :Installer, subvolume_location :Union[pathlib.Path, str], force=False) -> bool:
"""
This function uses mount to mount a subvolume on a given device, at a given location with a given subvolume name.
@@ -42,7 +47,7 @@ def mount_subvolume(installation, subvolume_location :Union[pathlib.Path, str],
return SysCommand(f"mount {mount_information['source']} {target} -o subvol=@{subvolume_location}").exit_code == 0
-def create_subvolume(installation, subvolume_location :Union[pathlib.Path, str]) -> bool:
+def create_subvolume(installation :Installer, subvolume_location :Union[pathlib.Path, str]) -> bool:
"""
This function uses btrfs to create a subvolume.
@@ -75,7 +80,12 @@ def create_subvolume(installation, subvolume_location :Union[pathlib.Path, str])
if (cmd := SysCommand(f"btrfs subvolume create {target}")).exit_code != 0:
raise DiskError(f"Could not create a subvolume at {target}: {cmd}")
-def manage_btrfs_subvolumes(installation, partition :dict, mountpoints :dict, subvolumes :dict, unlocked_device :dict = None):
+def manage_btrfs_subvolumes(installation :Installer,
+ partition :Dict[str, str],
+ mountpoints :Dict[str, str],
+ subvolumes :Dict[str, str],
+ unlocked_device :Dict[str, str] = None
+) -> None:
""" we do the magic with subvolumes in a centralized place
parameters:
* the installation object
diff --git a/archinstall/lib/disk/filesystem.py b/archinstall/lib/disk/filesystem.py
index 51ef949b..e6e965f1 100644
--- a/archinstall/lib/disk/filesystem.py
+++ b/archinstall/lib/disk/filesystem.py
@@ -1,7 +1,13 @@
+from __future__ import annotations
import time
import logging
import json
import pathlib
+from typing import Optional, Dict, Any, TYPE_CHECKING
+# https://stackoverflow.com/a/39757388/929999
+if TYPE_CHECKING:
+ from .blockdevice import BlockDevice
+
from .partition import Partition
from .validators import valid_fs_type
from ..exceptions import DiskError
@@ -16,24 +22,25 @@ class Filesystem:
# TODO:
# When instance of a HDD is selected, check all usages and gracefully unmount them
# as well as close any crypto handles.
- def __init__(self, blockdevice, mode):
+ def __init__(self, blockdevice :BlockDevice, mode :int):
self.blockdevice = blockdevice
self.mode = mode
- def __enter__(self, *args, **kwargs):
+ def __enter__(self, *args :str, **kwargs :str) -> 'Filesystem':
return self
- def __repr__(self):
+ def __repr__(self) -> str:
return f"Filesystem(blockdevice={self.blockdevice}, mode={self.mode})"
- def __exit__(self, *args, **kwargs):
+ def __exit__(self, *args :str, **kwargs :str) -> bool:
# TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
if len(args) >= 2 and args[1]:
raise args[1]
+
SysCommand('sync')
return True
- def partuuid_to_index(self, uuid):
+ def partuuid_to_index(self, uuid :str) -> Optional[int]:
for i in range(storage['DISK_RETRY_ATTEMPTS']):
self.partprobe()
time.sleep(5)
@@ -50,7 +57,7 @@ class Filesystem:
raise DiskError(f"Failed to convert PARTUUID {uuid} to a partition index number on blockdevice {self.blockdevice.device}")
- def load_layout(self, layout :dict):
+ def load_layout(self, layout :Dict[str, Any]) -> None:
from ..luks import luks2
# If the layout tells us to wipe the drive, we do so
@@ -127,21 +134,21 @@ class Filesystem:
log(f"Marking partition {partition['device_instance']} as bootable.")
self.set(self.partuuid_to_index(partition['device_instance'].uuid), 'boot on')
- def find_partition(self, mountpoint):
+ def find_partition(self, mountpoint :str) -> Partition:
for partition in self.blockdevice:
if partition.target_mountpoint == mountpoint or partition.mountpoint == mountpoint:
return partition
- def partprobe(self):
- SysCommand(f'bash -c "partprobe"')
+ def partprobe(self) -> bool:
+ return SysCommand(f'bash -c "partprobe"').exit_code == 0
- def raw_parted(self, string: str):
+ def raw_parted(self, string: str) -> SysCommand:
if (cmd_handle := SysCommand(f'/usr/bin/parted -s {string}')).exit_code != 0:
log(f"Parted ended with a bad exit code: {cmd_handle}", level=logging.ERROR, fg="red")
time.sleep(0.5)
return cmd_handle
- def parted(self, string: str):
+ def parted(self, string: str) -> bool:
"""
Performs a parted execution of the given string
@@ -149,16 +156,17 @@ class Filesystem:
:type string: str
"""
if (parted_handle := self.raw_parted(string)).exit_code == 0:
- self.partprobe()
- return True
+ if self.partprobe():
+ return True
+ return False
else:
raise DiskError(f"Parted failed to add a partition: {parted_handle}")
- def use_entire_disk(self, root_filesystem_type='ext4') -> Partition:
+ def use_entire_disk(self, root_filesystem_type :str = 'ext4') -> Partition:
# TODO: Implement this with declarative profiles instead.
raise ValueError("Installation().use_entire_disk() has to be re-worked.")
- def add_partition(self, partition_type, start, end, partition_format=None):
+ def add_partition(self, partition_type :str, start :str, end :str, partition_format :Optional[str] = None) -> None:
log(f'Adding partition to {self.blockdevice}, {start}->{end}', level=logging.INFO)
previous_partition_uuids = {partition.uuid for partition in self.blockdevice.partitions.values()}
@@ -197,14 +205,14 @@ class Filesystem:
log("Add partition is exiting due to excessive wait time",level=logging.INFO)
raise DiskError(f"New partition never showed up after adding new partition on {self}.")
- def set_name(self, partition: int, name: str):
+ def set_name(self, partition: int, name: str) -> bool:
return self.parted(f'{self.blockdevice.device} name {partition + 1} "{name}"') == 0
- def set(self, partition: int, string: str):
+ def set(self, partition: int, string: str) -> bool:
log(f"Setting {string} on (parted) partition index {partition+1}", level=logging.INFO)
return self.parted(f'{self.blockdevice.device} set {partition + 1} {string}') == 0
- def parted_mklabel(self, device: str, disk_label: str):
+ def parted_mklabel(self, device: str, disk_label: str) -> bool:
log(f"Creating a new partition label on {device}", level=logging.INFO, fg="yellow")
# Try to unmount devices before attempting to run mklabel
try:
diff --git a/archinstall/lib/disk/helpers.py b/archinstall/lib/disk/helpers.py
index ba29744f..e9f6bc10 100644
--- a/archinstall/lib/disk/helpers.py
+++ b/archinstall/lib/disk/helpers.py
@@ -1,10 +1,15 @@
+from __future__ import annotations
import json
import logging
import os
import pathlib
import re
import time
-from typing import Union
+from typing import Union, List, Iterator, Dict, Optional, Any, TYPE_CHECKING
+# https://stackoverflow.com/a/39757388/929999
+if TYPE_CHECKING:
+ from .partition import Partition
+
from .blockdevice import BlockDevice
from ..exceptions import SysCallError, DiskError
from ..general import SysCommand
@@ -14,10 +19,10 @@ from ..storage import storage
ROOT_DIR_PATTERN = re.compile('^.*?/devices')
GIGA = 2 ** 30
-def convert_size_to_gb(size):
+def convert_size_to_gb(size :Union[int, float]) -> float:
return round(size / GIGA,1)
-def sort_block_devices_based_on_performance(block_devices):
+def sort_block_devices_based_on_performance(block_devices :List[BlockDevice]) -> Dict[BlockDevice, int]:
result = {device: 0 for device in block_devices}
for device, weight in result.items():
@@ -35,12 +40,12 @@ def sort_block_devices_based_on_performance(block_devices):
return result
-def filter_disks_below_size_in_gb(devices, gigabytes):
+def filter_disks_below_size_in_gb(devices :List[BlockDevice], gigabytes :int) -> Iterator[BlockDevice]:
for disk in devices:
if disk.size >= gigabytes:
yield disk
-def select_largest_device(devices, gigabytes, filter_out=None):
+def select_largest_device(devices :List[BlockDevice], gigabytes :int, filter_out :Optional[List[BlockDevice]] = None) -> BlockDevice:
if not filter_out:
filter_out = []
@@ -56,7 +61,7 @@ def select_largest_device(devices, gigabytes, filter_out=None):
return max(copy_devices, key=(lambda device : device.size))
-def select_disk_larger_than_or_close_to(devices, gigabytes, filter_out=None):
+def select_disk_larger_than_or_close_to(devices :List[BlockDevice], gigabytes :int, filter_out :Optional[List[BlockDevice]] = None) -> BlockDevice:
if not filter_out:
filter_out = []
@@ -70,7 +75,7 @@ def select_disk_larger_than_or_close_to(devices, gigabytes, filter_out=None):
return min(copy_devices, key=(lambda device : abs(device.size - gigabytes)))
-def convert_to_gigabytes(string):
+def convert_to_gigabytes(string :str) -> float:
unit = string.strip()[-1]
size = float(string.strip()[:-1])
@@ -81,7 +86,7 @@ def convert_to_gigabytes(string):
return size
-def device_state(name, *args, **kwargs):
+def device_state(name :str, *args :str, **kwargs :str) -> Optional[bool]:
# Based out of: https://askubuntu.com/questions/528690/how-to-get-list-of-all-non-removable-disk-device-names-ssd-hdd-and-sata-ide-onl/528709#528709
if os.path.isfile('/sys/block/{}/device/block/{}/removable'.format(name, name)):
with open('/sys/block/{}/device/block/{}/removable'.format(name, name)) as f:
@@ -99,7 +104,7 @@ def device_state(name, *args, **kwargs):
return True
# lsblk --json -l -n -o path
-def all_disks(*args, **kwargs):
+def all_disks(*args :str, **kwargs :str) -> List[BlockDevice]:
kwargs.setdefault("partitions", False)
drives = {}
@@ -113,7 +118,7 @@ def all_disks(*args, **kwargs):
return drives
-def harddrive(size=None, model=None, fuzzy=False):
+def harddrive(size :Optional[float] = None, model :Optional[str] = None, fuzzy :bool = False) -> Optional[BlockDevice]:
collection = all_disks()
for drive in collection:
if size and convert_to_gigabytes(collection[drive]['size']) != size:
@@ -133,7 +138,7 @@ def split_bind_name(path :Union[pathlib.Path, str]) -> list:
bind_path = None
return device_path,bind_path
-def get_mount_info(path :Union[pathlib.Path, str], traverse=False, return_real_path=False) -> dict:
+def get_mount_info(path :Union[pathlib.Path, str], traverse :bool = False, return_real_path :bool = False) -> Dict[str, Any]:
device_path,bind_path = split_bind_name(path)
for traversal in list(map(str, [str(device_path)] + list(pathlib.Path(str(device_path)).parents))):
try:
@@ -170,7 +175,7 @@ def get_mount_info(path :Union[pathlib.Path, str], traverse=False, return_real_p
return {}
-def get_partitions_in_use(mountpoint) -> list:
+def get_partitions_in_use(mountpoint :str) -> List[Partition]:
from .partition import Partition
try:
@@ -193,7 +198,7 @@ def get_partitions_in_use(mountpoint) -> list:
return mounts
-def get_filesystem_type(path):
+def get_filesystem_type(path :str) -> Optional[str]:
device_name, bind_name = split_bind_name(path)
try:
return SysCommand(f"blkid -o value -s TYPE {device_name}").decode('UTF-8').strip()
@@ -201,10 +206,10 @@ def get_filesystem_type(path):
return None
-def disk_layouts():
+def disk_layouts() -> Optional[Dict[str, Any]]:
try:
if (handle := SysCommand("lsblk -f -o+TYPE,SIZE -J")).exit_code == 0:
- return json.loads(handle.decode('UTF-8'))
+ return {str(key): val for key, val in json.loads(handle.decode('UTF-8')).items()}
else:
log(f"Could not return disk layouts: {handle}", level=logging.WARNING, fg="yellow")
return None
@@ -216,20 +221,22 @@ def disk_layouts():
return None
-def encrypted_partitions(blockdevices :dict) -> bool:
+def encrypted_partitions(blockdevices :Dict[str, Any]) -> bool:
for partition in blockdevices.values():
if partition.get('encrypted', False):
yield partition
-def find_partition_by_mountpoint(block_devices, relative_mountpoint :str):
+def find_partition_by_mountpoint(block_devices :List[BlockDevice], relative_mountpoint :str) -> Partition:
for device in block_devices:
for partition in block_devices[device]['partitions']:
if partition.get('mountpoint', None) == relative_mountpoint:
return partition
-def partprobe():
- SysCommand(f'bash -c "partprobe"')
- time.sleep(5)
+def partprobe() -> bool:
+ if SysCommand(f'bash -c "partprobe"').exit_code == 0:
+ time.sleep(5) # TODO: Remove, we should be relying on blkid instead of lsblk
+ return True
+ return False
def convert_device_to_uuid(path :str) -> str:
device_name, bind_name = split_bind_name(path)
diff --git a/archinstall/lib/disk/partition.py b/archinstall/lib/disk/partition.py
index bb6f2d53..b8fa2b79 100644
--- a/archinstall/lib/disk/partition.py
+++ b/archinstall/lib/disk/partition.py
@@ -5,7 +5,8 @@ import logging
import json
import os
import hashlib
-from typing import Optional
+from typing import Optional, Dict, Any, List, Union
+
from .blockdevice import BlockDevice
from .helpers import get_mount_info, get_filesystem_type, convert_size_to_gb, split_bind_name
from ..storage import storage
@@ -15,7 +16,15 @@ from ..general import SysCommand
class Partition:
- def __init__(self, path: str, block_device: BlockDevice, part_id=None, filesystem=None, mountpoint=None, encrypted=False, autodetect_filesystem=True):
+ def __init__(self,
+ path: str,
+ block_device: BlockDevice,
+ part_id :Optional[str] = None,
+ filesystem :Optional[str] = None,
+ mountpoint :Optional[str] = None,
+ encrypted :bool = False,
+ autodetect_filesystem :bool = True):
+
if not part_id:
part_id = os.path.basename(path)
@@ -50,14 +59,16 @@ class Partition:
if self.filesystem == 'crypto_LUKS':
self.encrypted = True
- def __lt__(self, left_comparitor):
+ def __lt__(self, left_comparitor :BlockDevice) -> bool:
if type(left_comparitor) == Partition:
left_comparitor = left_comparitor.path
else:
left_comparitor = str(left_comparitor)
- return self.path < left_comparitor # Not quite sure the order here is correct. But /dev/nvme0n1p1 comes before /dev/nvme0n1p5 so seems correct.
- def __repr__(self, *args, **kwargs):
+ # The goal is to check if /dev/nvme0n1p1 comes before /dev/nvme0n1p5
+ return self.path < left_comparitor
+
+ def __repr__(self, *args :str, **kwargs :str) -> str:
mount_repr = ''
if self.mountpoint:
mount_repr = f", mounted={self.mountpoint}"
@@ -69,7 +80,7 @@ class Partition:
else:
return f'Partition(path={self.path}, size={self.size}, PARTUUID={self._safe_uuid}, fs={self.filesystem}{mount_repr})'
- def __dump__(self):
+ def __dump__(self) -> Dict[str, Any]:
return {
'type': 'primary',
'PARTUUID': self._safe_uuid,
@@ -86,14 +97,14 @@ class Partition:
}
@property
- def sector_size(self):
+ def sector_size(self) -> Optional[int]:
output = json.loads(SysCommand(f"lsblk --json -o+LOG-SEC {self.device_path}").decode('UTF-8'))
for device in output['blockdevices']:
return device.get('log-sec', None)
@property
- def start(self):
+ def start(self) -> Optional[str]:
output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8'))
for partition in output.get('partitiontable', {}).get('partitions', []):
@@ -101,7 +112,7 @@ class Partition:
return partition['start'] # * self.sector_size
@property
- def end(self):
+ def end(self) -> Optional[str]:
# TODO: Verify that the logic holds up, that 'size' is the size without 'start' added to it.
output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8'))
@@ -110,7 +121,7 @@ class Partition:
return partition['size'] # * self.sector_size
@property
- def size(self):
+ def size(self) -> Optional[float]:
for i in range(storage['DISK_RETRY_ATTEMPTS']):
self.partprobe()
@@ -123,7 +134,7 @@ class Partition:
time.sleep(storage['DISK_TIMEOUTS'])
@property
- def boot(self):
+ def boot(self) -> bool:
output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8'))
# Get the bootable flag from the sfdisk output:
@@ -143,7 +154,7 @@ class Partition:
return False
@property
- def partition_type(self):
+ def partition_type(self) -> Optional[str]:
lsblk = json.loads(SysCommand(f"lsblk --json -o+PTTYPE {self.device_path}").decode('UTF-8'))
for device in lsblk['blockdevices']:
@@ -179,19 +190,19 @@ class Partition:
return SysCommand(f'blkid -s PARTUUID -o value {self.device_path}').decode('UTF-8').strip()
@property
- def encrypted(self):
+ def encrypted(self) -> Union[bool, None]:
return self._encrypted
@encrypted.setter
- def encrypted(self, value: bool):
+ def encrypted(self, value: bool) -> None:
self._encrypted = value
@property
- def parent(self):
+ def parent(self) -> str:
return self.real_device
@property
- def real_device(self):
+ def real_device(self) -> str:
for blockdevice in json.loads(SysCommand('lsblk -J').decode('UTF-8'))['blockdevices']:
if parent := self.find_parent_of(blockdevice, os.path.basename(self.device_path)):
return f"/dev/{parent}"
@@ -199,25 +210,27 @@ class Partition:
return self.path
@property
- def device_path(self):
+ def device_path(self) -> str:
""" for bind mounts returns the phisical path of the partition
"""
device_path, bind_name = split_bind_name(self.path)
return device_path
@property
- def bind_name(self):
+ def bind_name(self) -> str:
""" for bind mounts returns the bind name (subvolume path).
Returns none if this property does not exist
"""
device_path, bind_name = split_bind_name(self.path)
return bind_name
- def partprobe(self):
- SysCommand(f'bash -c "partprobe"')
- time.sleep(1)
+ def partprobe(self) -> bool:
+ if SysCommand(f'bash -c "partprobe"').exit_code == 0:
+ time.sleep(1)
+ return True
+ return False
- def detect_inner_filesystem(self, password):
+ def detect_inner_filesystem(self, password :str) -> Optional[str]:
log(f'Trying to detect inner filesystem format on {self} (This might take a while)', level=logging.INFO)
from ..luks import luks2
@@ -227,7 +240,7 @@ class Partition:
except SysCallError:
return None
- def has_content(self):
+ def has_content(self) -> bool:
fs_type = get_filesystem_type(self.path)
if not fs_type or "swap" in fs_type:
return False
@@ -248,7 +261,7 @@ class Partition:
return True if files > 0 else False
- def encrypt(self, *args, **kwargs):
+ def encrypt(self, *args :str, **kwargs :str) -> str:
"""
A wrapper function for luks2() instances and the .encrypt() method of that instance.
"""
@@ -257,7 +270,7 @@ class Partition:
handle = luks2(self, None, None)
return handle.encrypt(self, *args, **kwargs)
- def format(self, filesystem=None, path=None, log_formatting=True, options=[]):
+ def format(self, filesystem :Optional[str] = None, path :Optional[str] = None, log_formatting :bool = True, options :List[str] = []) -> bool:
"""
Format can be given an overriding path, for instance /dev/null to test
the formatting functionality and in essence the support for the given filesystem.
@@ -342,7 +355,7 @@ class Partition:
return True
- def find_parent_of(self, data, name, parent=None):
+ def find_parent_of(self, data :Dict[str, Any], name :str, parent :Optional[str] = None) -> Optional[str]:
if data['name'] == name:
return parent
elif 'children' in data:
@@ -350,7 +363,7 @@ class Partition:
if parent := self.find_parent_of(child, name, parent=data['name']):
return parent
- def mount(self, target, fs=None, options=''):
+ def mount(self, target :str, fs :Optional[str] = None, options :str = '') -> bool:
if not self.mountpoint:
log(f'Mounting {self} to {target}', level=logging.INFO)
if not fs:
@@ -386,25 +399,24 @@ class Partition:
self.mountpoint = target
return True
- def unmount(self):
- try:
- SysCommand(f"/usr/bin/umount {self.path}")
- except SysCallError as err:
- exit_code = err.exit_code
-
- # Without to much research, it seams that low error codes are errors.
- # And above 8k is indicators such as "/dev/x not mounted.".
- # So anything in between 0 and 8k are errors (?).
- if 0 < exit_code < 8000:
- raise err
+ return False
+
+ def unmount(self) -> bool:
+ worker = SysCommand(f"/usr/bin/umount {self.path}")
+
+ # Without to much research, it seams that low error codes are errors.
+ # And above 8k is indicators such as "/dev/x not mounted.".
+ # So anything in between 0 and 8k are errors (?).
+ if 0 < worker.exit_code < 8000:
+ raise SysCallError(f"Could not unmount {self.path} properly: {worker}", exit_code=worker.exit_code)
self.mountpoint = None
return True
- def umount(self):
+ def umount(self) -> bool:
return self.unmount()
- def filesystem_supported(self):
+ def filesystem_supported(self) -> bool:
"""
The support for a filesystem (this partition) is tested by calling
partition.format() with a path set to '/dev/null' which returns two exceptions:
@@ -420,7 +432,7 @@ class Partition:
return True
-def get_mount_fs_type(fs):
+def get_mount_fs_type(fs :str) -> str:
if fs == 'ntfs':
return 'ntfs3' # Needed to use the Paragon R/W NTFS driver
elif fs == 'fat32':
diff --git a/archinstall/lib/disk/user_guides.py b/archinstall/lib/disk/user_guides.py
index 3d48c104..b0a8fe8a 100644
--- a/archinstall/lib/disk/user_guides.py
+++ b/archinstall/lib/disk/user_guides.py
@@ -1,8 +1,17 @@
+from __future__ import annotations
import logging
+from typing import Optional, Dict, Any, List, TYPE_CHECKING
+# https://stackoverflow.com/a/39757388/929999
+if TYPE_CHECKING:
+ from .blockdevice import BlockDevice
+
from .helpers import sort_block_devices_based_on_performance, select_largest_device, select_disk_larger_than_or_close_to
from ..output import log
-def suggest_single_disk_layout(block_device, default_filesystem=None, advanced_options=False):
+def suggest_single_disk_layout(block_device :BlockDevice,
+ default_filesystem :Optional[str] = None,
+ advanced_options :bool = False) -> Dict[str, Any]:
+
if not default_filesystem:
from ..user_interaction import ask_for_main_filesystem_format
default_filesystem = ask_for_main_filesystem_format(advanced_options)
@@ -94,7 +103,10 @@ def suggest_single_disk_layout(block_device, default_filesystem=None, advanced_o
return layout
-def suggest_multi_disk_layout(block_devices, default_filesystem=None, advanced_options=False):
+def suggest_multi_disk_layout(block_devices :List[BlockDevice],
+ default_filesystem :Optional[str] = None,
+ advanced_options :bool = False) -> Dict[str, Any]:
+
if not default_filesystem:
from ..user_interaction import ask_for_main_filesystem_format
default_filesystem = ask_for_main_filesystem_format(advanced_options)
diff --git a/archinstall/lib/disk/validators.py b/archinstall/lib/disk/validators.py
index 464f0d73..fd1b7f33 100644
--- a/archinstall/lib/disk/validators.py
+++ b/archinstall/lib/disk/validators.py
@@ -1,4 +1,6 @@
-def valid_parted_position(pos :str):
+from typing import List
+
+def valid_parted_position(pos :str) -> bool:
if not len(pos):
return False
@@ -17,7 +19,7 @@ def valid_parted_position(pos :str):
return False
-def fs_types():
+def fs_types() -> List[str]:
# https://www.gnu.org/software/parted/manual/html_node/mkpart.html
# Above link doesn't agree with `man parted` /mkpart documentation:
"""
diff --git a/archinstall/lib/exceptions.py b/archinstall/lib/exceptions.py
index 6cf00026..783bc9c5 100644
--- a/archinstall/lib/exceptions.py
+++ b/archinstall/lib/exceptions.py
@@ -17,7 +17,7 @@ class ProfileError(BaseException):
class SysCallError(BaseException):
- def __init__(self, message :str, exit_code :Optional[int]) -> None:
+ def __init__(self, message :str, exit_code :Optional[int] = None) -> None:
super(SysCallError, self).__init__(message)
self.message = message
self.exit_code = exit_code
diff --git a/archinstall/lib/general.py b/archinstall/lib/general.py
index cc50e80a..96c9d50c 100644
--- a/archinstall/lib/general.py
+++ b/archinstall/lib/general.py
@@ -1,3 +1,4 @@
+from __future__ import annotations
import hashlib
import json
import logging
@@ -9,7 +10,10 @@ import string
import sys
import time
from datetime import datetime, date
-from typing import Callable, Optional, Dict, Any, List, Union, Iterator
+from typing import Callable, Optional, Dict, Any, List, Union, Iterator, TYPE_CHECKING
+# https://stackoverflow.com/a/39757388/929999
+if TYPE_CHECKING:
+ from .installer import Installer
if sys.platform == 'linux':
from select import epoll, EPOLLIN, EPOLLHUP
@@ -46,14 +50,14 @@ from .exceptions import RequirementError, SysCallError
from .output import log
from .storage import storage
-def gen_uid(entropy_length=256):
+def gen_uid(entropy_length :int = 256) -> str:
return hashlib.sha512(os.urandom(entropy_length)).hexdigest()
-def generate_password(length=64):
+def generate_password(length :int = 64) -> str:
haystack = string.printable # digits, ascii_letters, punctiation (!"#$[] etc) and whitespace
return ''.join(secrets.choice(haystack) for i in range(length))
-def multisplit(s, splitters):
+def multisplit(s :str, splitters :List[str]) -> str:
s = [s, ]
for key in splitters:
ns = []
@@ -77,12 +81,12 @@ def locate_binary(name :str) -> str:
raise RequirementError(f"Binary {name} does not exist.")
-def json_dumps(*args, **kwargs):
+def json_dumps(*args :str, **kwargs :str) -> str:
return json.dumps(*args, **{**kwargs, 'cls': JSON})
class JsonEncoder:
@staticmethod
- def _encode(obj):
+ def _encode(obj :Any) -> Any:
"""
This JSON encoder function will try it's best to convert
any archinstall data structures, instances or variables into
@@ -119,7 +123,7 @@ class JsonEncoder:
return obj
@staticmethod
- def _unsafe_encode(obj):
+ def _unsafe_encode(obj :Any) -> Any:
"""
Same as _encode() but it keeps dictionary keys starting with !
"""
@@ -141,20 +145,20 @@ class JSON(json.JSONEncoder, json.JSONDecoder):
"""
A safe JSON encoder that will omit private information in dicts (starting with !)
"""
- def _encode(self, obj):
+ def _encode(self, obj :Any) -> Any:
return JsonEncoder._encode(obj)
- def encode(self, obj):
+ def encode(self, obj :Any) -> Any:
return super(JSON, self).encode(self._encode(obj))
class UNSAFE_JSON(json.JSONEncoder, json.JSONDecoder):
"""
UNSAFE_JSON will call/encode and keep private information in dicts (starting with !)
"""
- def _encode(self, obj):
+ def _encode(self, obj :Any) -> Any:
return JsonEncoder._unsafe_encode(obj)
- def encode(self, obj):
+ def encode(self, obj :Any) -> Any:
return super(UNSAFE_JSON, self).encode(self._encode(obj))
class SysCommandWorker:
@@ -455,12 +459,16 @@ class SysCommand:
return None
-def prerequisite_check():
- if not os.path.isdir("/sys/firmware/efi"):
- raise RequirementError("Archinstall only supports machines in UEFI mode.")
+def prerequisite_check() -> bool:
+ """
+ This function is used as a safety check before
+ continuing with an installation.
- return True
+ Could be anything from checking that /boot is big enough
+ to check if nvidia hardware exists when nvidia driver was chosen.
+ """
+ return True
def reboot():
SysCommand("/usr/bin/reboot")
@@ -473,12 +481,15 @@ def pid_exists(pid: int) -> bool:
return False
-def run_custom_user_commands(commands, installation):
+def run_custom_user_commands(commands :List[str], installation :Installer) -> None:
for index, command in enumerate(commands):
- log(f'Executing custom command "{command}" ...', fg='yellow')
+ log(f'Executing custom command "{command}" ...', level=logging.INFO)
+
with open(f"{installation.target}/var/tmp/user-command.{index}.sh", "w") as temp_script:
temp_script.write(command)
+
execution_output = SysCommand(f"arch-chroot {installation.target} bash /var/tmp/user-command.{index}.sh")
+
log(execution_output)
os.unlink(f"{installation.target}/var/tmp/user-command.{index}.sh")
diff --git a/archinstall/lib/installer.py b/archinstall/lib/installer.py
index c02d5717..4f46e458 100644
--- a/archinstall/lib/installer.py
+++ b/archinstall/lib/installer.py
@@ -1,5 +1,4 @@
import time
-from typing import Union
import logging
import os
import shutil
@@ -7,6 +6,7 @@ import shlex
import pathlib
import subprocess
import glob
+from typing import Union, Dict, Any, List, ModuleType, Optional, Iterator, Mapping
from .disk import get_partitions_in_use, Partition
from .general import SysCommand, generate_password
from .hardware import has_uefi, is_vm, cpu_vendor
@@ -30,29 +30,29 @@ __accessibility_packages__ = ["brltty", "espeakup", "alsa-utils"]
class InstallationFile:
- def __init__(self, installation, filename, owner, mode="w"):
+ def __init__(self, installation :'Installer', filename :str, owner :str, mode :str = "w"):
self.installation = installation
self.filename = filename
self.owner = owner
self.mode = mode
self.fh = None
- def __enter__(self):
+ def __enter__(self) -> 'InstallationFile':
self.fh = open(self.filename, self.mode)
return self
- def __exit__(self, *args):
+ def __exit__(self, *args :str) -> None:
self.fh.close()
self.installation.chown(self.owner, self.filename)
- def write(self, data: Union[str, bytes]):
+ def write(self, data: Union[str, bytes]) -> int:
return self.fh.write(data)
- def read(self, *args):
+ def read(self, *args) -> Union[str, bytes]:
return self.fh.read(*args)
- def poll(self, *args):
- return self.fh.poll(*args)
+# def poll(self, *args) -> bool:
+# return self.fh.poll(*args)
def accessibility_tools_in_use() -> bool:
@@ -84,11 +84,12 @@ class Installer:
"""
- def __init__(self, target, *, base_packages=None, kernels=None):
+ def __init__(self, target :str, *, base_packages :Optional[List[str]] = None, kernels :Optional[List[str]] = None):
if base_packages is None:
base_packages = __packages__[:3]
if kernels is None:
kernels = ['linux']
+
self.kernels = kernels
self.target = target
self.init_time = time.strftime('%Y-%m-%d_%H-%M-%S')
@@ -119,18 +120,17 @@ class Installer:
self.HOOKS = ["base", "udev", "autodetect", "keyboard", "keymap", "modconf", "block", "filesystems", "fsck"]
self.KERNEL_PARAMS = []
- def log(self, *args, level=logging.DEBUG, **kwargs):
+ def log(self, *args :str, level :int = logging.DEBUG, **kwargs :str):
"""
installer.log() wraps output.log() mainly to set a default log-level for this install session.
Any manual override can be done per log() call.
"""
log(*args, level=level, **kwargs)
- def __enter__(self, *args, **kwargs):
+ def __enter__(self, *args :str, **kwargs :str) -> 'Installer':
return self
- def __exit__(self, *args, **kwargs):
- # b''.join(sys_command('sync')) # No need to, since the underlying fs() object will call sync.
+ def __exit__(self, *args :str, **kwargs :str) -> None:
# TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
if len(args) >= 2 and args[1]:
@@ -163,10 +163,10 @@ class Installer:
return False
@property
- def partitions(self):
+ def partitions(self) -> List[Partition]:
return get_partitions_in_use(self.target)
- def sync_log_to_install_medium(self):
+ def sync_log_to_install_medium(self) -> bool:
# Copy over the install log (if there is one) to the install medium if
# at least the base has been strapped in, otherwise we won't have a filesystem/structure to copy to.
if self.helper_flags.get('base-strapped', False) is True:
@@ -180,7 +180,7 @@ class Installer:
return True
- def mount_ordered_layout(self, layouts: dict):
+ def mount_ordered_layout(self, layouts: Dict[str, Any]) -> None:
from .luks import luks2
mountpoints = {}
@@ -254,16 +254,16 @@ class Installer:
except DiskError:
raise DiskError(f"Target {self.target}{mountpoint} never got mounted properly (unable to get mount information using findmnt).")
- def mount(self, partition, mountpoint, create_mountpoint=True):
+ def mount(self, partition :Partition, mountpoint :str, create_mountpoint :bool = True) -> None:
if create_mountpoint and not os.path.isdir(f'{self.target}{mountpoint}'):
os.makedirs(f'{self.target}{mountpoint}')
partition.mount(f'{self.target}{mountpoint}')
- def post_install_check(self, *args, **kwargs):
+ def post_install_check(self, *args :str, **kwargs :str) -> List[bool]:
return [step for step, flag in self.helper_flags.items() if flag is False]
- def pacstrap(self, *packages, **kwargs):
+ def pacstrap(self, *packages :str, **kwargs :str) -> bool:
if type(packages[0]) in (list, tuple):
packages = packages[0]
@@ -284,7 +284,7 @@ class Installer:
else:
self.log(f'Could not sync mirrors: {sync_mirrors.exit_code}', level=logging.INFO)
- def set_mirrors(self, mirrors):
+ def set_mirrors(self, mirrors :Mapping[str, Iterator[str]]) -> None:
for plugin in plugins.values():
if hasattr(plugin, 'on_mirrors'):
if result := plugin.on_mirrors(mirrors):
@@ -292,7 +292,7 @@ class Installer:
return use_mirrors(mirrors, destination=f'{self.target}/etc/pacman.d/mirrorlist')
- def genfstab(self, flags='-pU'):
+ def genfstab(self, flags :str = '-pU') -> bool:
self.log(f"Updating {self.target}/etc/fstab", level=logging.INFO)
with open(f"{self.target}/etc/fstab", 'a') as fstab_fh:
@@ -307,11 +307,11 @@ class Installer:
return True
- def set_hostname(self, hostname: str, *args, **kwargs):
+ def set_hostname(self, hostname: str, *args :str, **kwargs :str) -> None:
with open(f'{self.target}/etc/hostname', 'w') as fh:
fh.write(hostname + '\n')
- def set_locale(self, locale, encoding='UTF-8', *args, **kwargs):
+ def set_locale(self, locale :str, encoding :str = 'UTF-8', *args :str, **kwargs :str) -> bool:
if not len(locale):
return True
@@ -322,7 +322,7 @@ class Installer:
return True if SysCommand(f'/usr/bin/arch-chroot {self.target} locale-gen').exit_code == 0 else False
- def set_timezone(self, zone, *args, **kwargs):
+ def set_timezone(self, zone :str, *args :str, **kwargs :str) -> bool:
if not zone:
return True
if not len(zone):
@@ -337,6 +337,7 @@ class Installer:
(pathlib.Path(self.target) / "etc" / "localtime").unlink(missing_ok=True)
SysCommand(f'/usr/bin/arch-chroot {self.target} ln -s /usr/share/zoneinfo/{zone} /etc/localtime')
return True
+
else:
self.log(
f"Time zone {zone} does not exist, continuing with system default.",
@@ -344,11 +345,13 @@ class Installer:
fg='red'
)
- def activate_ntp(self):
+ return False
+
+ def activate_ntp(self) -> None:
log(f"activate_ntp() is deprecated, use activate_time_syncronization()", fg="yellow", level=logging.INFO)
self.activate_time_syncronization()
- def activate_time_syncronization(self):
+ def activate_time_syncronization(self) -> None:
self.log('Activating systemd-timesyncd for time synchronization using Arch Linux and ntp.org NTP servers.', level=logging.INFO)
self.enable_service('systemd-timesyncd')
@@ -361,11 +364,11 @@ class Installer:
with Boot(self) as session:
session.SysCommand(["timedatectl", "set-ntp", 'true'])
- def enable_espeakup(self):
+ def enable_espeakup(self) -> None:
self.log('Enabling espeakup.service for speech synthesis (accessibility).', level=logging.INFO)
self.enable_service('espeakup')
- def enable_service(self, *services):
+ def enable_service(self, *services :str) -> None:
for service in services:
self.log(f'Enabling service {service}', level=logging.INFO)
if (output := self.arch_chroot(f'systemctl enable {service}')).exit_code != 0:
@@ -375,19 +378,27 @@ class Installer:
if hasattr(plugin, 'on_service'):
plugin.on_service(service)
- def run_command(self, cmd, *args, **kwargs):
+ def run_command(self, cmd :str, *args :str, **kwargs :str) -> None:
return SysCommand(f'/usr/bin/arch-chroot {self.target} {cmd}')
- def arch_chroot(self, cmd, run_as=None):
+ def arch_chroot(self, cmd :str, run_as :Optional[str] = None):
if run_as:
cmd = f"su - {run_as} -c {shlex.quote(cmd)}"
return self.run_command(cmd)
- def drop_to_shell(self):
+ def drop_to_shell(self) -> None:
subprocess.check_call(f"/usr/bin/arch-chroot {self.target}", shell=True)
- def configure_nic(self, nic, dhcp=True, ip=None, gateway=None, dns=None, *args, **kwargs):
+ def configure_nic(self,
+ nic :str,
+ dhcp :bool = True,
+ ip :Optional[str] = None,
+ gateway :Optional[str] = None,
+ dns :Optional[str] = None,
+ *args :str,
+ **kwargs :str
+ ) -> None:
from .systemd import Networkd
if dhcp:
@@ -412,7 +423,7 @@ class Installer:
with open(f"{self.target}/etc/systemd/network/10-{nic}.network", "a") as netconf:
netconf.write(str(conf))
- def copy_iso_network_config(self, enable_services=False):
+ def copy_iso_network_config(self, enable_services :bool = False) -> bool:
# Copy (if any) iwd password and config files
if os.path.isdir('/var/lib/iwd/'):
if psk_files := glob.glob('/var/lib/iwd/*.psk'):
@@ -427,7 +438,7 @@ class Installer:
# This function will be called after minimal_installation()
# as a hook for post-installs. This hook is only needed if
# base is not installed yet.
- def post_install_enable_iwd_service(*args, **kwargs):
+ def post_install_enable_iwd_service(*args :str, **kwargs :str):
self.enable_service('iwd')
self.post_base_install.append(post_install_enable_iwd_service)
@@ -452,7 +463,7 @@ class Installer:
# If we haven't installed the base yet (function called pre-maturely)
if self.helper_flags.get('base', False) is False:
- def post_install_enable_networkd_resolved(*args, **kwargs):
+ def post_install_enable_networkd_resolved(*args :str, **kwargs :str):
self.enable_service('systemd-networkd', 'systemd-resolved')
self.post_base_install.append(post_install_enable_networkd_resolved)
@@ -462,7 +473,7 @@ class Installer:
return True
- def detect_encryption(self, partition):
+ def detect_encryption(self, partition :Partition) -> bool:
part = Partition(partition.parent, None, autodetect_filesystem=True)
if partition.encrypted:
return partition
@@ -471,7 +482,7 @@ class Installer:
return False
- def mkinitcpio(self, *flags):
+ def mkinitcpio(self, *flags :str) -> bool:
for plugin in plugins.values():
if hasattr(plugin, 'on_mkinitcpio'):
# Allow plugins to override the usage of mkinitcpio altogether.
@@ -483,9 +494,10 @@ class Installer:
mkinit.write(f"BINARIES=({' '.join(self.BINARIES)})\n")
mkinit.write(f"FILES=({' '.join(self.FILES)})\n")
mkinit.write(f"HOOKS=({' '.join(self.HOOKS)})\n")
- SysCommand(f'/usr/bin/arch-chroot {self.target} mkinitcpio {" ".join(flags)}')
- def minimal_installation(self):
+ return SysCommand(f'/usr/bin/arch-chroot {self.target} mkinitcpio {" ".join(flags)}').exit_code == 0
+
+ def minimal_installation(self) -> bool:
# Add necessary packages if encrypting the drive
# (encrypted partitions default to btrfs for now, so we need btrfs-progs)
# TODO: Perhaps this should be living in the function which dictates
@@ -562,7 +574,7 @@ class Installer:
return True
- def setup_swap(self, kind='zram'):
+ def setup_swap(self, kind :str = 'zram') -> bool:
if kind == 'zram':
self.log(f"Setting up swap on zram")
self.pacstrap('zram-generator')
@@ -578,7 +590,18 @@ class Installer:
else:
raise ValueError(f"Archinstall currently only supports setting up swap on zram")
- def add_bootloader(self, bootloader='systemd-bootctl'):
+ def add_bootloader(self, bootloader :str = 'systemd-bootctl') -> bool:
+ """
+ Adds a bootloader to the installation instance.
+ Archinstall supports one of three types:
+ * systemd-bootctl
+ * grub
+ * efistub (beta)
+
+ :param bootloader: Can be one of the three strings
+ 'systemd-bootctl', 'grub' or 'efistub' (beta)
+ """
+
for plugin in plugins.values():
if hasattr(plugin, 'on_add_bootloader'):
# Allow plugins to override the boot-loader handling.
@@ -757,10 +780,19 @@ class Installer:
return True
- def add_additional_packages(self, *packages):
+ def add_additional_packages(self, *packages :str) -> bool:
return self.pacstrap(*packages)
- def install_profile(self, profile):
+ def install_profile(self, profile :str) -> ModuleType:
+ """
+ Installs a archinstall profile script (.py file).
+ This profile can be either local, remote or part of the library.
+
+ :param profile: Can be a local path or a remote path (URL)
+ :return: Returns the imported script as a module, this way
+ you can access any remaining functions exposed by the profile.
+ :rtype: module
+ """
storage['installation_session'] = self
if type(profile) == str:
@@ -769,13 +801,13 @@ class Installer:
self.log(f'Installing network profile {profile}', level=logging.INFO)
return profile.install()
- def enable_sudo(self, entity: str, group=False):
+ def enable_sudo(self, entity: str, group :bool = False) -> bool:
self.log(f'Enabling sudo permissions for {entity}.', level=logging.INFO)
with open(f'{self.target}/etc/sudoers', 'a') as sudoers:
sudoers.write(f'{"%" if group else ""}{entity} ALL=(ALL) ALL\n')
return True
- def user_create(self, user: str, password=None, groups=None, sudo=False):
+ def user_create(self, user :str, password :Optional[str] = None, groups :Optional[str] = None, sudo :bool = False) -> None:
if groups is None:
groups = []
@@ -789,7 +821,8 @@ class Installer:
if not handled_by_plugin:
self.log(f'Creating user {user}', level=logging.INFO)
- SysCommand(f'/usr/bin/arch-chroot {self.target} useradd -m -G wheel {user}')
+ if not (output := SysCommand(f'/usr/bin/arch-chroot {self.target} useradd -m -G wheel {user}')).exit_code == 0:
+ raise SystemError(f"Could not create user inside installation: {output}")
for plugin in plugins.values():
if hasattr(plugin, 'on_user_created'):
@@ -806,24 +839,24 @@ class Installer:
if sudo and self.enable_sudo(user):
self.helper_flags['user'] = True
- def user_set_pw(self, user, password):
+ def user_set_pw(self, user :str, password :str) -> bool:
self.log(f'Setting password for {user}', level=logging.INFO)
if user == 'root':
# This means the root account isn't locked/disabled with * in /etc/passwd
self.helper_flags['user'] = True
- SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c \"echo '{user}:{password}' | chpasswd\"")
+ return SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c \"echo '{user}:{password}' | chpasswd\"").exit_code == 0
- def user_set_shell(self, user, shell):
+ def user_set_shell(self, user :str, shell :str) -> bool:
self.log(f'Setting shell for {user} to {shell}', level=logging.INFO)
- SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c \"chsh -s {shell} {user}\"")
+ return SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c \"chsh -s {shell} {user}\"").exit_code == 0
- def chown(self, owner, path, options=[]):
- return SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c 'chown {' '.join(options)} {owner} {path}")
+ def chown(self, owner :str, path :str, options :List[str] = []) -> bool:
+ return SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c 'chown {' '.join(options)} {owner} {path}").exit_code == 0
- def create_file(self, filename, owner=None):
+ def create_file(self, filename :str, owner :Optional[str] = None) -> InstallationFile:
return InstallationFile(self, filename, owner)
def set_keyboard_language(self, language: str) -> bool:
diff --git a/archinstall/lib/locale_helpers.py b/archinstall/lib/locale_helpers.py
index ad85ea1b..6aa678a6 100644
--- a/archinstall/lib/locale_helpers.py
+++ b/archinstall/lib/locale_helpers.py
@@ -1,41 +1,42 @@
import logging
+from typing import Iterator
from .exceptions import ServiceException
from .general import SysCommand
from .output import log
-def list_keyboard_languages():
+def list_keyboard_languages() -> Iterator[str]:
for line in SysCommand("localectl --no-pager list-keymaps", environment_vars={'SYSTEMD_COLORS': '0'}):
yield line.decode('UTF-8').strip()
-def list_x11_keyboard_languages():
+def list_x11_keyboard_languages() -> Iterator[str]:
for line in SysCommand("localectl --no-pager list-x11-keymap-layouts", environment_vars={'SYSTEMD_COLORS': '0'}):
yield line.decode('UTF-8').strip()
-def verify_keyboard_layout(layout):
+def verify_keyboard_layout(layout :str) -> bool:
for language in list_keyboard_languages():
if layout.lower() == language.lower():
return True
return False
-def verify_x11_keyboard_layout(layout):
+def verify_x11_keyboard_layout(layout :str) -> bool:
for language in list_x11_keyboard_languages():
if layout.lower() == language.lower():
return True
return False
-def search_keyboard_layout(layout):
+def search_keyboard_layout(layout :str) -> Iterator[str]:
for language in list_keyboard_languages():
if layout.lower() in language.lower():
yield language
-def set_keyboard_language(locale):
+def set_keyboard_language(locale :str) -> bool:
if len(locale.strip()):
if not verify_keyboard_layout(locale):
log(f"Invalid keyboard locale specified: {locale}", fg="red", level=logging.ERROR)
@@ -49,6 +50,6 @@ def set_keyboard_language(locale):
return False
-def list_timezones():
+def list_timezones() -> Iterator[str]:
for line in SysCommand("timedatectl --no-pager list-timezones", environment_vars={'SYSTEMD_COLORS': '0'}):
yield line.decode('UTF-8').strip()
diff --git a/archinstall/lib/luks.py b/archinstall/lib/luks.py
index 255c75d9..26f2bc1b 100644
--- a/archinstall/lib/luks.py
+++ b/archinstall/lib/luks.py
@@ -1,9 +1,15 @@
+from __future__ import annotations
import json
import logging
import os
import pathlib
import shlex
import time
+from typing import Optional, List,TYPE_CHECKING
+# https://stackoverflow.com/a/39757388/929999
+if TYPE_CHECKING:
+ from .installer import Installer
+
from .disk import Partition, convert_device_to_uuid
from .general import SysCommand, SysCommandWorker
from .output import log
@@ -11,7 +17,15 @@ from .exceptions import SysCallError, DiskError
from .storage import storage
class luks2:
- def __init__(self, partition, mountpoint, password, key_file=None, auto_unmount=False, *args, **kwargs):
+ def __init__(self,
+ partition :Partition,
+ mountpoint :str,
+ password :str,
+ key_file :Optional[str] = None,
+ auto_unmount :bool = False,
+ *args :str,
+ **kwargs :str):
+
self.password = password
self.partition = partition
self.mountpoint = mountpoint
@@ -22,7 +36,7 @@ class luks2:
self.filesystem = 'crypto_LUKS'
self.mapdev = None
- def __enter__(self):
+ def __enter__(self) -> Partition:
if not self.key_file:
self.key_file = f"/tmp/{os.path.basename(self.partition.path)}.disk_pw" # TODO: Make disk-pw-file randomly unique?
@@ -34,16 +48,23 @@ class luks2:
return self.unlock(self.partition, self.mountpoint, self.key_file)
- def __exit__(self, *args, **kwargs):
+ def __exit__(self, *args :str, **kwargs :str) -> bool:
# TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
if self.auto_unmount:
self.close()
if len(args) >= 2 and args[1]:
raise args[1]
+
return True
- def encrypt(self, partition, password=None, key_size=512, hash_type='sha512', iter_time=10000, key_file=None):
+ def encrypt(self, partition :Partition,
+ password :Optional[str] = None,
+ key_size :int = 512,
+ hash_type :str = 'sha512',
+ iter_time :int = 10000,
+ key_file :Optional[str] = None) -> str:
+
log(f'Encrypting {partition} (This might take a while)', level=logging.INFO)
if not key_file:
@@ -119,7 +140,7 @@ class luks2:
return key_file
- def unlock(self, partition, mountpoint, key_file):
+ def unlock(self, partition :Partition, mountpoint :str, key_file :str) -> Partition:
"""
Mounts a luks2 compatible partition to a certain mountpoint.
Keyfile must be specified as there's no way to interact with the pw-prompt atm.
@@ -142,18 +163,18 @@ class luks2:
unlocked_partition = Partition(self.mapdev, None, encrypted=True, filesystem=get_filesystem_type(self.mapdev), autodetect_filesystem=False)
return unlocked_partition
- def close(self, mountpoint=None):
+ def close(self, mountpoint :Optional[str] = None) -> bool:
if not mountpoint:
mountpoint = self.mapdev
SysCommand(f'/usr/bin/cryptsetup close {self.mapdev}')
return os.path.islink(self.mapdev) is False
- def format(self, path):
+ def format(self, path :str) -> None:
if (handle := SysCommand(f"/usr/bin/cryptsetup -q -v luksErase {path}")).exit_code != 0:
raise DiskError(f'Could not format {path} with {self.filesystem} because: {b"".join(handle)}')
- def add_key(self, path :pathlib.Path, password :str):
+ def add_key(self, path :pathlib.Path, password :str) -> bool:
if not path.exists():
raise OSError(2, f"Could not import {path} as a disk encryption key, file is missing.", str(path))
@@ -169,7 +190,9 @@ class luks2:
if worker.exit_code != 0:
raise DiskError(f'Could not add encryption key {path} to {self.partition} because: {worker}')
- def crypttab(self, installation, key_path :str, options=["luks", "key-slot=1"]):
+ return True
+
+ def crypttab(self, installation :Installer, key_path :str, options :List[str] = ["luks", "key-slot=1"]) -> None:
log(f'Adding a crypttab entry for key {key_path} in {installation}', level=logging.INFO)
with open(f"{installation.target}/etc/crypttab", "a") as crypttab:
crypttab.write(f"{self.mountpoint} UUID={convert_device_to_uuid(self.partition.path)} {key_path} {','.join(options)}\n")
diff --git a/archinstall/lib/mirrors.py b/archinstall/lib/mirrors.py
index 5fad6cb6..6b6bfed4 100644
--- a/archinstall/lib/mirrors.py
+++ b/archinstall/lib/mirrors.py
@@ -1,7 +1,7 @@
import logging
import urllib.error
import urllib.request
-from typing import Union, Mapping, Iterable
+from typing import Union, Mapping, Iterable, Dict, Any, List
from .general import SysCommand
from .output import log
@@ -51,7 +51,12 @@ def sort_mirrorlist(raw_data :bytes, sort_order=["https", "http"]) -> bytes:
return new_raw_data
-def filter_mirrors_by_region(regions, destination='/etc/pacman.d/mirrorlist', sort_order=["https", "http"], *args, **kwargs) -> Union[bool, bytes]:
+def filter_mirrors_by_region(regions :str,
+ destination :str = '/etc/pacman.d/mirrorlist',
+ sort_order :List[str] = ["https", "http"],
+ *args :str,
+ **kwargs :str
+) -> Union[bool, bytes]:
"""
This function will change the active mirrors on the live medium by
filtering which regions are active based on `regions`.
@@ -75,7 +80,7 @@ def filter_mirrors_by_region(regions, destination='/etc/pacman.d/mirrorlist', so
return new_list.decode('UTF-8')
-def add_custom_mirrors(mirrors: list, *args, **kwargs):
+def add_custom_mirrors(mirrors: List[str], *args :str, **kwargs :str) -> bool:
"""
This will append custom mirror definitions in pacman.conf
@@ -91,7 +96,7 @@ def add_custom_mirrors(mirrors: list, *args, **kwargs):
return True
-def insert_mirrors(mirrors, *args, **kwargs):
+def insert_mirrors(mirrors :Dict[str, Any], *args :str, **kwargs :str) -> bool:
"""
This function will insert a given mirror-list at the top of `/etc/pacman.d/mirrorlist`.
It will not flush any other mirrors, just insert new ones.
@@ -138,7 +143,7 @@ def re_rank_mirrors(
return True
-def list_mirrors(sort_order=["https", "http"]):
+def list_mirrors(sort_order :List[str] = ["https", "http"]) -> Dict[str, Any]:
url = "https://archlinux.org/mirrorlist/?protocol=https&protocol=http&ip_version=4&ip_version=6&use_mirror_status=on"
regions = {}
diff --git a/archinstall/lib/networking.py b/archinstall/lib/networking.py
index 0d94572a..6b09deba 100644
--- a/archinstall/lib/networking.py
+++ b/archinstall/lib/networking.py
@@ -2,7 +2,7 @@ import logging
import os
import socket
import struct
-from collections import OrderedDict
+from typing import Union, Dict, Any, List
from .exceptions import HardwareIncompatibilityError
from .general import SysCommand
@@ -10,36 +10,40 @@ from .output import log
from .storage import storage
-def get_hw_addr(ifname):
+def get_hw_addr(ifname :str) -> str:
import fcntl
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
info = fcntl.ioctl(s.fileno(), 0x8927, struct.pack('256s', bytes(ifname, 'utf-8')[:15]))
return ':'.join('%02x' % b for b in info[18:24])
-def list_interfaces(skip_loopback=True):
- interfaces = OrderedDict()
+def list_interfaces(skip_loopback :bool = True) -> Dict[str, str]:
+ interfaces = {}
+
for index, iface in socket.if_nameindex():
if skip_loopback and iface == "lo":
continue
mac = get_hw_addr(iface).replace(':', '-').lower()
interfaces[mac] = iface
+
return interfaces
-def check_mirror_reachable():
+def check_mirror_reachable() -> bool:
log("Testing connectivity to the Arch Linux mirrors ...", level=logging.INFO)
if SysCommand("pacman -Sy").exit_code == 0:
return True
+
elif os.geteuid() != 0:
log("check_mirror_reachable() uses 'pacman -Sy' which requires root.", level=logging.ERROR, fg="red")
return False
-def enrich_iface_types(interfaces: dict):
+def enrich_iface_types(interfaces: Union[Dict[str, Any], List[str]]) -> Dict[str, str]:
result = {}
+
for iface in interfaces:
if os.path.isdir(f"/sys/class/net/{iface}/bridge/"):
result[iface] = 'BRIDGE'
@@ -53,19 +57,21 @@ def enrich_iface_types(interfaces: dict):
result[iface] = 'PHYSICAL'
else:
result[iface] = 'UNKNOWN'
+
return result
-def get_interface_from_mac(mac):
+def get_interface_from_mac(mac :str) -> str:
return list_interfaces().get(mac.lower(), None)
-def wireless_scan(interface):
+def wireless_scan(interface :str) -> None:
interfaces = enrich_iface_types(list_interfaces().values())
if interfaces[interface] != 'WIRELESS':
raise HardwareIncompatibilityError(f"Interface {interface} is not a wireless interface: {interfaces}")
- SysCommand(f"iwctl station {interface} scan")
+ if not (output := SysCommand(f"iwctl station {interface} scan")).exit_code == 0:
+ raise SystemError(f"Could not scan for wireless networks: {output}")
if '_WIFI' not in storage:
storage['_WIFI'] = {}
@@ -76,8 +82,9 @@ def wireless_scan(interface):
# TODO: Full WiFi experience might get evolved in the future, pausing for now 2021-01-25
-def get_wireless_networks(interface):
+def get_wireless_networks(interface :str) -> None:
# TODO: Make this oneliner pritter to check if the interface is scanning or not.
+ # TODO: Rename this to list_wireless_networks() as it doesn't return anything
if '_WIFI' not in storage or interface not in storage['_WIFI'] or storage['_WIFI'][interface].get('scanning', False) is False:
import time
diff --git a/archinstall/lib/packages.py b/archinstall/lib/packages.py
index ffc44cbe..1d46ef5e 100644
--- a/archinstall/lib/packages.py
+++ b/archinstall/lib/packages.py
@@ -3,6 +3,7 @@ import ssl
import urllib.error
import urllib.parse
import urllib.request
+from typing import Dict, Any
from .exceptions import RequirementError
@@ -10,7 +11,7 @@ BASE_URL = 'https://archlinux.org/packages/search/json/?name={package}'
BASE_GROUP_URL = 'https://archlinux.org/groups/x86_64/{group}/'
-def find_group(name):
+def find_group(name :str) -> bool:
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
@@ -27,7 +28,7 @@ def find_group(name):
return True
-def find_package(name):
+def find_package(name :str) -> Any:
"""
Finds a specific package via the package database.
It makes a simple web-request, which might be a bit slow.
@@ -40,7 +41,7 @@ def find_package(name):
return json.loads(data)
-def find_packages(*names):
+def find_packages(*names :str) -> Dict[str, Any]:
"""
This function returns the search results for many packages.
The function itself is rather slow, so consider not sending to
@@ -49,7 +50,7 @@ def find_packages(*names):
return {package: find_package(package) for package in names}
-def validate_package_list(packages: list):
+def validate_package_list(packages: list) -> bool:
"""
Validates a list of given packages.
Raises `RequirementError` if one or more packages are not found.
diff --git a/archinstall/lib/plugins.py b/archinstall/lib/plugins.py
index 027b58d5..e61c114e 100644
--- a/archinstall/lib/plugins.py
+++ b/archinstall/lib/plugins.py
@@ -7,6 +7,7 @@ import pathlib
import urllib.parse
import urllib.request
from importlib import metadata
+from typing import ModuleType, Optional, List
from .output import log
from .storage import storage
@@ -38,7 +39,7 @@ def localize_path(profile_path :str) -> str:
return profile_path
-def import_via_path(path :str, namespace=None): # -> module (not sure how to write that in type definitions)
+def import_via_path(path :str, namespace :Optional[str] = None) -> ModuleType:
if not namespace:
namespace = os.path.basename(path)
@@ -62,14 +63,14 @@ def import_via_path(path :str, namespace=None): # -> module (not sure how to wri
except:
pass
-def find_nth(haystack, needle, n):
+def find_nth(haystack :List[str], needle :str, n :int) -> int:
start = haystack.find(needle)
while start >= 0 and n > 1:
start = haystack.find(needle, start + len(needle))
n -= 1
return start
-def load_plugin(path :str): # -> module (not sure how to write that in type definitions)
+def load_plugin(path :str) -> ModuleType:
parsed_url = urllib.parse.urlparse(path)
# The Profile was not a direct match on a remote URL
diff --git a/archinstall/lib/profiles.py b/archinstall/lib/profiles.py
index 7d5373c5..6b0e69bf 100644
--- a/archinstall/lib/profiles.py
+++ b/archinstall/lib/profiles.py
@@ -1,3 +1,4 @@
+from __future__ import annotations
import hashlib
import importlib.util
import json
@@ -8,7 +9,10 @@ import sys
import urllib.error
import urllib.parse
import urllib.request
-from typing import Optional
+from typing import Optional, ModuleType, Dict, Union, TYPE_CHECKING
+# https://stackoverflow.com/a/39757388/929999
+if TYPE_CHECKING:
+ from .installer import Installer
from .general import multisplit
from .networking import list_interfaces
@@ -16,16 +20,16 @@ from .storage import storage
from .exceptions import ProfileNotFound
-def grab_url_data(path):
+def grab_url_data(path :str) -> str:
safe_path = path[: path.find(':') + 1] + ''.join([item if item in ('/', '?', '=', '&') else urllib.parse.quote(item) for item in multisplit(path[path.find(':') + 1:], ('/', '?', '=', '&'))])
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
response = urllib.request.urlopen(safe_path, context=ssl_context)
- return response.read()
+ return response.read() # bytes?
-def is_desktop_profile(profile) -> bool:
+def is_desktop_profile(profile :str) -> bool:
if str(profile) == 'Profile(desktop)':
return True
@@ -42,8 +46,13 @@ def is_desktop_profile(profile) -> bool:
return False
-def list_profiles(filter_irrelevant_macs=True, subpath='', filter_top_level_profiles=False):
+def list_profiles(
+ filter_irrelevant_macs :bool = True,
+ subpath :str = '',
+ filter_top_level_profiles :bool = False
+) -> Dict[str, Dict[str, Union[str, bool]]]:
# TODO: Grab from github page as well, not just local static files
+
if filter_irrelevant_macs:
local_macs = list_interfaces()
@@ -101,23 +110,27 @@ def list_profiles(filter_irrelevant_macs=True, subpath='', filter_top_level_prof
class Script:
- def __init__(self, profile, installer=None):
- # profile: https://hvornum.se/something.py
- # profile: desktop
- # profile: /path/to/profile.py
+ def __init__(self, profile :str, installer :Optional[Installer] = None):
+ """
+ :param profile: A string representing either a boundled profile, a local python file
+ or a remote path (URL) to a python script-profile. Three examples:
+ * profile: https://archlinux.org/some_profile.py
+ * profile: desktop
+ * profile: /path/to/profile.py
+ """
self.profile = profile
- self.installer = installer
+ self.installer = installer # TODO: Appears not to be used anymore?
self.converted_path = None
self.spec = None
self.examples = None
self.namespace = os.path.splitext(os.path.basename(self.path))[0]
self.original_namespace = self.namespace
- def __enter__(self, *args, **kwargs):
+ def __enter__(self, *args :str, **kwargs :str) -> ModuleType:
self.execute()
return sys.modules[self.namespace]
- def __exit__(self, *args, **kwargs):
+ def __exit__(self, *args :str, **kwargs :str) -> None:
# TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
if len(args) >= 2 and args[1]:
raise args[1]
@@ -125,7 +138,7 @@ class Script:
if self.original_namespace:
self.namespace = self.original_namespace
- def localize_path(self, profile_path):
+ def localize_path(self, profile_path :str) -> str:
if (url := urllib.parse.urlparse(profile_path)).scheme and url.scheme in ('https', 'http'):
if not self.converted_path:
self.converted_path = f"/tmp/{os.path.basename(self.profile).replace('.py', '')}_{hashlib.md5(os.urandom(12)).hexdigest()}.py"
@@ -138,7 +151,7 @@ class Script:
return profile_path
@property
- def path(self):
+ def path(self) -> str:
parsed_url = urllib.parse.urlparse(self.profile)
# The Profile was not a direct match on a remote URL
@@ -163,7 +176,7 @@ class Script:
else:
raise ProfileNotFound(f"Cannot handle scheme {parsed_url.scheme}")
- def load_instructions(self, namespace=None):
+ def load_instructions(self, namespace :Optional[str] = None) -> 'Script':
if namespace:
self.namespace = namespace
@@ -173,7 +186,7 @@ class Script:
return self
- def execute(self):
+ def execute(self) -> ModuleType:
if self.namespace not in sys.modules or self.spec is None:
self.load_instructions()
@@ -183,25 +196,23 @@ class Script:
class Profile(Script):
- def __init__(self, installer, path, args=None):
+ def __init__(self, installer :Installer, path :str):
super(Profile, self).__init__(path, installer)
- if args is None:
- args = {}
- def __dump__(self, *args, **kwargs):
+ def __dump__(self, *args :str, **kwargs :str) -> Dict[str, str]:
return {'path': self.path}
- def __repr__(self, *args, **kwargs):
+ def __repr__(self, *args :str, **kwargs :str) -> str:
return f'Profile({os.path.basename(self.profile)})'
- def install(self):
+ def install(self) -> ModuleType:
# Before installing, revert any temporary changes to the namespace.
# This ensures that the namespace during installation is the original initiation namespace.
# (For instance awesome instead of aweosme.py or app-awesome.py)
self.namespace = self.original_namespace
return self.execute()
- def has_prep_function(self):
+ def has_prep_function(self) -> bool:
with open(self.path, 'r') as source:
source_data = source.read()
@@ -218,7 +229,7 @@ class Profile(Script):
return True
return False
- def has_post_install(self):
+ def has_post_install(self) -> bool:
with open(self.path, 'r') as source:
source_data = source.read()
@@ -234,7 +245,7 @@ class Profile(Script):
if hasattr(imported, '_post_install'):
return True
- def is_top_level_profile(self):
+ def is_top_level_profile(self) -> bool:
with open(self.path, 'r') as source:
source_data = source.read()
@@ -247,7 +258,7 @@ class Profile(Script):
# since developers like less code - omitting it should assume they want to present it.
return True
- def get_profile_description(self):
+ def get_profile_description(self) -> str:
with open(self.path, 'r') as source:
source_data = source.read()
@@ -282,11 +293,11 @@ class Profile(Script):
class Application(Profile):
- def __repr__(self, *args, **kwargs):
+ def __repr__(self, *args :str, **kwargs :str):
return f'Application({os.path.basename(self.profile)})'
@property
- def path(self):
+ def path(self) -> str:
parsed_url = urllib.parse.urlparse(self.profile)
# The Profile was not a direct match on a remote URL
@@ -311,7 +322,7 @@ class Application(Profile):
else:
raise ProfileNotFound(f"Application cannot handle scheme {parsed_url.scheme}")
- def install(self):
+ def install(self) -> ModuleType:
# Before installing, revert any temporary changes to the namespace.
# This ensures that the namespace during installation is the original initiation namespace.
# (For instance awesome instead of aweosme.py or app-awesome.py)
diff --git a/archinstall/lib/services.py b/archinstall/lib/services.py
index d295bdbb..b177052b 100644
--- a/archinstall/lib/services.py
+++ b/archinstall/lib/services.py
@@ -2,7 +2,7 @@ import os
from .general import SysCommand
-def service_state(service_name: str):
+def service_state(service_name: str) -> str:
if os.path.splitext(service_name)[1] != '.service':
service_name += '.service' # Just to be safe
diff --git a/archinstall/lib/systemd.py b/archinstall/lib/systemd.py
index c3beafc0..74229fae 100644
--- a/archinstall/lib/systemd.py
+++ b/archinstall/lib/systemd.py
@@ -1,5 +1,6 @@
import logging
import time
+from typing import Interator
from .exceptions import SysCallError
from .general import SysCommand, SysCommandWorker, locate_binary
from .installer import Installer
@@ -8,14 +9,14 @@ from .storage import storage
class Ini:
- def __init__(self, *args, **kwargs):
+ def __init__(self, *args :str, **kwargs :str):
"""
Limited INI handler for now.
Supports multiple keywords through dictionary list items.
"""
self.kwargs = kwargs
- def __str__(self):
+ def __str__(self) -> str:
result = ''
first_row_done = False
for top_level in self.kwargs:
@@ -54,7 +55,7 @@ class Boot:
self.session = None
self.ready = False
- def __enter__(self):
+ def __enter__(self) -> 'Boot':
if (existing_session := storage.get('active_boot', None)) and existing_session.instance != self.instance:
raise KeyError("Archinstall only supports booting up one instance, and a active session is already active and it is not this one.")
@@ -81,7 +82,7 @@ class Boot:
storage['active_boot'] = self
return self
- def __exit__(self, *args, **kwargs):
+ def __exit__(self, *args :str, **kwargs :str) -> None:
# b''.join(sys_command('sync')) # No need to, since the underlying fs() object will call sync.
# TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
@@ -98,24 +99,24 @@ class Boot:
else:
raise SysCallError(f"Could not shut down temporary boot of {self.instance}: {shutdown}", exit_code=shutdown.exit_code)
- def __iter__(self):
+ def __iter__(self) -> Interator[str]:
if self.session:
for value in self.session:
yield value
- def __contains__(self, key: bytes):
+ def __contains__(self, key: bytes) -> bool:
if self.session is None:
return False
return key in self.session
- def is_alive(self):
+ def is_alive(self) -> bool:
if self.session is None:
return False
return self.session.is_alive()
- def SysCommand(self, cmd: list, *args, **kwargs):
+ def SysCommand(self, cmd: list, *args, **kwargs) -> SysCommand:
if cmd[0][0] != '/' and cmd[0][:2] != './':
# This check is also done in SysCommand & SysCommandWorker.
# However, that check is done for `machinectl` and not for our chroot command.
@@ -125,7 +126,7 @@ class Boot:
return SysCommand(["systemd-run", f"--machine={self.container_name}", "--pty", *cmd], *args, **kwargs)
- def SysCommandWorker(self, cmd: list, *args, **kwargs):
+ def SysCommandWorker(self, cmd: list, *args, **kwargs) -> SysCommandWorker:
if cmd[0][0] != '/' and cmd[0][:2] != './':
cmd[0] = locate_binary(cmd[0])
diff --git a/archinstall/lib/user_interaction.py b/archinstall/lib/user_interaction.py
index 6298db19..df53ce49 100644
--- a/archinstall/lib/user_interaction.py
+++ b/archinstall/lib/user_interaction.py
@@ -1,3 +1,4 @@
+from __future__ import annotations
import getpass
import ipaddress
import logging
@@ -7,6 +8,11 @@ import shutil
import signal
import sys
import time
+from typing import List, Any, Optional, Dict, Union, TYPE_CHECKING
+
+# https://stackoverflow.com/a/39757388/929999
+if TYPE_CHECKING:
+ from .disk.partition import Partition
from .disk import BlockDevice, suggest_single_disk_layout, suggest_multi_disk_layout, valid_parted_position, all_disks
from .exceptions import RequirementError, UserError, DiskError
@@ -23,20 +29,20 @@ from .mirrors import list_mirrors
# Some return the keys from the options, some the values?
from .. import fs_types
-
-def get_terminal_height():
+# TODO: These can be removed after the move to simple_menu.py
+def get_terminal_height() -> int:
return shutil.get_terminal_size().lines
-def get_terminal_width():
+def get_terminal_width() -> int:
return shutil.get_terminal_size().columns
-def get_longest_option(options):
+def get_longest_option(options :List[Any]) -> int:
return max([len(x) for x in options])
-def check_for_correct_username(username):
+def check_for_correct_username(username :str) -> bool:
if re.match(r'^[a-z_][a-z0-9_-]*\$?$', username) and len(username) <= 32:
return True
log(
@@ -47,14 +53,14 @@ def check_for_correct_username(username):
return False
-def do_countdown():
+def do_countdown() -> bool:
SIG_TRIGGER = False
- def kill_handler(sig, frame):
+ def kill_handler(sig :int, frame :Any) -> None:
print()
exit(0)
- def sig_handler(sig, frame):
+ def sig_handler(sig :int, frame :Any) -> None:
global SIG_TRIGGER
SIG_TRIGGER = True
signal.signal(signal.SIGINT, kill_handler)
@@ -79,12 +85,14 @@ def do_countdown():
sys.stdin.read()
SIG_TRIGGER = False
signal.signal(signal.SIGINT, sig_handler)
+
print()
signal.signal(signal.SIGINT, original_sigint_handler)
+
return True
-def get_password(prompt="Enter a password: "):
+def get_password(prompt :str = "Enter a password: ") -> Optional[str]:
while passwd := getpass.getpass(prompt):
passwd_verification = getpass.getpass(prompt='And one more time for verification: ')
if passwd != passwd_verification:
@@ -98,7 +106,7 @@ def get_password(prompt="Enter a password: "):
return None
-def print_large_list(options, padding=5, margin_bottom=0, separator=': '):
+def print_large_list(options :List[str], padding :int = 5, margin_bottom :int = 0, separator :str = ': ') -> List[int]:
highest_index_number_length = len(str(len(options)))
longest_line = highest_index_number_length + len(separator) + get_longest_option(options) + padding
spaces_without_option = longest_line - (len(separator) + highest_index_number_length)
@@ -136,6 +144,7 @@ def select_encrypted_partitions(block_devices :dict, password :str) -> dict:
# Users might want to single out a partition for non-encryption to share between dualboot etc.
+# TODO: This can be removed once we have simple_menu everywhere
class MiniCurses:
def __init__(self, width, height):
self.width = width
@@ -255,11 +264,11 @@ class MiniCurses:
return response
-def ask_for_swap(prompt='Would you like to use swap on zram? (Y/n): ', forced=False):
+def ask_for_swap(prompt :str = 'Would you like to use swap on zram? (Y/n): ', forced :bool = False) -> bool:
return True if input(prompt).strip(' ').lower() not in ('n', 'no') else False
-def ask_for_superuser_account(prompt='Username for required superuser with sudo privileges: ', forced=False):
+def ask_for_superuser_account(prompt :str = 'Username for required superuser with sudo privileges: ', forced :bool = False) -> Dict[str, Dict[str, str]]:
while 1:
new_user = input(prompt).strip(' ')
@@ -277,7 +286,7 @@ def ask_for_superuser_account(prompt='Username for required superuser with sudo
return {new_user: {"!password": password}}
-def ask_for_additional_users(prompt='Any additional users to install (leave blank for no users): '):
+def ask_for_additional_users(prompt :str = 'Any additional users to install (leave blank for no users): ') -> List[Dict[str, Dict[str, str]]]:
users = {}
superusers = {}
@@ -297,7 +306,7 @@ def ask_for_additional_users(prompt='Any additional users to install (leave blan
return users, superusers
-def ask_for_a_timezone():
+def ask_for_a_timezone() -> str:
timezones = list_timezones()
default = 'UTC'
@@ -311,7 +320,7 @@ def ask_for_a_timezone():
return selected_tz
-def ask_for_bootloader(advanced_options=False) -> str:
+def ask_for_bootloader(advanced_options :bool = False) -> str:
bootloader = "systemd-bootctl" if has_uefi() else "grub-install"
if has_uefi():
if not advanced_options:
@@ -333,14 +342,14 @@ def ask_for_bootloader(advanced_options=False) -> str:
return bootloader
-def ask_for_audio_selection(desktop=True):
+def ask_for_audio_selection(desktop :bool = True) -> str:
audio = 'pipewire' if desktop else 'none'
choices = ['pipewire', 'pulseaudio'] if desktop else ['pipewire', 'pulseaudio', 'none']
selected_audio = Menu(f'Choose an audio server or leave blank to use "{audio}"', choices, default_option=audio).run()
return selected_audio
-def ask_to_configure_network():
+def ask_to_configure_network() -> Dict[str, Any]:
# Optionally configure one network interface.
# while 1:
# {MAC: Ifname}
@@ -435,7 +444,7 @@ def ask_for_main_filesystem_format(advanced_options=False):
return Menu('Select which filesystem your main partition should use', options, skip=False).run()
-def current_partition_layout(partitions, with_idx=False):
+def current_partition_layout(partitions :List[Partition], with_idx :bool = False) -> Dict[str, Any]:
def do_padding(name, max_len):
spaces = abs(len(str(name)) - max_len) + 2
pad_left = int(spaces / 2)
@@ -479,7 +488,7 @@ def current_partition_layout(partitions, with_idx=False):
return f'\n\nCurrent partition layout:\n\n{current_layout}'
-def select_partition(title, partitions, multiple=False):
+def select_partition(title :str, partitions :List[Partition], multiple :bool = False) -> Union[int, List[int], None]:
partition_indexes = list(map(str, range(len(partitions))))
partition = Menu(title, partition_indexes, multi=multiple).run()
@@ -491,47 +500,18 @@ def select_partition(title, partitions, multiple=False):
return None
-def get_default_partition_layout(block_devices, advanced_options=False):
+def get_default_partition_layout(
+ block_devices :Union[BlockDevice, List[BlockDevice]],
+ advanced_options :bool = False
+) -> Dict[str, Any]:
+
if len(block_devices) == 1:
return suggest_single_disk_layout(block_devices[0], advanced_options=advanced_options)
else:
return suggest_multi_disk_layout(block_devices, advanced_options=advanced_options)
-def manage_new_and_existing_partitions(block_device :BlockDevice) -> dict:
- # if has_uefi():
- # partition_type = 'gpt'
- # else:
- # partition_type = 'msdos'
-
- # log(f"Selecting which partitions to re-use on {block_device}...", fg="yellow", level=logging.INFO)
- # partitions = generic_multi_select(block_device.partitions.values(), "Select which partitions to re-use (the rest will be left alone): ", sort=True)
- # partitions_to_wipe = generic_multi_select(partitions, "Which partitions do you wish to wipe (multiple can be selected): ", sort=True)
-
- # mountpoints = {}
- # struct = {
- # "partitions" : []
- # }
- # for partition in partitions:
- # mountpoint = input(f"Select a mountpoint (or skip) for {partition}: ").strip()
-
- # part_struct = {}
- # if mountpoint:
- # part_struct['mountpoint'] = mountpoint
- # if mountpoint == '/boot':
- # part_struct['boot'] = True
- # if has_uefi():
- # part_struct['ESP'] = True
- # elif mountpoint == '/' and
- # if partition.uuid:
- # part_struct['PARTUUID'] = partition.uuid
- # if partition in partitions_to_wipe:
- # part_struct['wipe'] = True
-
- # struct['partitions'].append(part_struct)
-
- # return struct
-
+def manage_new_and_existing_partitions(block_device :BlockDevice) -> Dict[str, Any]:
block_device_struct = {
"partitions": [partition.__dump__() for partition in block_device.partitions.values()]
}
@@ -689,7 +669,7 @@ def manage_new_and_existing_partitions(block_device :BlockDevice) -> dict:
return block_device_struct
-def select_individual_blockdevice_usage(block_devices: list):
+def select_individual_blockdevice_usage(block_devices: list) -> Dict[str, Any]:
result = {}
for device in block_devices:
@@ -700,7 +680,7 @@ def select_individual_blockdevice_usage(block_devices: list):
return result
-def select_disk_layout(block_devices :list, advanced_options=False):
+def select_disk_layout(block_devices :list, advanced_options=False) -> Dict[str, Any]:
modes = [
"Wipe all selected drives and use a best-effort default partition layout",
"Select what to do with each individual drive (followed by partition usage)"
@@ -714,7 +694,7 @@ def select_disk_layout(block_devices :list, advanced_options=False):
return select_individual_blockdevice_usage(block_devices)
-def select_disk(dict_o_disks):
+def select_disk(dict_o_disks :Dict[str, BlockDevice]) -> BlockDevice:
"""
Asks the user to select a harddrive from the `dict_o_disks` selection.
Usually this is combined with :ref:`archinstall.list_drives`.
@@ -742,7 +722,7 @@ def select_disk(dict_o_disks):
raise DiskError('select_disk() requires a non-empty dictionary of disks to select from.')
-def select_profile():
+def select_profile() -> Optional[str]:
"""
# Asks the user to select a profile from the available profiles.
#
@@ -770,7 +750,7 @@ def select_profile():
return None
-def select_language():
+def select_language() -> str:
"""
Asks the user to select a language
Usually this is combined with :ref:`archinstall.list_keyboard_languages`.
@@ -788,7 +768,7 @@ def select_language():
return selected_lang
-def select_mirror_regions():
+def select_mirror_regions() -> Dict[str, Any]:
"""
Asks the user to select a mirror or region
Usually this is combined with :ref:`archinstall.list_mirrors`.
@@ -810,7 +790,7 @@ def select_mirror_regions():
return {}
-def select_harddrives():
+def select_harddrives() -> Optional[str]:
"""
Asks the user to select one or multiple hard drives
@@ -832,7 +812,7 @@ def select_harddrives():
return None
-def select_driver(options=AVAILABLE_GFX_DRIVERS):
+def select_driver(options :Dict[str, Any] = AVAILABLE_GFX_DRIVERS) -> str:
"""
Some what convoluted function, whose job is simple.
Select a graphics driver from a pre-defined set of popular options.
@@ -866,7 +846,7 @@ def select_driver(options=AVAILABLE_GFX_DRIVERS):
raise RequirementError("Selecting drivers require a least one profile to be given as an option.")
-def select_kernel():
+def select_kernel() -> List[str]:
"""
Asks the user to select a kernel for system.