Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib/installer.py
diff options
context:
space:
mode:
authorAnton Hvornum <anton@hvornum.se>2022-01-06 22:01:15 +0100
committerGitHub <noreply@github.com>2022-01-06 22:01:15 +0100
commite32cf71ae7dacbf9674262705cb2e8e1a5a2d206 (patch)
treed33e6202d10813221a9935dcdc49145f6ffca83e /archinstall/lib/installer.py
parent015cd2a59fbdf3316ddfb7546b884157ad00c7fe (diff)
Added type annotations to all functions (#845)
* Added type annotations for 1/5 of the files. There's bound to be some issues with type miss-match, will sort that out later. * Added type hints for 4/5 of the code * Added type hints for 4.7/5 of the code * Added type hints for 5/5 of the code base * Split the linters into individual files This should help with more clearly show which runner is breaking since they don't share a single common name any longer. Also moved mypy settings into pyproject.toml * Fixed some of the last flake8 issues * Missing parameter * Fixed invalid lookahead types * __future__ had to be at the top * Fixed last flake8 issues
Diffstat (limited to 'archinstall/lib/installer.py')
-rw-r--r--archinstall/lib/installer.py139
1 files changed, 86 insertions, 53 deletions
diff --git a/archinstall/lib/installer.py b/archinstall/lib/installer.py
index c02d5717..4f46e458 100644
--- a/archinstall/lib/installer.py
+++ b/archinstall/lib/installer.py
@@ -1,5 +1,4 @@
import time
-from typing import Union
import logging
import os
import shutil
@@ -7,6 +6,7 @@ import shlex
import pathlib
import subprocess
import glob
+from typing import Union, Dict, Any, List, ModuleType, Optional, Iterator, Mapping
from .disk import get_partitions_in_use, Partition
from .general import SysCommand, generate_password
from .hardware import has_uefi, is_vm, cpu_vendor
@@ -30,29 +30,29 @@ __accessibility_packages__ = ["brltty", "espeakup", "alsa-utils"]
class InstallationFile:
- def __init__(self, installation, filename, owner, mode="w"):
+ def __init__(self, installation :'Installer', filename :str, owner :str, mode :str = "w"):
self.installation = installation
self.filename = filename
self.owner = owner
self.mode = mode
self.fh = None
- def __enter__(self):
+ def __enter__(self) -> 'InstallationFile':
self.fh = open(self.filename, self.mode)
return self
- def __exit__(self, *args):
+ def __exit__(self, *args :str) -> None:
self.fh.close()
self.installation.chown(self.owner, self.filename)
- def write(self, data: Union[str, bytes]):
+ def write(self, data: Union[str, bytes]) -> int:
return self.fh.write(data)
- def read(self, *args):
+ def read(self, *args) -> Union[str, bytes]:
return self.fh.read(*args)
- def poll(self, *args):
- return self.fh.poll(*args)
+# def poll(self, *args) -> bool:
+# return self.fh.poll(*args)
def accessibility_tools_in_use() -> bool:
@@ -84,11 +84,12 @@ class Installer:
"""
- def __init__(self, target, *, base_packages=None, kernels=None):
+ def __init__(self, target :str, *, base_packages :Optional[List[str]] = None, kernels :Optional[List[str]] = None):
if base_packages is None:
base_packages = __packages__[:3]
if kernels is None:
kernels = ['linux']
+
self.kernels = kernels
self.target = target
self.init_time = time.strftime('%Y-%m-%d_%H-%M-%S')
@@ -119,18 +120,17 @@ class Installer:
self.HOOKS = ["base", "udev", "autodetect", "keyboard", "keymap", "modconf", "block", "filesystems", "fsck"]
self.KERNEL_PARAMS = []
- def log(self, *args, level=logging.DEBUG, **kwargs):
+ def log(self, *args :str, level :int = logging.DEBUG, **kwargs :str):
"""
installer.log() wraps output.log() mainly to set a default log-level for this install session.
Any manual override can be done per log() call.
"""
log(*args, level=level, **kwargs)
- def __enter__(self, *args, **kwargs):
+ def __enter__(self, *args :str, **kwargs :str) -> 'Installer':
return self
- def __exit__(self, *args, **kwargs):
- # b''.join(sys_command('sync')) # No need to, since the underlying fs() object will call sync.
+ def __exit__(self, *args :str, **kwargs :str) -> None:
# TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
if len(args) >= 2 and args[1]:
@@ -163,10 +163,10 @@ class Installer:
return False
@property
- def partitions(self):
+ def partitions(self) -> List[Partition]:
return get_partitions_in_use(self.target)
- def sync_log_to_install_medium(self):
+ def sync_log_to_install_medium(self) -> bool:
# Copy over the install log (if there is one) to the install medium if
# at least the base has been strapped in, otherwise we won't have a filesystem/structure to copy to.
if self.helper_flags.get('base-strapped', False) is True:
@@ -180,7 +180,7 @@ class Installer:
return True
- def mount_ordered_layout(self, layouts: dict):
+ def mount_ordered_layout(self, layouts: Dict[str, Any]) -> None:
from .luks import luks2
mountpoints = {}
@@ -254,16 +254,16 @@ class Installer:
except DiskError:
raise DiskError(f"Target {self.target}{mountpoint} never got mounted properly (unable to get mount information using findmnt).")
- def mount(self, partition, mountpoint, create_mountpoint=True):
+ def mount(self, partition :Partition, mountpoint :str, create_mountpoint :bool = True) -> None:
if create_mountpoint and not os.path.isdir(f'{self.target}{mountpoint}'):
os.makedirs(f'{self.target}{mountpoint}')
partition.mount(f'{self.target}{mountpoint}')
- def post_install_check(self, *args, **kwargs):
+ def post_install_check(self, *args :str, **kwargs :str) -> List[bool]:
return [step for step, flag in self.helper_flags.items() if flag is False]
- def pacstrap(self, *packages, **kwargs):
+ def pacstrap(self, *packages :str, **kwargs :str) -> bool:
if type(packages[0]) in (list, tuple):
packages = packages[0]
@@ -284,7 +284,7 @@ class Installer:
else:
self.log(f'Could not sync mirrors: {sync_mirrors.exit_code}', level=logging.INFO)
- def set_mirrors(self, mirrors):
+ def set_mirrors(self, mirrors :Mapping[str, Iterator[str]]) -> None:
for plugin in plugins.values():
if hasattr(plugin, 'on_mirrors'):
if result := plugin.on_mirrors(mirrors):
@@ -292,7 +292,7 @@ class Installer:
return use_mirrors(mirrors, destination=f'{self.target}/etc/pacman.d/mirrorlist')
- def genfstab(self, flags='-pU'):
+ def genfstab(self, flags :str = '-pU') -> bool:
self.log(f"Updating {self.target}/etc/fstab", level=logging.INFO)
with open(f"{self.target}/etc/fstab", 'a') as fstab_fh:
@@ -307,11 +307,11 @@ class Installer:
return True
- def set_hostname(self, hostname: str, *args, **kwargs):
+ def set_hostname(self, hostname: str, *args :str, **kwargs :str) -> None:
with open(f'{self.target}/etc/hostname', 'w') as fh:
fh.write(hostname + '\n')
- def set_locale(self, locale, encoding='UTF-8', *args, **kwargs):
+ def set_locale(self, locale :str, encoding :str = 'UTF-8', *args :str, **kwargs :str) -> bool:
if not len(locale):
return True
@@ -322,7 +322,7 @@ class Installer:
return True if SysCommand(f'/usr/bin/arch-chroot {self.target} locale-gen').exit_code == 0 else False
- def set_timezone(self, zone, *args, **kwargs):
+ def set_timezone(self, zone :str, *args :str, **kwargs :str) -> bool:
if not zone:
return True
if not len(zone):
@@ -337,6 +337,7 @@ class Installer:
(pathlib.Path(self.target) / "etc" / "localtime").unlink(missing_ok=True)
SysCommand(f'/usr/bin/arch-chroot {self.target} ln -s /usr/share/zoneinfo/{zone} /etc/localtime')
return True
+
else:
self.log(
f"Time zone {zone} does not exist, continuing with system default.",
@@ -344,11 +345,13 @@ class Installer:
fg='red'
)
- def activate_ntp(self):
+ return False
+
+ def activate_ntp(self) -> None:
log(f"activate_ntp() is deprecated, use activate_time_syncronization()", fg="yellow", level=logging.INFO)
self.activate_time_syncronization()
- def activate_time_syncronization(self):
+ def activate_time_syncronization(self) -> None:
self.log('Activating systemd-timesyncd for time synchronization using Arch Linux and ntp.org NTP servers.', level=logging.INFO)
self.enable_service('systemd-timesyncd')
@@ -361,11 +364,11 @@ class Installer:
with Boot(self) as session:
session.SysCommand(["timedatectl", "set-ntp", 'true'])
- def enable_espeakup(self):
+ def enable_espeakup(self) -> None:
self.log('Enabling espeakup.service for speech synthesis (accessibility).', level=logging.INFO)
self.enable_service('espeakup')
- def enable_service(self, *services):
+ def enable_service(self, *services :str) -> None:
for service in services:
self.log(f'Enabling service {service}', level=logging.INFO)
if (output := self.arch_chroot(f'systemctl enable {service}')).exit_code != 0:
@@ -375,19 +378,27 @@ class Installer:
if hasattr(plugin, 'on_service'):
plugin.on_service(service)
- def run_command(self, cmd, *args, **kwargs):
+ def run_command(self, cmd :str, *args :str, **kwargs :str) -> None:
return SysCommand(f'/usr/bin/arch-chroot {self.target} {cmd}')
- def arch_chroot(self, cmd, run_as=None):
+ def arch_chroot(self, cmd :str, run_as :Optional[str] = None):
if run_as:
cmd = f"su - {run_as} -c {shlex.quote(cmd)}"
return self.run_command(cmd)
- def drop_to_shell(self):
+ def drop_to_shell(self) -> None:
subprocess.check_call(f"/usr/bin/arch-chroot {self.target}", shell=True)
- def configure_nic(self, nic, dhcp=True, ip=None, gateway=None, dns=None, *args, **kwargs):
+ def configure_nic(self,
+ nic :str,
+ dhcp :bool = True,
+ ip :Optional[str] = None,
+ gateway :Optional[str] = None,
+ dns :Optional[str] = None,
+ *args :str,
+ **kwargs :str
+ ) -> None:
from .systemd import Networkd
if dhcp:
@@ -412,7 +423,7 @@ class Installer:
with open(f"{self.target}/etc/systemd/network/10-{nic}.network", "a") as netconf:
netconf.write(str(conf))
- def copy_iso_network_config(self, enable_services=False):
+ def copy_iso_network_config(self, enable_services :bool = False) -> bool:
# Copy (if any) iwd password and config files
if os.path.isdir('/var/lib/iwd/'):
if psk_files := glob.glob('/var/lib/iwd/*.psk'):
@@ -427,7 +438,7 @@ class Installer:
# This function will be called after minimal_installation()
# as a hook for post-installs. This hook is only needed if
# base is not installed yet.
- def post_install_enable_iwd_service(*args, **kwargs):
+ def post_install_enable_iwd_service(*args :str, **kwargs :str):
self.enable_service('iwd')
self.post_base_install.append(post_install_enable_iwd_service)
@@ -452,7 +463,7 @@ class Installer:
# If we haven't installed the base yet (function called pre-maturely)
if self.helper_flags.get('base', False) is False:
- def post_install_enable_networkd_resolved(*args, **kwargs):
+ def post_install_enable_networkd_resolved(*args :str, **kwargs :str):
self.enable_service('systemd-networkd', 'systemd-resolved')
self.post_base_install.append(post_install_enable_networkd_resolved)
@@ -462,7 +473,7 @@ class Installer:
return True
- def detect_encryption(self, partition):
+ def detect_encryption(self, partition :Partition) -> bool:
part = Partition(partition.parent, None, autodetect_filesystem=True)
if partition.encrypted:
return partition
@@ -471,7 +482,7 @@ class Installer:
return False
- def mkinitcpio(self, *flags):
+ def mkinitcpio(self, *flags :str) -> bool:
for plugin in plugins.values():
if hasattr(plugin, 'on_mkinitcpio'):
# Allow plugins to override the usage of mkinitcpio altogether.
@@ -483,9 +494,10 @@ class Installer:
mkinit.write(f"BINARIES=({' '.join(self.BINARIES)})\n")
mkinit.write(f"FILES=({' '.join(self.FILES)})\n")
mkinit.write(f"HOOKS=({' '.join(self.HOOKS)})\n")
- SysCommand(f'/usr/bin/arch-chroot {self.target} mkinitcpio {" ".join(flags)}')
- def minimal_installation(self):
+ return SysCommand(f'/usr/bin/arch-chroot {self.target} mkinitcpio {" ".join(flags)}').exit_code == 0
+
+ def minimal_installation(self) -> bool:
# Add necessary packages if encrypting the drive
# (encrypted partitions default to btrfs for now, so we need btrfs-progs)
# TODO: Perhaps this should be living in the function which dictates
@@ -562,7 +574,7 @@ class Installer:
return True
- def setup_swap(self, kind='zram'):
+ def setup_swap(self, kind :str = 'zram') -> bool:
if kind == 'zram':
self.log(f"Setting up swap on zram")
self.pacstrap('zram-generator')
@@ -578,7 +590,18 @@ class Installer:
else:
raise ValueError(f"Archinstall currently only supports setting up swap on zram")
- def add_bootloader(self, bootloader='systemd-bootctl'):
+ def add_bootloader(self, bootloader :str = 'systemd-bootctl') -> bool:
+ """
+ Adds a bootloader to the installation instance.
+ Archinstall supports one of three types:
+ * systemd-bootctl
+ * grub
+ * efistub (beta)
+
+ :param bootloader: Can be one of the three strings
+ 'systemd-bootctl', 'grub' or 'efistub' (beta)
+ """
+
for plugin in plugins.values():
if hasattr(plugin, 'on_add_bootloader'):
# Allow plugins to override the boot-loader handling.
@@ -757,10 +780,19 @@ class Installer:
return True
- def add_additional_packages(self, *packages):
+ def add_additional_packages(self, *packages :str) -> bool:
return self.pacstrap(*packages)
- def install_profile(self, profile):
+ def install_profile(self, profile :str) -> ModuleType:
+ """
+ Installs a archinstall profile script (.py file).
+ This profile can be either local, remote or part of the library.
+
+ :param profile: Can be a local path or a remote path (URL)
+ :return: Returns the imported script as a module, this way
+ you can access any remaining functions exposed by the profile.
+ :rtype: module
+ """
storage['installation_session'] = self
if type(profile) == str:
@@ -769,13 +801,13 @@ class Installer:
self.log(f'Installing network profile {profile}', level=logging.INFO)
return profile.install()
- def enable_sudo(self, entity: str, group=False):
+ def enable_sudo(self, entity: str, group :bool = False) -> bool:
self.log(f'Enabling sudo permissions for {entity}.', level=logging.INFO)
with open(f'{self.target}/etc/sudoers', 'a') as sudoers:
sudoers.write(f'{"%" if group else ""}{entity} ALL=(ALL) ALL\n')
return True
- def user_create(self, user: str, password=None, groups=None, sudo=False):
+ def user_create(self, user :str, password :Optional[str] = None, groups :Optional[str] = None, sudo :bool = False) -> None:
if groups is None:
groups = []
@@ -789,7 +821,8 @@ class Installer:
if not handled_by_plugin:
self.log(f'Creating user {user}', level=logging.INFO)
- SysCommand(f'/usr/bin/arch-chroot {self.target} useradd -m -G wheel {user}')
+ if not (output := SysCommand(f'/usr/bin/arch-chroot {self.target} useradd -m -G wheel {user}')).exit_code == 0:
+ raise SystemError(f"Could not create user inside installation: {output}")
for plugin in plugins.values():
if hasattr(plugin, 'on_user_created'):
@@ -806,24 +839,24 @@ class Installer:
if sudo and self.enable_sudo(user):
self.helper_flags['user'] = True
- def user_set_pw(self, user, password):
+ def user_set_pw(self, user :str, password :str) -> bool:
self.log(f'Setting password for {user}', level=logging.INFO)
if user == 'root':
# This means the root account isn't locked/disabled with * in /etc/passwd
self.helper_flags['user'] = True
- SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c \"echo '{user}:{password}' | chpasswd\"")
+ return SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c \"echo '{user}:{password}' | chpasswd\"").exit_code == 0
- def user_set_shell(self, user, shell):
+ def user_set_shell(self, user :str, shell :str) -> bool:
self.log(f'Setting shell for {user} to {shell}', level=logging.INFO)
- SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c \"chsh -s {shell} {user}\"")
+ return SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c \"chsh -s {shell} {user}\"").exit_code == 0
- def chown(self, owner, path, options=[]):
- return SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c 'chown {' '.join(options)} {owner} {path}")
+ def chown(self, owner :str, path :str, options :List[str] = []) -> bool:
+ return SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c 'chown {' '.join(options)} {owner} {path}").exit_code == 0
- def create_file(self, filename, owner=None):
+ def create_file(self, filename :str, owner :Optional[str] = None) -> InstallationFile:
return InstallationFile(self, filename, owner)
def set_keyboard_language(self, language: str) -> bool: