Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib
diff options
context:
space:
mode:
Diffstat (limited to 'archinstall/lib')
-rw-r--r--archinstall/lib/disk.py67
-rw-r--r--archinstall/lib/exceptions.py2
-rw-r--r--archinstall/lib/general.py10
-rw-r--r--archinstall/lib/installer.py243
-rw-r--r--archinstall/lib/locale_helpers.py14
-rw-r--r--archinstall/lib/luks.py4
-rw-r--r--archinstall/lib/mirrors.py9
-rw-r--r--archinstall/lib/services.py2
-rw-r--r--archinstall/lib/user_interaction.py61
9 files changed, 243 insertions, 169 deletions
diff --git a/archinstall/lib/disk.py b/archinstall/lib/disk.py
index dbb69662..bada4076 100644
--- a/archinstall/lib/disk.py
+++ b/archinstall/lib/disk.py
@@ -126,6 +126,18 @@ class BlockDevice():
def partition_table_type(self):
return GPT
+ @property
+ def uuid(self):
+ log(f'BlockDevice().uuid is untested!', level=LOG_LEVELS.Warning, fg='yellow')
+ """
+ Returns the disk UUID as returned by lsblk.
+ This is more reliable than relying on /dev/disk/by-partuuid as
+ it doesn't seam to be able to detect md raid partitions.
+ """
+ lsblk = b''.join(sys_command(f'lsblk -J -o+UUID {self.path}'))
+ for partition in json.loads(lsblk.decode('UTF-8'))['blockdevices']:
+ return partition.get('uuid', None)
+
def has_partitions(self):
return len(self.partitions)
@@ -166,7 +178,7 @@ class Partition():
self.mountpoint = target
if not self.filesystem and autodetect_filesystem:
- if (fstype := mount_information.get('fstype', get_filesystem_type(self.real_device))):
+ if (fstype := mount_information.get('fstype', get_filesystem_type(path))):
self.filesystem = fstype
if self.filesystem == 'crypto_LUKS':
@@ -187,9 +199,9 @@ class Partition():
mount_repr = f", rel_mountpoint={self.target_mountpoint}"
if self._encrypted:
- return f'Partition(path={self.path}, real_device={self.real_device}, fs={self.filesystem}{mount_repr})'
+ return f'Partition(path={self.path}, size={self.size}, real_device={self.real_device}, fs={self.filesystem}{mount_repr})'
else:
- return f'Partition(path={self.path}, fs={self.filesystem}{mount_repr})'
+ return f'Partition(path={self.path}, size={self.size}, fs={self.filesystem}{mount_repr})'
@property
def uuid(self) -> str:
@@ -215,13 +227,14 @@ class Partition():
self._encrypted = value
@property
+ def parent(self):
+ return self.real_device
+
+ @property
def real_device(self):
- if not self._encrypted:
- return self.path
- else:
- for blockdevice in json.loads(b''.join(sys_command('lsblk -J')).decode('UTF-8'))['blockdevices']:
- if (parent := self.find_parent_of(blockdevice, os.path.basename(self.path))):
- return f"/dev/{parent}"
+ for blockdevice in json.loads(b''.join(sys_command('lsblk -J')).decode('UTF-8'))['blockdevices']:
+ if (parent := self.find_parent_of(blockdevice, os.path.basename(self.path))):
+ return f"/dev/{parent}"
# raise DiskError(f'Could not find appropriate parent for encrypted partition {self}')
return self.path
@@ -366,14 +379,16 @@ class Partition():
if not fs:
if not self.filesystem: raise DiskError(f'Need to format (or define) the filesystem on {self} before mounting.')
fs = self.filesystem
- ## libc has some issues with loop devices, defaulting back to sys calls
- # ret = libc.mount(self.path.encode(), target.encode(), fs.encode(), 0, options.encode())
- # if ret < 0:
- # errno = ctypes.get_errno()
- # raise OSError(errno, f"Error mounting {self.path} ({fs}) on {target} with options '{options}': {os.strerror(errno)}")
- if sys_command(f'/usr/bin/mount {self.path} {target}').exit_code == 0:
- self.mountpoint = target
- return True
+
+ pathlib.Path(target).mkdir(parents=True, exist_ok=True)
+
+ try:
+ sys_command(f'/usr/bin/mount {self.path} {target}')
+ except SysCallError as err:
+ raise err
+
+ self.mountpoint = target
+ return True
def unmount(self):
try:
@@ -572,6 +587,24 @@ def get_mount_info(path):
return output['filesystems'][0]
+def get_partitions_in_use(mountpoint):
+ try:
+ output = b''.join(sys_command(f'/usr/bin/findmnt --json -R {mountpoint}'))
+ except SysCallError:
+ return {}
+
+ mounts = []
+
+ output = output.decode('UTF-8')
+ output = json.loads(output)
+ for target in output.get('filesystems', []):
+ mounts.append(Partition(target['source'], None, filesystem=target.get('fstype', None), mountpoint=target['target']))
+
+ for child in target.get('children', []):
+ mounts.append(Partition(child['source'], None, filesystem=child.get('fstype', None), mountpoint=child['target']))
+
+ return mounts
+
def get_filesystem_type(path):
try:
handle = sys_command(f"blkid -o value -s TYPE {path}")
diff --git a/archinstall/lib/exceptions.py b/archinstall/lib/exceptions.py
index a320eef6..49913980 100644
--- a/archinstall/lib/exceptions.py
+++ b/archinstall/lib/exceptions.py
@@ -18,4 +18,6 @@ class HardwareIncompatibilityError(BaseException):
class PermissionError(BaseException):
pass
class UserError(BaseException):
+ pass
+class ServiceException(BaseException):
pass \ No newline at end of file
diff --git a/archinstall/lib/general.py b/archinstall/lib/general.py
index f2a714e7..6e3b66f1 100644
--- a/archinstall/lib/general.py
+++ b/archinstall/lib/general.py
@@ -76,7 +76,7 @@ class sys_command():#Thread):
"""
Stolen from archinstall_gui
"""
- def __init__(self, cmd, callback=None, start_callback=None, *args, **kwargs):
+ def __init__(self, cmd, callback=None, start_callback=None, environment_vars={}, *args, **kwargs):
kwargs.setdefault("worker_id", gen_uid())
kwargs.setdefault("emulate", False)
kwargs.setdefault("suppress_errors", False)
@@ -93,6 +93,7 @@ class sys_command():#Thread):
raise ValueError(f'Incorrect string to split: {cmd}\n{e}')
self.args = args
self.kwargs = kwargs
+ self.environment_vars = environment_vars
self.kwargs.setdefault("worker", None)
self.callback = callback
@@ -159,7 +160,7 @@ class sys_command():#Thread):
# Replace child process with our main process
if not self.kwargs['emulate']:
try:
- os.execv(self.cmd[0], self.cmd)
+ os.execve(self.cmd[0], self.cmd, {**os.environ, **self.environment_vars})
except FileNotFoundError:
self.status = 'done'
self.log(f"{self.cmd[0]} does not exist.", level=LOG_LEVELS.Debug)
@@ -262,6 +263,11 @@ class sys_command():#Thread):
with open(f'{self.cwd}/trace.log', 'wb') as fh:
fh.write(self.trace_log)
+ try:
+ os.close(child_fd)
+ except:
+ pass
+
def prerequisite_check():
if not os.path.isdir("/sys/firmware/efi"):
diff --git a/archinstall/lib/installer.py b/archinstall/lib/installer.py
index b7fe5e9d..758033a7 100644
--- a/archinstall/lib/installer.py
+++ b/archinstall/lib/installer.py
@@ -1,4 +1,4 @@
-import os, stat, time, shutil, pathlib
+import os, stat, time, shutil, pathlib, subprocess
from .exceptions import *
from .disk import *
@@ -34,30 +34,21 @@ class Installer():
:type hostname: str, optional
"""
- def __init__(self, partition, boot_partition, *, base_packages='base base-devel linux linux-firmware efibootmgr', profile=None, mountpoint='/mnt', hostname='ArchInstalled', logdir=None, logfile=None):
- self.profile = profile
- self.hostname = hostname
- self.mountpoint = mountpoint
+ def __init__(self, target, *, base_packages='base base-devel linux linux-firmware efibootmgr'):
+ self.target = target
self.init_time = time.strftime('%Y-%m-%d_%H-%M-%S')
self.milliseconds = int(str(time.time()).split('.')[1])
- if logdir:
- storage['LOG_PATH'] = logdir
- if logfile:
- storage['LOG_FILE'] = logfile
-
self.helper_flags = {
- 'bootloader' : False,
'base' : False,
- 'user' : False # Root counts as a user, if additional users are skipped.
+ 'bootloader' : False
}
self.base_packages = base_packages.split(' ') if type(base_packages) is str else base_packages
self.post_base_install = []
- storage['session'] = self
- self.partition = partition
- self.boot_partition = boot_partition
+ storage['session'] = self
+ self.partitions = get_partitions_in_use(self.target)
def log(self, *args, level=LOG_LEVELS.Debug, **kwargs):
"""
@@ -67,9 +58,6 @@ class Installer():
log(*args, level=level, **kwargs)
def __enter__(self, *args, **kwargs):
- self.partition.mount(self.mountpoint)
- os.makedirs(f'{self.mountpoint}/boot', exist_ok=True)
- self.boot_partition.mount(f'{self.mountpoint}/boot')
return self
def __exit__(self, *args, **kwargs):
@@ -93,6 +81,7 @@ class Installer():
if not (missing_steps := self.post_install_check()):
self.log('Installation completed without any errors. You may now reboot.', bg='black', fg='green', level=LOG_LEVELS.Info)
self.sync_log_to_install_medium()
+
return True
else:
self.log('Some required steps were not successfully installed/configured before leaving the installer:', bg='black', fg='red', level=LOG_LEVELS.Warning)
@@ -112,18 +101,18 @@ class Installer():
if (filename := storage.get('LOG_FILE', None)):
absolute_logfile = os.path.join(storage.get('LOG_PATH', './'), filename)
- if not os.path.isdir(f"{self.mountpoint}/{os.path.dirname(absolute_logfile)}"):
- os.makedirs(f"{self.mountpoint}/{os.path.dirname(absolute_logfile)}")
+ if not os.path.isdir(f"{self.target}/{os.path.dirname(absolute_logfile)}"):
+ os.makedirs(f"{self.target}/{os.path.dirname(absolute_logfile)}")
- shutil.copy2(absolute_logfile, f"{self.mountpoint}/{absolute_logfile}")
+ shutil.copy2(absolute_logfile, f"{self.target}/{absolute_logfile}")
return True
def mount(self, partition, mountpoint, create_mountpoint=True):
- if create_mountpoint and not os.path.isdir(f'{self.mountpoint}{mountpoint}'):
- os.makedirs(f'{self.mountpoint}{mountpoint}')
+ if create_mountpoint and not os.path.isdir(f'{self.target}{mountpoint}'):
+ os.makedirs(f'{self.target}{mountpoint}')
- partition.mount(f'{self.mountpoint}{mountpoint}')
+ partition.mount(f'{self.target}{mountpoint}')
def post_install_check(self, *args, **kwargs):
return [step for step, flag in self.helper_flags.items() if flag is False]
@@ -133,7 +122,7 @@ class Installer():
self.log(f'Installing packages: {packages}', level=LOG_LEVELS.Info)
if (sync_mirrors := sys_command('/usr/bin/pacman -Syy')).exit_code == 0:
- if (pacstrap := sys_command(f'/usr/bin/pacstrap {self.mountpoint} {" ".join(packages)}', **kwargs)).exit_code == 0:
+ if (pacstrap := sys_command(f'/usr/bin/pacstrap {self.target} {" ".join(packages)}', **kwargs)).exit_code == 0:
return True
else:
self.log(f'Could not strap in packages: {pacstrap.exit_code}', level=LOG_LEVELS.Info)
@@ -141,42 +130,41 @@ class Installer():
self.log(f'Could not sync mirrors: {sync_mirrors.exit_code}', level=LOG_LEVELS.Info)
def set_mirrors(self, mirrors):
- return use_mirrors(mirrors, destination=f'{self.mountpoint}/etc/pacman.d/mirrorlist')
+ return use_mirrors(mirrors, destination=f'{self.target}/etc/pacman.d/mirrorlist')
def genfstab(self, flags='-pU'):
- self.log(f"Updating {self.mountpoint}/etc/fstab", level=LOG_LEVELS.Info)
+ self.log(f"Updating {self.target}/etc/fstab", level=LOG_LEVELS.Info)
- fstab = sys_command(f'/usr/bin/genfstab {flags} {self.mountpoint}').trace_log
- with open(f"{self.mountpoint}/etc/fstab", 'ab') as fstab_fh:
+ fstab = sys_command(f'/usr/bin/genfstab {flags} {self.target}').trace_log
+ with open(f"{self.target}/etc/fstab", 'ab') as fstab_fh:
fstab_fh.write(fstab)
- if not os.path.isfile(f'{self.mountpoint}/etc/fstab'):
+ if not os.path.isfile(f'{self.target}/etc/fstab'):
raise RequirementError(f'Could not generate fstab, strapping in packages most likely failed (disk out of space?)\n{fstab}')
return True
- def set_hostname(self, hostname=None, *args, **kwargs):
- if not hostname: hostname = self.hostname
- with open(f'{self.mountpoint}/etc/hostname', 'w') as fh:
- fh.write(self.hostname + '\n')
+ def set_hostname(self, hostname :str, *args, **kwargs):
+ with open(f'{self.target}/etc/hostname', 'w') as fh:
+ fh.write(hostname + '\n')
def set_locale(self, locale, encoding='UTF-8', *args, **kwargs):
if not len(locale): return True
- with open(f'{self.mountpoint}/etc/locale.gen', 'a') as fh:
+ with open(f'{self.target}/etc/locale.gen', 'a') as fh:
fh.write(f'{locale}.{encoding} {encoding}\n')
- with open(f'{self.mountpoint}/etc/locale.conf', 'w') as fh:
+ with open(f'{self.target}/etc/locale.conf', 'w') as fh:
fh.write(f'LANG={locale}.{encoding}\n')
- return True if sys_command(f'/usr/bin/arch-chroot {self.mountpoint} locale-gen').exit_code == 0 else False
+ return True if sys_command(f'/usr/bin/arch-chroot {self.target} locale-gen').exit_code == 0 else False
def set_timezone(self, zone, *args, **kwargs):
if not zone: return True
if not len(zone): return True # Redundant
if (pathlib.Path("/usr")/"share"/"zoneinfo"/zone).exists():
- (pathlib.Path(self.mountpoint)/"etc"/"localtime").unlink(missing_ok=True)
- sys_command(f'/usr/bin/arch-chroot {self.mountpoint} ln -s /usr/share/zoneinfo/{zone} /etc/localtime')
+ (pathlib.Path(self.target)/"etc"/"localtime").unlink(missing_ok=True)
+ sys_command(f'/usr/bin/arch-chroot {self.target} ln -s /usr/share/zoneinfo/{zone} /etc/localtime')
return True
else:
self.log(
@@ -191,16 +179,21 @@ class Installer():
if self.enable_service('ntpd'):
return True
- def enable_service(self, service):
- self.log(f'Enabling service {service}', level=LOG_LEVELS.Info)
- return self.arch_chroot(f'systemctl enable {service}').exit_code == 0
+ def enable_service(self, *services):
+ for service in services:
+ self.log(f'Enabling service {service}', level=LOG_LEVELS.Info)
+ if (output := self.arch_chroot(f'systemctl enable {service}')).exit_code != 0:
+ raise ServiceException(f"Unable to start service {service}: {output}")
def run_command(self, cmd, *args, **kwargs):
- return sys_command(f'/usr/bin/arch-chroot {self.mountpoint} {cmd}')
+ return sys_command(f'/usr/bin/arch-chroot {self.target} {cmd}')
def arch_chroot(self, cmd, *args, **kwargs):
return self.run_command(cmd)
+ def drop_to_shell(self):
+ subprocess.check_call(f"/usr/bin/arch-chroot {self.target}", shell=True)
+
def configure_nic(self, nic, dhcp=True, ip=None, gateway=None, dns=None, *args, **kwargs):
if dhcp:
conf = Networkd(Match={"Name": nic}, Network={"DHCP": "yes"})
@@ -216,15 +209,15 @@ class Installer():
conf = Networkd(Match={"Name": nic}, Network=network)
- with open(f"{self.mountpoint}/etc/systemd/network/10-{nic}.network", "a") as netconf:
+ with open(f"{self.target}/etc/systemd/network/10-{nic}.network", "a") as netconf:
netconf.write(str(conf))
def copy_ISO_network_config(self, enable_services=False):
# Copy (if any) iwd password and config files
if os.path.isdir('/var/lib/iwd/'):
if (psk_files := glob.glob('/var/lib/iwd/*.psk')):
- if not os.path.isdir(f"{self.mountpoint}/var/lib/iwd"):
- os.makedirs(f"{self.mountpoint}/var/lib/iwd")
+ if not os.path.isdir(f"{self.target}/var/lib/iwd"):
+ os.makedirs(f"{self.target}/var/lib/iwd")
if enable_services:
# If we haven't installed the base yet (function called pre-maturely)
@@ -244,79 +237,92 @@ class Installer():
self.enable_service('iwd')
for psk in psk_files:
- shutil.copy2(psk, f"{self.mountpoint}/var/lib/iwd/{os.path.basename(psk)}")
+ shutil.copy2(psk, f"{self.target}/var/lib/iwd/{os.path.basename(psk)}")
# Copy (if any) systemd-networkd config files
if (netconfigurations := glob.glob('/etc/systemd/network/*')):
- if not os.path.isdir(f"{self.mountpoint}/etc/systemd/network/"):
- os.makedirs(f"{self.mountpoint}/etc/systemd/network/")
+ if not os.path.isdir(f"{self.target}/etc/systemd/network/"):
+ os.makedirs(f"{self.target}/etc/systemd/network/")
for netconf_file in netconfigurations:
- shutil.copy2(netconf_file, f"{self.mountpoint}/etc/systemd/network/{os.path.basename(netconf_file)}")
+ shutil.copy2(netconf_file, f"{self.target}/etc/systemd/network/{os.path.basename(netconf_file)}")
if enable_services:
# If we haven't installed the base yet (function called pre-maturely)
if self.helper_flags.get('base', False) is False:
def post_install_enable_networkd_resolved(*args, **kwargs):
- self.enable_service('systemd-networkd')
- self.enable_service('systemd-resolved')
-
+ self.enable_service('systemd-networkd', 'systemd-resolved')
self.post_base_install.append(post_install_enable_networkd_resolved)
# Otherwise, we can go ahead and enable the services
else:
- self.enable_service('systemd-networkd')
- self.enable_service('systemd-resolved')
+ self.enable_service('systemd-networkd', 'systemd-resolved')
+
return True
+ def detect_encryption(self, partition):
+ if partition.encrypted:
+ return partition
+ elif partition.parent not in partition.path and Partition(partition.parent, None, autodetect_filesystem=True).filesystem == 'crypto_LUKS':
+ return Partition(partition.parent, None, autodetect_filesystem=True)
+
+ return False
+
def minimal_installation(self):
## Add necessary packages if encrypting the drive
## (encrypted partitions default to btrfs for now, so we need btrfs-progs)
## TODO: Perhaps this should be living in the function which dictates
## the partitioning. Leaving here for now.
- if self.partition.filesystem == 'btrfs':
- #if self.partition.encrypted:
- self.base_packages.append('btrfs-progs')
- if self.partition.filesystem == 'xfs':
- self.base_packages.append('xfsprogs')
- if self.partition.filesystem == 'f2fs':
- self.base_packages.append('f2fs-tools')
+ MODULES = []
+ BINARIES = []
+ FILES = []
+ HOOKS = ["base", "udev", "autodetect", "keyboard", "keymap", "modconf", "block", "filesystems", "fsck"]
+
+ for partition in self.partitions:
+ if partition.filesystem == 'btrfs':
+ #if partition.encrypted:
+ self.base_packages.append('btrfs-progs')
+ if partition.filesystem == 'xfs':
+ self.base_packages.append('xfsprogs')
+ if partition.filesystem == 'f2fs':
+ self.base_packages.append('f2fs-tools')
+
+ # Configure mkinitcpio to handle some specific use cases.
+ if partition.filesystem == 'btrfs':
+ if 'btrfs' not in MODULES:
+ MODULES.append('btrfs')
+ if '/usr/bin/btrfs-progs' not in BINARIES:
+ BINARIES.append('/usr/bin/btrfs')
+
+ if self.detect_encryption(partition):
+ if 'encrypt' not in HOOKS:
+ HOOKS.insert(HOOKS.index('filesystems'), 'encrypt')
+
self.pacstrap(self.base_packages)
self.helper_flags['base-strapped'] = True
#self.genfstab()
- with open(f"{self.mountpoint}/etc/fstab", "a") as fstab:
+ with open(f"{self.target}/etc/fstab", "a") as fstab:
fstab.write(
"\ntmpfs /tmp tmpfs defaults,noatime,mode=1777 0 0\n"
) # Redundant \n at the start? who knows?
## TODO: Support locale and timezone
- #os.remove(f'{self.mountpoint}/etc/localtime')
- #sys_command(f'/usr/bin/arch-chroot {self.mountpoint} ln -s /usr/share/zoneinfo/{localtime} /etc/localtime')
+ #os.remove(f'{self.target}/etc/localtime')
+ #sys_command(f'/usr/bin/arch-chroot {self.target} ln -s /usr/share/zoneinfo/{localtime} /etc/localtime')
#sys_command('/usr/bin/arch-chroot /mnt hwclock --hctosys --localtime')
- self.set_hostname()
+ self.set_hostname('archinstall')
self.set_locale('en_US')
# TODO: Use python functions for this
- sys_command(f'/usr/bin/arch-chroot {self.mountpoint} chmod 700 /root')
-
- # Configure mkinitcpio to handle some specific use cases.
- # TODO: Yes, we should not overwrite the entire thing, but for now this should be fine
- # since we just installed the base system.
- if self.partition.filesystem == 'btrfs':
- with open(f'{self.mountpoint}/etc/mkinitcpio.conf', 'w') as mkinit:
- mkinit.write('MODULES=(btrfs)\n')
- mkinit.write('BINARIES=(/usr/bin/btrfs)\n')
- mkinit.write('FILES=()\n')
- mkinit.write('HOOKS=(base udev autodetect keyboard keymap modconf block encrypt filesystems fsck)\n')
- sys_command(f'/usr/bin/arch-chroot {self.mountpoint} mkinitcpio -p linux')
- elif self.partition.encrypted:
- with open(f'{self.mountpoint}/etc/mkinitcpio.conf', 'w') as mkinit:
- mkinit.write('MODULES=()\n')
- mkinit.write('BINARIES=()\n')
- mkinit.write('FILES=()\n')
- mkinit.write('HOOKS=(base udev autodetect keyboard keymap modconf block encrypt filesystems fsck)\n')
- sys_command(f'/usr/bin/arch-chroot {self.mountpoint} mkinitcpio -p linux')
+ sys_command(f'/usr/bin/arch-chroot {self.target} chmod 700 /root')
+
+ with open(f'{self.target}/etc/mkinitcpio.conf', 'w') as mkinit:
+ mkinit.write(f"MODULES=({' '.join(MODULES)})\n")
+ mkinit.write(f"BINARIES=({' '.join(BINARIES)})\n")
+ mkinit.write(f"FILES=({' '.join(FILES)})\n")
+ mkinit.write(f"HOOKS=({' '.join(HOOKS)})\n")
+ sys_command(f'/usr/bin/arch-chroot {self.target} mkinitcpio -p linux')
self.helper_flags['base'] = True
@@ -328,7 +334,15 @@ class Installer():
return True
def add_bootloader(self, bootloader='systemd-bootctl'):
- self.log(f'Adding bootloader {bootloader} to {self.boot_partition}', level=LOG_LEVELS.Info)
+ boot_partition = None
+ root_partition = None
+ for partition in self.partitions:
+ if partition.mountpoint == self.target+'/boot':
+ boot_partition = partition
+ elif partition.mountpoint == self.target:
+ root_partition = partition
+
+ self.log(f'Adding bootloader {bootloader} to {boot_partition}', level=LOG_LEVELS.Info)
if bootloader == 'systemd-bootctl':
# TODO: Ideally we would want to check if another config
@@ -336,11 +350,11 @@ class Installer():
# And in which case we should do some clean up.
# Install the boot loader
- sys_command(f'/usr/bin/arch-chroot {self.mountpoint} bootctl --no-variables --path=/boot install')
+ sys_command(f'/usr/bin/arch-chroot {self.target} bootctl --no-variables --path=/boot install')
# Modify or create a loader.conf
- if os.path.isfile(f'{self.mountpoint}/boot/loader/loader.conf'):
- with open(f'{self.mountpoint}/boot/loader/loader.conf', 'r') as loader:
+ if os.path.isfile(f'{self.target}/boot/loader/loader.conf'):
+ with open(f'{self.target}/boot/loader/loader.conf', 'r') as loader:
loader_data = loader.read().split('\n')
else:
loader_data = [
@@ -348,7 +362,7 @@ class Installer():
f"timeout 5"
]
- with open(f'{self.mountpoint}/boot/loader/loader.conf', 'w') as loader:
+ with open(f'{self.target}/boot/loader/loader.conf', 'w') as loader:
for line in loader_data:
if line[:8] == 'default ':
loader.write(f'default {self.init_time}\n')
@@ -360,7 +374,7 @@ class Installer():
#UUID = sys_command('blkid -s PARTUUID -o value {drive}{partition_2}'.format(**args)).decode('UTF-8').strip()
# Setup the loader entry
- with open(f'{self.mountpoint}/boot/loader/entries/{self.init_time}.conf', 'w') as entry:
+ with open(f'{self.target}/boot/loader/entries/{self.init_time}.conf', 'w') as entry:
entry.write(f'# Created by: archinstall\n')
entry.write(f'# Created on: {self.init_time}\n')
entry.write(f'title Arch Linux\n')
@@ -370,28 +384,19 @@ class Installer():
## so we'll use the old manual method until we get that sorted out.
- if self.partition.encrypted:
- log(f"Identifying root partition by DISK-UUID on {self.partition}, looking for '{os.path.basename(self.partition.real_device)}'.", level=LOG_LEVELS.Debug)
- for root, folders, uids in os.walk('/dev/disk/by-uuid'):
- for uid in uids:
- real_path = os.path.realpath(os.path.join(root, uid))
-
- log(f"Checking root partition match {os.path.basename(real_path)} against {os.path.basename(self.partition.real_device)}: {os.path.basename(real_path) == os.path.basename(self.partition.real_device)}", level=LOG_LEVELS.Debug)
- if not os.path.basename(real_path) == os.path.basename(self.partition.real_device): continue
-
- entry.write(f'options cryptdevice=UUID={uid}:luksdev root=/dev/mapper/luksdev rw intel_pstate=no_hwp\n')
-
- self.helper_flags['bootloader'] = bootloader
- return True
- break
+ if (real_device := self.detect_encryption(root_partition)):
+ # TODO: We need to detect if the encrypted device is a whole disk encryption,
+ # or simply a partition encryption. Right now we assume it's a partition (and we always have)
+ log(f"Identifying root partition by PART-UUID on {real_device}: '{real_device.uuid}'.", level=LOG_LEVELS.Debug)
+ entry.write(f'options cryptdevice=PARTUUID={real_device.uuid}:luksdev root=/dev/mapper/luksdev rw intel_pstate=no_hwp\n')
else:
- log(f"Identifying root partition by PART-UUID on {self.partition}, looking for '{os.path.basename(self.partition.path)}'.", level=LOG_LEVELS.Debug)
- entry.write(f'options root=PARTUUID={self.partition.uuid} rw intel_pstate=no_hwp\n')
+ log(f"Identifying root partition by PART-UUID on {root_partition}, looking for '{root_partition.uuid}'.", level=LOG_LEVELS.Debug)
+ entry.write(f'options root=PARTUUID={root_partition.uuid} rw intel_pstate=no_hwp\n')
- self.helper_flags['bootloader'] = bootloader
- return True
+ self.helper_flags['bootloader'] = bootloader
+ return True
- raise RequirementError(f"Could not identify the UUID of {self.partition}, there for {self.mountpoint}/boot/loader/entries/arch.conf will be broken until fixed.")
+ raise RequirementError(f"Could not identify the UUID of {root_partition}, there for {self.target}/boot/loader/entries/arch.conf will be broken until fixed.")
else:
raise RequirementError(f"Unknown (or not yet implemented) bootloader added to add_bootloader(): {bootloader}")
@@ -416,19 +421,19 @@ class Installer():
def enable_sudo(self, entity :str, group=False):
self.log(f'Enabling sudo permissions for {entity}.', level=LOG_LEVELS.Info)
- with open(f'{self.mountpoint}/etc/sudoers', 'a') as sudoers:
+ with open(f'{self.target}/etc/sudoers', 'a') as sudoers:
sudoers.write(f'{"%" if group else ""}{entity} ALL=(ALL) ALL\n')
return True
def user_create(self, user :str, password=None, groups=[], sudo=False):
self.log(f'Creating user {user}', level=LOG_LEVELS.Info)
- o = b''.join(sys_command(f'/usr/bin/arch-chroot {self.mountpoint} useradd -m -G wheel {user}'))
+ o = b''.join(sys_command(f'/usr/bin/arch-chroot {self.target} useradd -m -G wheel {user}'))
if password:
self.user_set_pw(user, password)
if groups:
for group in groups:
- o = b''.join(sys_command(f'/usr/bin/arch-chroot {self.mountpoint} gpasswd -a {user} {group}'))
+ o = b''.join(sys_command(f'/usr/bin/arch-chroot {self.target} gpasswd -a {user} {group}'))
if sudo and self.enable_sudo(user):
self.helper_flags['user'] = True
@@ -440,12 +445,20 @@ class Installer():
# This means the root account isn't locked/disabled with * in /etc/passwd
self.helper_flags['user'] = True
- o = b''.join(sys_command(f"/usr/bin/arch-chroot {self.mountpoint} sh -c \"echo '{user}:{password}' | chpasswd\""))
+ o = b''.join(sys_command(f"/usr/bin/arch-chroot {self.target} sh -c \"echo '{user}:{password}' | chpasswd\""))
+ pass
+
+ def user_set_shell(self, user, shell):
+ self.log(f'Setting shell for {user} to {shell}', level=LOG_LEVELS.Info)
+
+ o = b''.join(sys_command(f"/usr/bin/arch-chroot {self.target} sh -c \"chsh -s {shell} {user}\""))
pass
def set_keyboard_language(self, language):
if len(language.strip()):
- with open(f'{self.mountpoint}/etc/vconsole.conf', 'w') as vconsole:
+ with open(f'{self.target}/etc/vconsole.conf', 'w') as vconsole:
vconsole.write(f'KEYMAP={language}\n')
vconsole.write(f'FONT=lat9w-16\n')
+ else:
+ self.log(f'Keyboard language was not changed from default (no language specified).', fg="yellow", level=LOG_LEVELS.Info)
return True
diff --git a/archinstall/lib/locale_helpers.py b/archinstall/lib/locale_helpers.py
index 523a23d5..736bfc47 100644
--- a/archinstall/lib/locale_helpers.py
+++ b/archinstall/lib/locale_helpers.py
@@ -1,29 +1,25 @@
+import subprocess
import os
from .exceptions import *
# from .general import sys_command
-def list_keyboard_languages(layout='qwerty'):
+def list_keyboard_languages():
locale_dir = '/usr/share/kbd/keymaps/'
if not os.path.isdir(locale_dir):
raise RequirementError(f'Directory containing locales does not exist: {locale_dir}')
for root, folders, files in os.walk(locale_dir):
- # Since qwerty is under /i386/ but other layouts are
- # in different spots, we'll need to filter the last foldername
- # of the path to verify against the desired layout.
- if os.path.basename(root) != layout:
- continue
for file in files:
if os.path.splitext(file)[1] == '.gz':
yield file.strip('.gz').strip('.map')
-def search_keyboard_layout(filter, layout='qwerty'):
- for language in list_keyboard_languages(layout):
+def search_keyboard_layout(filter):
+ for language in list_keyboard_languages():
if filter.lower() in language.lower():
yield language
def set_keyboard_language(locale):
- return os.system(f'loadkeys {locale}') == 0
+ return subprocess.call(['loadkeys', locale]) == 0
diff --git a/archinstall/lib/luks.py b/archinstall/lib/luks.py
index 561d4d6e..894be1c8 100644
--- a/archinstall/lib/luks.py
+++ b/archinstall/lib/luks.py
@@ -70,7 +70,7 @@ class luks2():
'--batch-mode',
'--verbose',
'--type', 'luks2',
- '--pbkdf', 'argon2i',
+ '--pbkdf', 'argon2id',
'--hash', hash_type,
'--key-size', str(key_size),
'--iter-time', str(iter_time),
@@ -116,7 +116,7 @@ class luks2():
def unlock(self, partition, mountpoint, key_file):
"""
- Mounts a lukts2 compatible partition to a certain mountpoint.
+ Mounts a luks2 compatible partition to a certain mountpoint.
Keyfile must be specified as there's no way to interact with the pw-prompt atm.
:param mountpoint: The name without absolute path, for instance "luksdev" will point to /dev/mapper/luksdev
diff --git a/archinstall/lib/mirrors.py b/archinstall/lib/mirrors.py
index d7d35782..04f47c0d 100644
--- a/archinstall/lib/mirrors.py
+++ b/archinstall/lib/mirrors.py
@@ -74,10 +74,15 @@ def re_rank_mirrors(top=10, *positionals, **kwargs):
def list_mirrors():
url = f"https://archlinux.org/mirrorlist/?protocol=https&ip_version=4&ip_version=6&use_mirror_status=on"
-
- response = urllib.request.urlopen(url)
regions = {}
+ try:
+ response = urllib.request.urlopen(url)
+ except urllib.error.URLError as err:
+ log(f'Could not fetch an active mirror-list: {err}', level=LOG_LEVELS.Warning, fg="yellow")
+ return regions
+
+
region = 'Unknown region'
for line in response.readlines():
if len(line.strip()) == 0:
diff --git a/archinstall/lib/services.py b/archinstall/lib/services.py
index 8fcdd296..bb6f64f2 100644
--- a/archinstall/lib/services.py
+++ b/archinstall/lib/services.py
@@ -7,6 +7,6 @@ def service_state(service_name: str):
if os.path.splitext(service_name)[1] != '.service':
service_name += '.service' # Just to be safe
- state = b''.join(sys_command(f'systemctl show -p SubState --value {service_name}'))
+ state = b''.join(sys_command(f'systemctl show --no-pager -p SubState --value {service_name}', environment_vars={'SYSTEMD_COLORS' : '0'}))
return state.strip().decode('UTF-8')
diff --git a/archinstall/lib/user_interaction.py b/archinstall/lib/user_interaction.py
index 1f5924e4..10df6755 100644
--- a/archinstall/lib/user_interaction.py
+++ b/archinstall/lib/user_interaction.py
@@ -130,7 +130,9 @@ def ask_for_additional_users(prompt='Any additional users to install (leave blan
return users, super_users
def ask_for_a_timezone():
- timezone = input('Enter a valid timezone (Example: Europe/Stockholm): ').strip()
+ timezone = input('Enter a valid timezone (examples: Europe/Stockholm, US/Eastern) or press enter to use UTC: ').strip()
+ if timezone == '':
+ timezone = 'UTC'
if (pathlib.Path("/usr")/"share"/"zoneinfo"/timezone).exists():
return timezone
else:
@@ -139,15 +141,25 @@ def ask_for_a_timezone():
level=LOG_LEVELS.Warning,
fg='red'
)
+
+def ask_for_audio_selection():
+ audio = "pulseaudio" # Default for most desktop environments
+ pipewire_choice = input("Would you like to install pipewire instead of pulseaudio as the default audio server? [Y/n] ").lower()
+ if pipewire_choice in ("y", ""):
+ audio = "pipewire"
+
+ return audio
def ask_to_configure_network():
# Optionally configure one network interface.
#while 1:
# {MAC: Ifname}
- interfaces = {'ISO-CONFIG' : 'Copy ISO network configuration to installation', **list_interfaces()}
+ interfaces = {'ISO-CONFIG' : 'Copy ISO network configuration to installation','NetworkManager':'Use NetworkManager to control and manage your internet connection', **list_interfaces()}
nic = generic_select(interfaces.values(), "Select one network interface to configure (leave blank to skip): ")
if nic and nic != 'Copy ISO network configuration to installation':
+ if nic == 'Use NetworkManager to control and manage your internet connection':
+ return {'nic': nic,'NetworkManager':True}
mode = generic_select(['DHCP (auto detect)', 'IP (static)'], f"Select which mode to configure for {nic}: ")
if mode == 'IP (static)':
while 1:
@@ -174,7 +186,7 @@ def ask_to_configure_network():
elif nic:
return nic
- return None
+ return {}
def ask_for_disk_layout():
options = {
@@ -245,8 +257,10 @@ def select_disk(dict_o_disks):
if len(drives) >= 1:
for index, drive in enumerate(drives):
print(f"{index}: {drive} ({dict_o_disks[drive]['size'], dict_o_disks[drive].device, dict_o_disks[drive]['label']})")
- drive = input('Select one of the above disks (by number or full path): ')
- if drive.isdigit():
+ drive = input('Select one of the above disks (by number or full path) or write /mnt to skip partitioning: ')
+ if drive.strip() == '/mnt':
+ return None
+ elif drive.isdigit():
drive = int(drive)
if drive >= len(drives):
raise DiskError(f'Selected option "{drive}" is out of range')
@@ -309,6 +323,8 @@ def select_language(options, show_only_country_codes=True):
:return: The language/dictionary key of the selected language
:rtype: str
"""
+ DEFAULT_KEYBOARD_LANGUAGE = 'us'
+
if show_only_country_codes:
languages = sorted([language for language in list(options) if len(language) == 2])
else:
@@ -318,9 +334,12 @@ def select_language(options, show_only_country_codes=True):
for index, language in enumerate(languages):
print(f"{index}: {language}")
- print(' -- You can enter ? or help to search for more languages --')
+ print(' -- You can enter ? or help to search for more languages, or skip to use US layout --')
selected_language = input('Select one of the above keyboard languages (by number or full name): ')
- if selected_language.lower() in ('?', 'help'):
+
+ if len(selected_language.strip()) == 0:
+ return DEFAULT_KEYBOARD_LANGUAGE
+ elif selected_language.lower() in ('?', 'help'):
while True:
filter_string = input('Search for layout containing (example: "sv-"): ')
new_options = list(search_keyboard_layout(filter_string))
@@ -333,16 +352,16 @@ def select_language(options, show_only_country_codes=True):
elif selected_language.isdigit() and (pos := int(selected_language)) <= len(languages)-1:
selected_language = languages[pos]
+ return selected_language
# I'm leaving "options" on purpose here.
# Since languages possibly contains a filtered version of
# all possible language layouts, and we might want to write
# for instance sv-latin1 (if we know that exists) without having to
# go through the search step.
- elif selected_language in options:
- selected_language = options[options.index(selected_language)]
+ elif selected_language in languages:
+ return selected_language
else:
- RequirementError("Selected language does not exist.")
- return selected_language
+ raise RequirementError("Selected language does not exist.")
raise RequirementError("Selecting languages require a least one language to be given as an option.")
@@ -366,26 +385,26 @@ def select_mirror_regions(mirrors, show_top_mirrors=True):
selected_mirrors = {}
if len(regions) >= 1:
- print_large_list(regions, margin_bottom=2)
+ print_large_list(regions, margin_bottom=4)
print(' -- You can skip this step by leaving the option blank --')
selected_mirror = input('Select one of the above regions to download packages from (by number or full name): ')
if len(selected_mirror.strip()) == 0:
+ # Returning back empty options which can be both used to
+ # do "if x:" logic as well as do `x.get('mirror', {}).get('sub', None)` chaining
return {}
- elif selected_mirror.isdigit() and (pos := int(selected_mirror)) <= len(regions)-1:
+ elif selected_mirror.isdigit() and int(selected_mirror) <= len(regions)-1:
+ # I'm leaving "mirrors" on purpose here.
+ # Since region possibly contains a known region of
+ # all possible regions, and we might want to write
+ # for instance Sweden (if we know that exists) without having to
+ # go through the search step.
region = regions[int(selected_mirror)]
selected_mirrors[region] = mirrors[region]
- # I'm leaving "mirrors" on purpose here.
- # Since region possibly contains a known region of
- # all possible regions, and we might want to write
- # for instance Sweden (if we know that exists) without having to
- # go through the search step.
elif selected_mirror in mirrors:
selected_mirrors[selected_mirror] = mirrors[selected_mirror]
else:
- RequirementError("Selected region does not exist.")
+ raise RequirementError("Selected region does not exist.")
return selected_mirrors
-
- raise RequirementError("Selecting mirror region require a least one region to be given as an option.")