Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall
diff options
context:
space:
mode:
Diffstat (limited to 'archinstall')
-rw-r--r--archinstall/__init__.py5
-rw-r--r--archinstall/lib/disk.py95
-rw-r--r--archinstall/lib/general.py84
-rw-r--r--archinstall/lib/hardware.py59
-rw-r--r--archinstall/lib/installer.py126
-rw-r--r--archinstall/lib/locale_helpers.py6
-rw-r--r--archinstall/lib/luks.py13
-rw-r--r--archinstall/lib/mirrors.py6
-rw-r--r--archinstall/lib/output.py92
-rw-r--r--archinstall/lib/profiles.py29
-rw-r--r--archinstall/lib/user_interaction.py539
11 files changed, 769 insertions, 285 deletions
diff --git a/archinstall/__init__.py b/archinstall/__init__.py
index 84ba0ec0..e2c7ea62 100644
--- a/archinstall/__init__.py
+++ b/archinstall/__init__.py
@@ -1,8 +1,9 @@
+"""Arch Linux installer - guided, templates etc."""
from .lib.general import *
from .lib.disk import *
from .lib.user_interaction import *
from .lib.exceptions import *
-from .lib.installer import *
+from .lib.installer import __packages__, Installer
from .lib.profiles import *
from .lib.luks import *
from .lib.mirrors import *
@@ -14,7 +15,7 @@ from .lib.output import *
from .lib.storage import *
from .lib.hardware import *
-__version__ = "2.1.4"
+__version__ = "2.2.0"
## Basic version of arg.parse() supporting:
## --key=value
diff --git a/archinstall/lib/disk.py b/archinstall/lib/disk.py
index c23bc6ac..44462a21 100644
--- a/archinstall/lib/disk.py
+++ b/archinstall/lib/disk.py
@@ -1,10 +1,11 @@
import glob, re, os, json, time, hashlib
-import pathlib, traceback
+import pathlib, traceback, logging
from collections import OrderedDict
from .exceptions import DiskError
from .general import *
-from .output import log, LOG_LEVELS
+from .output import log
from .storage import storage
+from .hardware import hasUEFI
ROOT_DIR_PATTERN = re.compile('^.*?/devices')
GPT = 0b00000001
@@ -73,7 +74,7 @@ class BlockDevice():
raise DiskError(f'Could not locate backplane info for "{self.path}"')
if self.info['type'] == 'loop':
- for drive in json.loads(b''.join(sys_command(f'losetup --json', hide_from_log=True)).decode('UTF_8'))['loopdevices']:
+ for drive in json.loads(b''.join(sys_command(['losetup', '--json'], hide_from_log=True)).decode('UTF_8'))['loopdevices']:
if not drive['name'] == self.path: continue
return drive['back-file']
@@ -87,17 +88,17 @@ class BlockDevice():
raise DiskError(f'A crypt device ({self.path}) without a parent kernel device name.')
return f"/dev/{self.info['pkname']}"
else:
- log(f"Unknown blockdevice type for {self.path}: {self.info['type']}", level=LOG_LEVELS.Debug)
+ log(f"Unknown blockdevice type for {self.path}: {self.info['type']}", level=logging.DEBUG)
# if not stat.S_ISBLK(os.stat(full_path).st_mode):
# raise DiskError(f'Selected disk "{full_path}" is not a block device.')
@property
def partitions(self):
- o = b''.join(sys_command(f'partprobe {self.path}'))
+ o = b''.join(sys_command(['partprobe', self.path]))
#o = b''.join(sys_command('/usr/bin/lsblk -o name -J -b {dev}'.format(dev=dev)))
- o = b''.join(sys_command(f'/usr/bin/lsblk -J {self.path}'))
+ o = b''.join(sys_command(['/usr/bin/lsblk', '-J', self.path]))
if b'not a block device' in o:
raise DiskError(f'Can not read partitions off something that isn\'t a block device: {self.path}')
@@ -128,7 +129,7 @@ class BlockDevice():
@property
def uuid(self):
- log(f'BlockDevice().uuid is untested!', level=LOG_LEVELS.Warning, fg='yellow')
+ log(f'BlockDevice().uuid is untested!', level=logging.WARNING, fg='yellow')
"""
Returns the disk UUID as returned by lsblk.
This is more reliable than relying on /dev/disk/by-partuuid as
@@ -220,9 +221,6 @@ class Partition():
@encrypted.setter
def encrypted(self, value :bool):
- if value:
- log(f'Marking {self} as encrypted: {value}', level=LOG_LEVELS.Debug)
- #log(f"Callstrack when marking the partition: {''.join(traceback.format_stack())}", level=LOG_LEVELS.Debug)
self._encrypted = value
@@ -239,7 +237,7 @@ class Partition():
return self.path
def detect_inner_filesystem(self, password):
- log(f'Trying to detect inner filesystem format on {self} (This might take a while)', level=LOG_LEVELS.Info)
+ log(f'Trying to detect inner filesystem format on {self} (This might take a while)', level=logging.INFO)
from .luks import luks2
try:
@@ -268,16 +266,16 @@ class Partition():
def safe_to_format(self):
if self.allow_formatting is False:
- log(f"Partition {self} is not marked for formatting.", level=LOG_LEVELS.Debug)
+ log(f"Partition {self} is not marked for formatting.", level=logging.DEBUG)
return False
elif self.target_mountpoint == '/boot':
try:
if self.has_content():
- log(f"Partition {self} is a boot partition and has content inside.", level=LOG_LEVELS.Debug)
+ log(f"Partition {self} is a boot partition and has content inside.", level=logging.DEBUG)
return False
except SysCallError as err:
- log(err.message, LOG_LEVELS.Debug)
- log(f"Partition {self} was identified as /boot but we could not mount to check for content, continuing!", level=LOG_LEVELS.Debug)
+ log(err.message, logging.DEBUG)
+ log(f"Partition {self} was identified as /boot but we could not mount to check for content, continuing!", level=logging.DEBUG)
pass
return True
@@ -292,7 +290,7 @@ class Partition():
raise DiskError(f"Attempting to encrypt a partition that was not marked for encryption: {self}")
if not self.safe_to_format():
- log(f"Partition {self} was marked as protected but encrypt() was called on it!", level=LOG_LEVELS.Error, fg="red")
+ log(f"Partition {self} was marked as protected but encrypt() was called on it!", level=logging.ERROR, fg="red")
return False
handle = luks2(self, None, None)
@@ -320,7 +318,7 @@ class Partition():
raise PermissionError(f"{self} is not formatable either because instance is locked ({self.allow_formatting}) or a blocking flag was given ({allow_formatting})")
if log_formatting:
- log(f'Formatting {path} -> {filesystem}', level=LOG_LEVELS.Info)
+ log(f'Formatting {path} -> {filesystem}', level=logging.INFO)
if filesystem == 'btrfs':
o = b''.join(sys_command(f'/usr/bin/mkfs.btrfs -f {path}'))
@@ -375,7 +373,7 @@ class Partition():
def mount(self, target, fs=None, options=''):
if not self.mountpoint:
- log(f'Mounting {self} to {target}', level=LOG_LEVELS.Info)
+ log(f'Mounting {self} to {target}', level=logging.INFO)
if not fs:
if not self.filesystem: raise DiskError(f'Need to format (or define) the filesystem on {self} before mounting.')
fs = self.filesystem
@@ -427,25 +425,30 @@ class Filesystem():
# TODO:
# When instance of a HDD is selected, check all usages and gracefully unmount them
# as well as close any crypto handles.
- def __init__(self, blockdevice, mode=GPT):
+ def __init__(self, blockdevice,mode):
self.blockdevice = blockdevice
self.mode = mode
def __enter__(self, *args, **kwargs):
if self.blockdevice.keep_partitions is False:
- log(f'Wiping {self.blockdevice} by using partition format {self.mode}', level=LOG_LEVELS.Debug)
+ log(f'Wiping {self.blockdevice} by using partition format {self.mode}', level=logging.DEBUG)
if self.mode == GPT:
if self.raw_parted(f'{self.blockdevice.device} mklabel gpt').exit_code == 0:
self.blockdevice.flush_cache()
return self
else:
raise DiskError(f'Problem setting the partition format to GPT:', f'/usr/bin/parted -s {self.blockdevice.device} mklabel gpt')
+ elif self.mode == MBR:
+ if sys_command(f'/usr/bin/parted -s {self.blockdevice.device} mklabel msdos').exit_code == 0:
+ return self
+ else:
+ raise DiskError(f'Problem setting the partition format to GPT:', f'/usr/bin/parted -s {self.blockdevice.device} mklabel msdos')
else:
raise DiskError(f'Unknown mode selected to format in: {self.mode}')
# TODO: partition_table_type is hardcoded to GPT at the moment. This has to be changed.
elif self.mode == self.blockdevice.partition_table_type:
- log(f'Kept partition format {self.mode} for {self.blockdevice}', level=LOG_LEVELS.Debug)
+ log(f'Kept partition format {self.mode} for {self.blockdevice}', level=logging.DEBUG)
else:
raise DiskError(f'The selected partition table format {self.mode} does not match that of {self.blockdevice}.')
@@ -468,7 +471,6 @@ class Filesystem():
def raw_parted(self, string:str):
x = sys_command(f'/usr/bin/parted -s {string}')
- log(f"'parted -s {string}' returned: {b''.join(x)}", level=LOG_LEVELS.Debug)
return x
def parted(self, string:str):
@@ -481,29 +483,40 @@ class Filesystem():
return self.raw_parted(string).exit_code
def use_entire_disk(self, root_filesystem_type='ext4'):
- log(f"Using and formatting the entire {self.blockdevice}.", level=LOG_LEVELS.Debug)
- self.add_partition('primary', start='1MiB', end='513MiB', format='fat32')
- self.set_name(0, 'EFI')
- self.set(0, 'boot on')
- # TODO: Probably redundant because in GPT mode 'esp on' is an alias for "boot on"?
- # https://www.gnu.org/software/parted/manual/html_node/set.html
- self.set(0, 'esp on')
- self.add_partition('primary', start='513MiB', end='100%')
-
- self.blockdevice.partition[0].filesystem = 'vfat'
- self.blockdevice.partition[1].filesystem = root_filesystem_type
- log(f"Set the root partition {self.blockdevice.partition[1]} to use filesystem {root_filesystem_type}.", level=LOG_LEVELS.Debug)
-
- self.blockdevice.partition[0].target_mountpoint = '/boot'
- self.blockdevice.partition[1].target_mountpoint = '/'
-
- self.blockdevice.partition[0].allow_formatting = True
- self.blockdevice.partition[1].allow_formatting = True
+ log(f"Using and formatting the entire {self.blockdevice}.", level=logging.DEBUG)
+ if hasUEFI():
+ self.add_partition('primary', start='1MiB', end='513MiB', format='fat32')
+ self.set_name(0, 'EFI')
+ self.set(0, 'boot on')
+ # TODO: Probably redundant because in GPT mode 'esp on' is an alias for "boot on"?
+ # https://www.gnu.org/software/parted/manual/html_node/set.html
+ self.set(0, 'esp on')
+ self.add_partition('primary', start='513MiB', end='100%')
+
+ self.blockdevice.partition[0].filesystem = 'vfat'
+ self.blockdevice.partition[1].filesystem = root_filesystem_type
+ log(f"Set the root partition {self.blockdevice.partition[1]} to use filesystem {root_filesystem_type}.", level=logging.DEBUG)
+
+ self.blockdevice.partition[0].target_mountpoint = '/boot'
+ self.blockdevice.partition[1].target_mountpoint = '/'
+
+ self.blockdevice.partition[0].allow_formatting = True
+ self.blockdevice.partition[1].allow_formatting = True
+ else:
+ #we don't need a seprate boot partition it would be a waste of space
+ self.add_partition('primary', start='1MB', end='100%')
+ self.blockdevice.partition[0].filesystem=root_filesystem_type
+ log(f"Set the root partition {self.blockdevice.partition[0]} to use filesystem {root_filesystem_type}.", level=logging.DEBUG)
+ self.blockdevice.partition[0].target_mountpoint = '/'
+ self.blockdevice.partition[0].allow_formatting = True
def add_partition(self, type, start, end, format=None):
- log(f'Adding partition to {self.blockdevice}', level=LOG_LEVELS.Info)
+ log(f'Adding partition to {self.blockdevice}', level=logging.INFO)
previous_partitions = self.blockdevice.partitions
+ if self.mode == MBR:
+ if len(self.blockdevice.partitions)>3:
+ DiskError("Too many partitions on disk, MBR disks can only have 3 parimary partitions")
if format:
partitioning = self.parted(f'{self.blockdevice.device} mkpart {type} {format} {start} {end}') == 0
else:
diff --git a/archinstall/lib/general.py b/archinstall/lib/general.py
index 6e3b66f1..eb0c5d14 100644
--- a/archinstall/lib/general.py
+++ b/archinstall/lib/general.py
@@ -1,10 +1,10 @@
import os, json, hashlib, shlex, sys
-import time, pty
+import time, pty, logging
from datetime import datetime, date
from subprocess import Popen, STDOUT, PIPE, check_output
from select import epoll, EPOLLIN, EPOLLHUP
from .exceptions import *
-from .output import log, LOG_LEVELS
+from .output import log
def gen_uid(entropy_length=256):
return hashlib.sha512(os.urandom(entropy_length)).hexdigest()
@@ -76,7 +76,7 @@ class sys_command():#Thread):
"""
Stolen from archinstall_gui
"""
- def __init__(self, cmd, callback=None, start_callback=None, environment_vars={}, *args, **kwargs):
+ def __init__(self, cmd, callback=None, start_callback=None, peak_output=False, environment_vars={}, *args, **kwargs):
kwargs.setdefault("worker_id", gen_uid())
kwargs.setdefault("emulate", False)
kwargs.setdefault("suppress_errors", False)
@@ -84,15 +84,24 @@ class sys_command():#Thread):
self.log = kwargs.get('log', log)
if kwargs['emulate']:
- self.log(f"Starting command '{cmd}' in emulation mode.", level=LOG_LEVELS.Debug)
+ self.log(f"Starting command '{cmd}' in emulation mode.", level=logging.DEBUG)
+
+ if type(cmd) is list:
+ # if we get a list of arguments
+ self.raw_cmd = shlex.join(cmd)
+ self.cmd = cmd
+ else:
+ # else consider it a single shell string
+ # this should only be used if really necessary
+ self.raw_cmd = cmd
+ try:
+ self.cmd = shlex.split(cmd)
+ except Exception as e:
+ raise ValueError(f'Incorrect string to split: {cmd}\n{e}')
- self.raw_cmd = cmd
- try:
- self.cmd = shlex.split(cmd)
- except Exception as e:
- raise ValueError(f'Incorrect string to split: {cmd}\n{e}')
self.args = args
self.kwargs = kwargs
+ self.peak_output = peak_output
self.environment_vars = environment_vars
self.kwargs.setdefault("worker", None)
@@ -151,6 +160,38 @@ class sys_command():#Thread):
'exit_code': self.exit_code
}
+ def peak(self, output :str):
+ if type(output) == bytes:
+ try:
+ output = output.decode('UTF-8')
+ except UnicodeDecodeError:
+ return None
+
+ output = output.strip('\r\n ')
+ if len(output) <= 0:
+ return None
+
+ if self.peak_output:
+ from .user_interaction import get_terminal_width
+
+ # Move back to the beginning of the terminal
+ sys.stdout.flush()
+ sys.stdout.write("\033[%dG" % 0)
+ sys.stdout.flush()
+
+ # Clear the line
+ sys.stdout.write(" " * get_terminal_width())
+ sys.stdout.flush()
+
+ # Move back to the beginning again
+ sys.stdout.flush()
+ sys.stdout.write("\033[%dG" % 0)
+ sys.stdout.flush()
+
+ # And print the new output we're peaking on:
+ sys.stdout.write(output)
+ sys.stdout.flush()
+
def run(self):
self.status = 'running'
old_dir = os.getcwd()
@@ -163,7 +204,7 @@ class sys_command():#Thread):
os.execve(self.cmd[0], self.cmd, {**os.environ, **self.environment_vars})
except FileNotFoundError:
self.status = 'done'
- self.log(f"{self.cmd[0]} does not exist.", level=LOG_LEVELS.Debug)
+ self.log(f"{self.cmd[0]} does not exist.", level=logging.DEBUG)
self.exit_code = 1
return False
@@ -173,8 +214,8 @@ class sys_command():#Thread):
poller.register(child_fd, EPOLLIN | EPOLLHUP)
if 'events' in self.kwargs and 'debug' in self.kwargs:
- self.log(f'[D] Using triggers for command: {self.cmd}', level=LOG_LEVELS.Debug)
- self.log(json.dumps(self.kwargs['events']), level=LOG_LEVELS.Debug)
+ self.log(f'[D] Using triggers for command: {self.cmd}', level=logging.DEBUG)
+ self.log(json.dumps(self.kwargs['events']), level=logging.DEBUG)
alive = True
last_trigger_pos = 0
@@ -182,13 +223,14 @@ class sys_command():#Thread):
for fileno, event in poller.poll(0.1):
try:
output = os.read(child_fd, 8192)
+ self.peak(output)
self.trace_log += output
except OSError:
alive = False
break
if 'debug' in self.kwargs and self.kwargs['debug'] and len(output):
- self.log(self.cmd, 'gave:', output.decode('UTF-8'), level=LOG_LEVELS.Debug)
+ self.log(self.cmd, 'gave:', output.decode('UTF-8'), level=logging.DEBUG)
if 'on_output' in self.kwargs:
self.kwargs['on_output'](self.kwargs['worker'], output)
@@ -209,8 +251,8 @@ class sys_command():#Thread):
trigger_pos = self.trace_log[last_trigger_pos:].lower().find(trigger.lower())
if 'debug' in self.kwargs and self.kwargs['debug']:
- self.log(f"Writing to subprocess {self.cmd[0]}: {self.kwargs['events'][trigger].decode('UTF-8')}", level=LOG_LEVELS.Debug)
- self.log(f"Writing to subprocess {self.cmd[0]}: {self.kwargs['events'][trigger].decode('UTF-8')}", level=LOG_LEVELS.Debug)
+ self.log(f"Writing to subprocess {self.cmd[0]}: {self.kwargs['events'][trigger].decode('UTF-8')}", level=logging.DEBUG)
+ self.log(f"Writing to subprocess {self.cmd[0]}: {self.kwargs['events'][trigger].decode('UTF-8')}", level=logging.DEBUG)
last_trigger_pos = trigger_pos
os.write(child_fd, self.kwargs['events'][trigger])
@@ -224,18 +266,18 @@ class sys_command():#Thread):
## Adding a exit trigger:
if len(self.kwargs['events']) == 0:
if 'debug' in self.kwargs and self.kwargs['debug']:
- self.log(f"Waiting for last command {self.cmd[0]} to finish.", level=LOG_LEVELS.Debug)
+ self.log(f"Waiting for last command {self.cmd[0]} to finish.", level=logging.DEBUG)
if bytes(f']$'.lower(), 'UTF-8') in self.trace_log[0-len(f']$')-5:].lower():
if 'debug' in self.kwargs and self.kwargs['debug']:
- self.log(f"{self.cmd[0]} has finished.", level=LOG_LEVELS.Debug)
+ self.log(f"{self.cmd[0]} has finished.", level=logging.DEBUG)
alive = False
break
self.status = 'done'
if 'debug' in self.kwargs and self.kwargs['debug']:
- self.log(f"{self.cmd[0]} waiting for exit code.", level=LOG_LEVELS.Debug)
+ self.log(f"{self.cmd[0]} waiting for exit code.", level=logging.DEBUG)
if not self.kwargs['emulate']:
try:
@@ -249,14 +291,14 @@ class sys_command():#Thread):
self.exit_code = 0
if 'debug' in self.kwargs and self.kwargs['debug']:
- self.log(f"{self.cmd[0]} got exit code: {self.exit_code}", level=LOG_LEVELS.Debug)
+ self.log(f"{self.cmd[0]} got exit code: {self.exit_code}", level=logging.DEBUG)
if 'ignore_errors' in self.kwargs:
self.exit_code = 0
if self.exit_code != 0 and not self.kwargs['suppress_errors']:
- #self.log(self.trace_log.decode('UTF-8'), level=LOG_LEVELS.Debug)
- #self.log(f"'{self.raw_cmd}' did not exit gracefully, exit code {self.exit_code}.", level=LOG_LEVELS.Error)
+ #self.log(self.trace_log.decode('UTF-8'), level=logging.DEBUG)
+ #self.log(f"'{self.raw_cmd}' did not exit gracefully, exit code {self.exit_code}.", level=logging.ERROR)
raise SysCallError(message=f"{self.trace_log.decode('UTF-8')}\n'{self.raw_cmd}' did not exit gracefully (trace log above), exit code: {self.exit_code}", exit_code=self.exit_code)
self.ended = time.time()
diff --git a/archinstall/lib/hardware.py b/archinstall/lib/hardware.py
index 5828fd09..f139dfe4 100644
--- a/archinstall/lib/hardware.py
+++ b/archinstall/lib/hardware.py
@@ -1,14 +1,44 @@
-import os
+import os, subprocess, json
from .general import sys_command
from .networking import list_interfaces, enrichIfaceTypes
+from typing import Optional
-def hasWifi():
+__packages__ = ['xf86-video-amdgpu', 'xf86-video-ati', 'xf86-video-intel', 'xf86-video-nouveau', 'xf86-video-fbdev', 'xf86-video-vesa', 'xf86-video-vmware', 'nvidia', 'mesa']
+
+AVAILABLE_GFX_DRIVERS = {
+ # Sub-dicts are layer-2 options to be selected
+ # and lists are a list of packages to be installed
+ 'AMD / ATI' : {
+ 'amd' : ['xf86-video-amdgpu'],
+ 'ati' : ['xf86-video-ati']
+ },
+ 'intel' : ['xf86-video-intel'],
+ 'nvidia' : {
+ 'open-source' : ['xf86-video-nouveau'],
+ 'proprietary' : ['nvidia']
+ },
+ 'mesa' : ['mesa'],
+ 'fbdev' : ['xf86-video-fbdev'],
+ 'vesa' : ['xf86-video-vesa'],
+ 'vmware / virtualbox' : ['xf86-video-vmware']
+}
+
+def hasWifi()->bool:
return 'WIRELESS' in enrichIfaceTypes(list_interfaces().values()).values()
-def hasUEFI():
+def hasAMDCPU()->bool:
+ if subprocess.check_output("lscpu | grep AMD", shell=True).strip().decode():
+ return True
+ return False
+def hasIntelCPU()->bool:
+ if subprocess.check_output("lscpu | grep Intel", shell=True).strip().decode():
+ return True
+ return False
+
+def hasUEFI()->bool:
return os.path.isdir('/sys/firmware/efi')
-def graphicsDevices():
+def graphicsDevices()->dict:
cards = {}
for line in sys_command(f"lspci"):
if b' VGA ' in line:
@@ -16,13 +46,28 @@ def graphicsDevices():
cards[identifier.strip().lower().decode('UTF-8')] = line
return cards
-def hasNvidiaGraphics():
+def hasNvidiaGraphics()->bool:
return any('nvidia' in x for x in graphicsDevices())
-def hasAmdGraphics():
+def hasAmdGraphics()->bool:
return any('amd' in x for x in graphicsDevices())
-def hasIntelGraphics():
+def hasIntelGraphics()->bool:
return any('intel' in x for x in graphicsDevices())
+
+def cpuVendor()-> Optional[str]:
+ cpu_info = json.loads(subprocess.check_output("lscpu -J", shell=True).decode('utf-8'))['lscpu']
+ for info in cpu_info:
+ if info.get('field',None):
+ if info.get('field',None) == "Vendor ID:":
+ return info.get('data',None)
+
+def isVM() -> bool:
+ try:
+ subprocess.check_call(["systemd-detect-virt"]) # systemd-detect-virt issues a non-zero exit code if it is not on a virtual machine
+ return True
+ except:
+ return False
+
# TODO: Add more identifiers
diff --git a/archinstall/lib/installer.py b/archinstall/lib/installer.py
index 758033a7..331762b4 100644
--- a/archinstall/lib/installer.py
+++ b/archinstall/lib/installer.py
@@ -1,5 +1,5 @@
-import os, stat, time, shutil, pathlib, subprocess
-
+import os, stat, time, shutil, pathlib
+import subprocess, logging
from .exceptions import *
from .disk import *
from .general import *
@@ -7,8 +7,12 @@ from .user_interaction import *
from .profiles import Profile
from .mirrors import *
from .systemd import Networkd
-from .output import log, LOG_LEVELS
+from .output import log
from .storage import storage
+from .hardware import *
+
+# Any package that the Installer() is responsible for (optional and the default ones)
+__packages__ = ["base", "base-devel", "linux-firmware", "linux", "linux-lts", "linux-zen", "linux-hardened"]
class Installer():
"""
@@ -18,7 +22,7 @@ class Installer():
:param partition: Requires a partition as the first argument, this is
so that the installer can mount to `mountpoint` and strap packages there.
:type partition: class:`archinstall.Partition`
-
+
:param boot_partition: There's two reasons for needing a boot partition argument,
The first being so that `mkinitcpio` can place the `vmlinuz` kernel at the right place
during the `pacstrap` or `linux` and the base packages for a minimal installation.
@@ -29,12 +33,12 @@ class Installer():
:param profile: A profile to install, this is optional and can be called later manually.
This just simplifies the process by not having to call :py:func:`~archinstall.Installer.install_profile` later on.
:type profile: str, optional
-
+
:param hostname: The given /etc/hostname for the machine.
:type hostname: str, optional
"""
- def __init__(self, target, *, base_packages='base base-devel linux linux-firmware efibootmgr'):
+ def __init__(self, target, *, base_packages=__packages__[:3], kernels=['linux']):
self.target = target
self.init_time = time.strftime('%Y-%m-%d_%H-%M-%S')
self.milliseconds = int(str(time.time()).split('.')[1])
@@ -43,14 +47,17 @@ class Installer():
'base' : False,
'bootloader' : False
}
-
+
self.base_packages = base_packages.split(' ') if type(base_packages) is str else base_packages
+ for kernel in kernels:
+ self.base_packages.append(kernel)
+
self.post_base_install = []
storage['session'] = self
self.partitions = get_partitions_in_use(self.target)
- def log(self, *args, level=LOG_LEVELS.Debug, **kwargs):
+ def log(self, *args, level=logging.DEBUG, **kwargs):
"""
installer.log() wraps output.log() mainly to set a default log-level for this install session.
Any manual override can be done per log() call.
@@ -65,8 +72,8 @@ class Installer():
# TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
if len(args) >= 2 and args[1]:
- #self.log(self.trace_log.decode('UTF-8'), level=LOG_LEVELS.Debug)
- self.log(args[1], level=LOG_LEVELS.Error, fg='red')
+ #self.log(self.trace_log.decode('UTF-8'), level=logging.DEBUG)
+ self.log(args[1], level=logging.ERROR, fg='red')
self.sync_log_to_install_medium()
@@ -79,17 +86,17 @@ class Installer():
self.genfstab()
if not (missing_steps := self.post_install_check()):
- self.log('Installation completed without any errors. You may now reboot.', bg='black', fg='green', level=LOG_LEVELS.Info)
+ self.log('Installation completed without any errors. You may now reboot.', fg='green', level=logging.INFO)
self.sync_log_to_install_medium()
return True
else:
- self.log('Some required steps were not successfully installed/configured before leaving the installer:', bg='black', fg='red', level=LOG_LEVELS.Warning)
+ self.log('Some required steps were not successfully installed/configured before leaving the installer:', fg='red', level=logging.WARNING)
for step in missing_steps:
- self.log(f' - {step}', bg='black', fg='red', level=LOG_LEVELS.Warning)
+ self.log(f' - {step}', fg='red', level=logging.WARNING)
- self.log(f"Detailed error logs can be found at: {storage['LOG_PATH']}", level=LOG_LEVELS.Warning)
- self.log(f"Submit this zip file as an issue to https://github.com/archlinux/archinstall/issues", level=LOG_LEVELS.Warning)
+ self.log(f"Detailed error logs can be found at: {storage['LOG_PATH']}", level=logging.WARNING)
+ self.log(f"Submit this zip file as an issue to https://github.com/archlinux/archinstall/issues", level=logging.WARNING)
self.sync_log_to_install_medium()
return False
@@ -119,21 +126,21 @@ class Installer():
def pacstrap(self, *packages, **kwargs):
if type(packages[0]) in (list, tuple): packages = packages[0]
- self.log(f'Installing packages: {packages}', level=LOG_LEVELS.Info)
+ self.log(f'Installing packages: {packages}', level=logging.INFO)
if (sync_mirrors := sys_command('/usr/bin/pacman -Syy')).exit_code == 0:
if (pacstrap := sys_command(f'/usr/bin/pacstrap {self.target} {" ".join(packages)}', **kwargs)).exit_code == 0:
return True
else:
- self.log(f'Could not strap in packages: {pacstrap.exit_code}', level=LOG_LEVELS.Info)
+ self.log(f'Could not strap in packages: {pacstrap.exit_code}', level=logging.INFO)
else:
- self.log(f'Could not sync mirrors: {sync_mirrors.exit_code}', level=LOG_LEVELS.Info)
+ self.log(f'Could not sync mirrors: {sync_mirrors.exit_code}', level=logging.INFO)
def set_mirrors(self, mirrors):
return use_mirrors(mirrors, destination=f'{self.target}/etc/pacman.d/mirrorlist')
def genfstab(self, flags='-pU'):
- self.log(f"Updating {self.target}/etc/fstab", level=LOG_LEVELS.Info)
+ self.log(f"Updating {self.target}/etc/fstab", level=logging.INFO)
fstab = sys_command(f'/usr/bin/genfstab {flags} {self.target}').trace_log
with open(f"{self.target}/etc/fstab", 'ab') as fstab_fh:
@@ -141,7 +148,7 @@ class Installer():
if not os.path.isfile(f'{self.target}/etc/fstab'):
raise RequirementError(f'Could not generate fstab, strapping in packages most likely failed (disk out of space?)\n{fstab}')
-
+
return True
def set_hostname(self, hostname :str, *args, **kwargs):
@@ -169,19 +176,19 @@ class Installer():
else:
self.log(
f"Time zone {zone} does not exist, continuing with system default.",
- level=LOG_LEVELS.Warning,
+ level=logging.WARNING,
fg='red'
)
def activate_ntp(self):
- self.log(f'Installing and activating NTP.', level=LOG_LEVELS.Info)
+ self.log(f'Installing and activating NTP.', level=logging.INFO)
if self.pacstrap('ntp'):
if self.enable_service('ntpd'):
return True
def enable_service(self, *services):
for service in services:
- self.log(f'Enabling service {service}', level=LOG_LEVELS.Info)
+ self.log(f'Enabling service {service}', level=logging.INFO)
if (output := self.arch_chroot(f'systemctl enable {service}')).exit_code != 0:
raise ServiceException(f"Unable to start service {service}: {output}")
@@ -189,6 +196,9 @@ class Installer():
return sys_command(f'/usr/bin/arch-chroot {self.target} {cmd}')
def arch_chroot(self, cmd, *args, **kwargs):
+ if 'runas' in kwargs:
+ cmd = f"su - {kwargs['runas']} -c \"{cmd}\""
+
return self.run_command(cmd)
def drop_to_shell(self):
@@ -223,7 +233,7 @@ class Installer():
# If we haven't installed the base yet (function called pre-maturely)
if self.helper_flags.get('base', False) is False:
self.base_packages.append('iwd')
- # This function will be called after minimal_installation()
+ # This function will be called after minimal_installation()
# as a hook for post-installs. This hook is only needed if
# base is not installed yet.
def post_install_enable_iwd_service(*args, **kwargs):
@@ -273,6 +283,7 @@ class Installer():
## (encrypted partitions default to btrfs for now, so we need btrfs-progs)
## TODO: Perhaps this should be living in the function which dictates
## the partitioning. Leaving here for now.
+
MODULES = []
BINARIES = []
FILES = []
@@ -298,10 +309,20 @@ class Installer():
if 'encrypt' not in HOOKS:
HOOKS.insert(HOOKS.index('filesystems'), 'encrypt')
+ if not(hasUEFI()): # TODO: Allow for grub even on EFI
+ self.base_packages.append('grub')
+
self.pacstrap(self.base_packages)
self.helper_flags['base-strapped'] = True
#self.genfstab()
-
+ if not isVM():
+ vendor = cpuVendor()
+ if vendor == "AuthenticAMD":
+ self.base_packages.append("amd-ucode")
+ elif vendor == "GenuineIntel":
+ self.base_packages.append("intel-ucode")
+ else:
+ self.log("Unknown cpu vendor not installing ucode")
with open(f"{self.target}/etc/fstab", "a") as fstab:
fstab.write(
"\ntmpfs /tmp tmpfs defaults,noatime,mode=1777 0 0\n"
@@ -322,13 +343,13 @@ class Installer():
mkinit.write(f"BINARIES=({' '.join(BINARIES)})\n")
mkinit.write(f"FILES=({' '.join(FILES)})\n")
mkinit.write(f"HOOKS=({' '.join(HOOKS)})\n")
- sys_command(f'/usr/bin/arch-chroot {self.target} mkinitcpio -p linux')
+ sys_command(f'/usr/bin/arch-chroot {self.target} mkinitcpio -P')
self.helper_flags['base'] = True
# Run registered post-install hooks
for function in self.post_base_install:
- self.log(f"Running post-installation hook: {function}", level=LOG_LEVELS.Info)
+ self.log(f"Running post-installation hook: {function}", level=logging.INFO)
function(self)
return True
@@ -342,9 +363,13 @@ class Installer():
elif partition.mountpoint == self.target:
root_partition = partition
- self.log(f'Adding bootloader {bootloader} to {boot_partition}', level=LOG_LEVELS.Info)
+ self.log(f'Adding bootloader {bootloader} to {boot_partition if boot_partition else root_partition}', level=logging.INFO)
if bootloader == 'systemd-bootctl':
+ self.pacstrap('efibootmgr')
+
+ if not hasUEFI():
+ raise HardwareIncompatibilityError
# TODO: Ideally we would want to check if another config
# points towards the same disk and/or partition.
# And in which case we should do some clean up.
@@ -372,13 +397,20 @@ class Installer():
## For some reason, blkid and /dev/disk/by-uuid are not getting along well.
## And blkid is wrong in terms of LUKS.
#UUID = sys_command('blkid -s PARTUUID -o value {drive}{partition_2}'.format(**args)).decode('UTF-8').strip()
-
# Setup the loader entry
with open(f'{self.target}/boot/loader/entries/{self.init_time}.conf', 'w') as entry:
entry.write(f'# Created by: archinstall\n')
entry.write(f'# Created on: {self.init_time}\n')
entry.write(f'title Arch Linux\n')
entry.write(f'linux /vmlinuz-linux\n')
+ if not isVM():
+ vendor = cpuVendor()
+ if vendor == "AuthenticAMD":
+ entry.write("initrd /amd-ucode.img\n")
+ elif vendor == "GenuineIntel":
+ entry.write("initrd /intel-ucode.img\n")
+ else:
+ self.log("unknow cpu vendor, not adding ucode to systemd-boot config")
entry.write(f'initrd /initramfs-linux.img\n')
## blkid doesn't trigger on loopback devices really well,
## so we'll use the old manual method until we get that sorted out.
@@ -387,18 +419,34 @@ class Installer():
if (real_device := self.detect_encryption(root_partition)):
# TODO: We need to detect if the encrypted device is a whole disk encryption,
# or simply a partition encryption. Right now we assume it's a partition (and we always have)
- log(f"Identifying root partition by PART-UUID on {real_device}: '{real_device.uuid}'.", level=LOG_LEVELS.Debug)
+ log(f"Identifying root partition by PART-UUID on {real_device}: '{real_device.uuid}'.", level=logging.DEBUG)
entry.write(f'options cryptdevice=PARTUUID={real_device.uuid}:luksdev root=/dev/mapper/luksdev rw intel_pstate=no_hwp\n')
else:
- log(f"Identifying root partition by PART-UUID on {root_partition}, looking for '{root_partition.uuid}'.", level=LOG_LEVELS.Debug)
+ log(f"Identifying root partition by PART-UUID on {root_partition}, looking for '{root_partition.uuid}'.", level=logging.DEBUG)
entry.write(f'options root=PARTUUID={root_partition.uuid} rw intel_pstate=no_hwp\n')
self.helper_flags['bootloader'] = bootloader
return True
- raise RequirementError(f"Could not identify the UUID of {root_partition}, there for {self.target}/boot/loader/entries/arch.conf will be broken until fixed.")
+ raise RequirementError(f"Could not identify the UUID of {self.partition}, there for {self.target}/boot/loader/entries/arch.conf will be broken until fixed.")
+ elif bootloader == "grub-install":
+ self.pacstrap('grub')
+
+ if hasUEFI():
+ self.pacstrap('efibootmgr')
+ o = b''.join(sys_command(f'/usr/bin/arch-chroot {self.target} grub-install --target=x86_64-efi --efi-directory=/boot --bootloader-id=GRUB'))
+ sys_command('/usr/bin/arch-chroot /mnt grub-mkconfig -o /boot/grub/grub.cfg')
+ return True
+ else:
+ root_device = subprocess.check_output(f'basename "$(readlink -f /sys/class/block/{root_partition.path.replace("/dev/","")}/..)"', shell=True).decode().strip()
+ if root_device == "block":
+ root_device = f"{root_partition.path}"
+ o = b''.join(sys_command(f'/usr/bin/arch-chroot {self.target} grub-install --target=i386-pc /dev/{root_device}'))
+ sys_command('/usr/bin/arch-chroot /mnt grub-mkconfig -o /boot/grub/grub.cfg')
+ self.helper_flags['bootloader'] = bootloader
+ return True
else:
- raise RequirementError(f"Unknown (or not yet implemented) bootloader added to add_bootloader(): {bootloader}")
+ raise RequirementError(f"Unknown (or not yet implemented) bootloader requested: {bootloader}")
def add_additional_packages(self, *packages):
return self.pacstrap(*packages)
@@ -416,17 +464,17 @@ class Installer():
if type(profile) == str:
profile = Profile(self, profile)
- self.log(f'Installing network profile {profile}', level=LOG_LEVELS.Info)
+ self.log(f'Installing network profile {profile}', level=logging.INFO)
return profile.install()
def enable_sudo(self, entity :str, group=False):
- self.log(f'Enabling sudo permissions for {entity}.', level=LOG_LEVELS.Info)
+ self.log(f'Enabling sudo permissions for {entity}.', level=logging.INFO)
with open(f'{self.target}/etc/sudoers', 'a') as sudoers:
sudoers.write(f'{"%" if group else ""}{entity} ALL=(ALL) ALL\n')
return True
def user_create(self, user :str, password=None, groups=[], sudo=False):
- self.log(f'Creating user {user}', level=LOG_LEVELS.Info)
+ self.log(f'Creating user {user}', level=logging.INFO)
o = b''.join(sys_command(f'/usr/bin/arch-chroot {self.target} useradd -m -G wheel {user}'))
if password:
self.user_set_pw(user, password)
@@ -439,7 +487,7 @@ class Installer():
self.helper_flags['user'] = True
def user_set_pw(self, user, password):
- self.log(f'Setting password for {user}', level=LOG_LEVELS.Info)
+ self.log(f'Setting password for {user}', level=logging.INFO)
if user == 'root':
# This means the root account isn't locked/disabled with * in /etc/passwd
@@ -449,7 +497,7 @@ class Installer():
pass
def user_set_shell(self, user, shell):
- self.log(f'Setting shell for {user} to {shell}', level=LOG_LEVELS.Info)
+ self.log(f'Setting shell for {user} to {shell}', level=logging.INFO)
o = b''.join(sys_command(f"/usr/bin/arch-chroot {self.target} sh -c \"chsh -s {shell} {user}\""))
pass
@@ -460,5 +508,5 @@ class Installer():
vconsole.write(f'KEYMAP={language}\n')
vconsole.write(f'FONT=lat9w-16\n')
else:
- self.log(f'Keyboard language was not changed from default (no language specified).', fg="yellow", level=LOG_LEVELS.Info)
+ self.log(f'Keyboard language was not changed from default (no language specified).', fg="yellow", level=logging.INFO)
return True
diff --git a/archinstall/lib/locale_helpers.py b/archinstall/lib/locale_helpers.py
index 736bfc47..3c373bc6 100644
--- a/archinstall/lib/locale_helpers.py
+++ b/archinstall/lib/locale_helpers.py
@@ -16,6 +16,12 @@ def list_keyboard_languages():
if os.path.splitext(file)[1] == '.gz':
yield file.strip('.gz').strip('.map')
+def verify_keyboard_layout(layout):
+ for language in list_keyboard_languages():
+ if layout.lower() == language.lower():
+ return True
+ return False
+
def search_keyboard_layout(filter):
for language in list_keyboard_languages():
if filter.lower() in language.lower():
diff --git a/archinstall/lib/luks.py b/archinstall/lib/luks.py
index ca077b3d..7f8485e6 100644
--- a/archinstall/lib/luks.py
+++ b/archinstall/lib/luks.py
@@ -2,10 +2,11 @@ import os
import shlex
import time
import pathlib
+import logging
from .exceptions import *
from .general import *
from .disk import Partition
-from .output import log, LOG_LEVELS
+from .output import log
from .storage import storage
class luks2():
@@ -48,7 +49,7 @@ class luks2():
if not self.partition.allow_formatting:
raise DiskError(f'Could not encrypt volume {self.partition} due to it having a formatting lock.')
- log(f'Encrypting {partition} (This might take a while)', level=LOG_LEVELS.Info)
+ log(f'Encrypting {partition} (This might take a while)', level=logging.INFO)
if not key_file:
if self.key_file:
@@ -70,7 +71,7 @@ class luks2():
'--batch-mode',
'--verbose',
'--type', 'luks2',
- '--pbkdf', 'argon2i',
+ '--pbkdf', 'argon2id',
'--hash', hash_type,
'--key-size', str(key_size),
'--iter-time', str(iter_time),
@@ -84,7 +85,7 @@ class luks2():
cmd_handle = sys_command(cryptsetup_args)
except SysCallError as err:
if err.exit_code == 256:
- log(f'{partition} is being used, trying to unmount and crypt-close the device and running one more attempt at encrypting the device.', level=LOG_LEVELS.Debug)
+ log(f'{partition} is being used, trying to unmount and crypt-close the device and running one more attempt at encrypting the device.', level=logging.DEBUG)
# Partition was in use, unmount it and try again
partition.unmount()
@@ -97,11 +98,11 @@ class luks2():
for child in children:
# Unmount the child location
if child_mountpoint := child.get('mountpoint', None):
- log(f'Unmounting {child_mountpoint}', level=LOG_LEVELS.Debug)
+ log(f'Unmounting {child_mountpoint}', level=logging.DEBUG)
sys_command(f"umount -R {child_mountpoint}")
# And close it if possible.
- log(f"Closing crypt device {child['name']}", level=LOG_LEVELS.Debug)
+ log(f"Closing crypt device {child['name']}", level=logging.DEBUG)
sys_command(f"cryptsetup close {child['name']}")
# Then try again to set up the crypt-device
diff --git a/archinstall/lib/mirrors.py b/archinstall/lib/mirrors.py
index 04f47c0d..ae6c6422 100644
--- a/archinstall/lib/mirrors.py
+++ b/archinstall/lib/mirrors.py
@@ -1,4 +1,4 @@
-import urllib.request
+import urllib.request, logging
from .exceptions import *
from .general import *
@@ -59,7 +59,7 @@ def insert_mirrors(mirrors, *args, **kwargs):
return True
def use_mirrors(regions :dict, destination='/etc/pacman.d/mirrorlist'):
- log(f'A new package mirror-list has been created: {destination}', level=LOG_LEVELS.Info)
+ log(f'A new package mirror-list has been created: {destination}', level=logging.INFO)
for region, mirrors in regions.items():
with open(destination, 'w') as mirrorlist:
for mirror in mirrors:
@@ -79,7 +79,7 @@ def list_mirrors():
try:
response = urllib.request.urlopen(url)
except urllib.error.URLError as err:
- log(f'Could not fetch an active mirror-list: {err}', level=LOG_LEVELS.Warning, fg="yellow")
+ log(f'Could not fetch an active mirror-list: {err}', level=logging.WARNING, fg="yellow")
return regions
diff --git a/archinstall/lib/output.py b/archinstall/lib/output.py
index 6b184b4b..06d99778 100644
--- a/archinstall/lib/output.py
+++ b/archinstall/lib/output.py
@@ -17,8 +17,32 @@ class LOG_LEVELS:
class journald(dict):
@abc.abstractmethod
- def log(message, level=LOG_LEVELS.Debug):
- import systemd.journal
+ def log(message, level=logging.DEBUG):
+ try:
+ import systemd.journal
+ except ModuleNotFoundError:
+ return False
+
+ # For backwards compability, convert old style log-levels
+ # to logging levels (and warn about deprecated usage)
+ # There's some code re-usage here but that should be fine.
+ # TODO: Remove these in a few versions:
+ if level == LOG_LEVELS.Critical:
+ log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
+ level = logging.CRITICAL
+ elif level == LOG_LEVELS.Error:
+ log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
+ level = logging.ERROR
+ elif level == LOG_LEVELS.Warning:
+ log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
+ level = logging.WARNING
+ elif level == LOG_LEVELS.Info:
+ log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
+ level = logging.INFO
+ elif level == LOG_LEVELS.Debug:
+ log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
+ level = logging.DEBUG
+
log_adapter = logging.getLogger('archinstall')
log_fmt = logging.Formatter("[%(levelname)s]: %(message)s")
log_ch = systemd.journal.JournalHandler()
@@ -26,19 +50,7 @@ class journald(dict):
log_adapter.addHandler(log_ch)
log_adapter.setLevel(logging.DEBUG)
- if level == LOG_LEVELS.Critical:
- log_adapter.critical(message)
- elif level == LOG_LEVELS.Error:
- log_adapter.error(message)
- elif level == LOG_LEVELS.Warning:
- log_adapter.warning(message)
- elif level == LOG_LEVELS.Info:
- log_adapter.info(message)
- elif level == LOG_LEVELS.Debug:
- log_adapter.debug(message)
- else:
- # Fallback logger
- log_adapter.debug(message)
+ log_adapter.log(level, message)
# TODO: Replace log() for session based logging.
class SessionLogging():
@@ -95,19 +107,19 @@ def log(*args, **kwargs):
# we use that one to output everything
if (filename := storage.get('LOG_FILE', None)):
absolute_logfile = os.path.join(storage.get('LOG_PATH', './'), filename)
- if not os.path.isfile(absolute_logfile):
- try:
- Path(absolute_logfile).parents[0].mkdir(exist_ok=True, parents=True)
- except PermissionError:
- # Fallback to creating the log file in the current folder
- err_string = f"Not enough permission to place log file at {absolute_logfile}, creating it in {Path('./').absolute()/filename} instead."
- absolute_logfile = Path('./').absolute()/filename
- absolute_logfile.parents[0].mkdir(exist_ok=True)
- absolute_logfile = str(absolute_logfile)
- storage['LOG_PATH'] = './'
- log(err_string, fg="red")
-
- Path(absolute_logfile).touch() # Overkill?
+
+ try:
+ Path(absolute_logfile).parents[0].mkdir(exist_ok=True, parents=True)
+ with open(absolute_logfile, 'a') as log_file:
+ log_file.write("")
+ except PermissionError:
+ # Fallback to creating the log file in the current folder
+ err_string = f"Not enough permission to place log file at {absolute_logfile}, creating it in {Path('./').absolute()/filename} instead."
+ absolute_logfile = Path('./').absolute()/filename
+ absolute_logfile.parents[0].mkdir(exist_ok=True)
+ absolute_logfile = str(absolute_logfile)
+ storage['LOG_PATH'] = './'
+ log(err_string, fg="red")
with open(absolute_logfile, 'a') as log_file:
log_file.write(f"{orig_string}\n")
@@ -116,13 +128,33 @@ def log(*args, **kwargs):
# Unless the level is higher than we've decided to output interactively.
# (Remember, log files still get *ALL* the output despite level restrictions)
if 'level' in kwargs:
- if kwargs['level'] > storage.get('LOG_LEVEL', LOG_LEVELS.Info):
+ # For backwards compability, convert old style log-levels
+ # to logging levels (and warn about deprecated usage)
+ # There's some code re-usage here but that should be fine.
+ # TODO: Remove these in a few versions:
+ if kwargs['level'] == LOG_LEVELS.Critical:
+ log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
+ kwargs['level'] = logging.CRITICAL
+ elif kwargs['level'] == LOG_LEVELS.Error:
+ log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
+ kwargs['level'] = logging.ERROR
+ elif kwargs['level'] == LOG_LEVELS.Warning:
+ log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
+ kwargs['level'] = logging.WARNING
+ elif kwargs['level'] == LOG_LEVELS.Info:
+ log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
+ kwargs['level'] = logging.INFO
+ elif kwargs['level'] == LOG_LEVELS.Debug:
+ log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
+ kwargs['level'] = logging.DEBUG
+
+ if kwargs['level'] > storage.get('LOG_LEVEL', logging.INFO) and not 'force' in kwargs:
# Level on log message was Debug, but output level is set to Info.
# In that case, we'll drop it.
return None
try:
- journald.log(string, level=kwargs.get('level', LOG_LEVELS.Info))
+ journald.log(string, level=kwargs.get('level', logging.INFO))
except ModuleNotFoundError:
pass # Ignore writing to journald
diff --git a/archinstall/lib/profiles.py b/archinstall/lib/profiles.py
index 21ec5f6f..06237c1c 100644
--- a/archinstall/lib/profiles.py
+++ b/archinstall/lib/profiles.py
@@ -1,10 +1,10 @@
import os, urllib.request, urllib.parse, ssl, json, re
-import importlib.util, sys, glob, hashlib
+import importlib.util, sys, glob, hashlib, logging
from collections import OrderedDict
from .general import multisplit, sys_command
from .exceptions import *
from .networking import *
-from .output import log, LOG_LEVELS
+from .output import log
from .storage import storage
def grab_url_data(path):
@@ -15,7 +15,7 @@ def grab_url_data(path):
response = urllib.request.urlopen(safe_path, context=ssl_context)
return response.read()
-def list_profiles(filter_irrelevant_macs=True, subpath=''):
+def list_profiles(filter_irrelevant_macs=True, subpath='', filter_top_level_profiles=False):
# TODO: Grab from github page as well, not just local static files
if filter_irrelevant_macs:
local_macs = list_interfaces()
@@ -63,6 +63,11 @@ def list_profiles(filter_irrelevant_macs=True, subpath=''):
cache[profile[:-3]] = {'path' : os.path.join(storage["UPSTREAM_URL"]+subpath, profile), 'description' : profile_list[profile], 'tailored' : tailored}
+ if filter_top_level_profiles:
+ for profile in list(cache.keys()):
+ if Profile(None, profile).is_top_level_profile() is False:
+ del(cache[profile])
+
return cache
class Script():
@@ -77,7 +82,6 @@ class Script():
self.examples = None
self.namespace = os.path.splitext(os.path.basename(self.path))[0]
self.original_namespace = self.namespace
- log(f"Script {self} has been loaded with namespace '{self.namespace}'", level=LOG_LEVELS.Debug)
def __enter__(self, *args, **kwargs):
self.execute()
@@ -88,6 +92,9 @@ class Script():
if len(args) >= 2 and args[1]:
raise args[1]
+ if self.original_namespace:
+ self.namespace = self.original_namespace
+
def localize_path(self, profile_path):
if (url := urllib.parse.urlparse(profile_path)).scheme and url.scheme in ('https', 'http'):
if not self.converted_path:
@@ -177,7 +184,7 @@ class Profile(Script):
if hasattr(imported, '_prep_function'):
return True
return False
- """
+
def has_post_install(self):
with open(self.path, 'r') as source:
source_data = source.read()
@@ -193,14 +200,19 @@ class Profile(Script):
with self.load_instructions(namespace=f"{self.namespace}.py") as imported:
if hasattr(imported, '_post_install'):
return True
- """
def is_top_level_profile(self):
with open(self.path, 'r') as source:
source_data = source.read()
- # TODO: I imagine that there is probably a better way to write this.
- return 'top_level_profile = True' in source_data
+ if '__name__' in source_data and 'is_top_level_profile' in source_data:
+ with self.load_instructions(namespace=f"{self.namespace}.py") as imported:
+ if hasattr(imported, 'is_top_level_profile'):
+ return imported.is_top_level_profile
+
+ # Default to True if nothing is specified,
+ # since developers like less code - omitting it should assume they want to present it.
+ return True
@property
def packages(self) -> list:
@@ -222,7 +234,6 @@ class Profile(Script):
if hasattr(imported, '__packages__'):
return imported.__packages__
return None
-
class Application(Profile):
def __repr__(self, *args, **kwargs):
diff --git a/archinstall/lib/user_interaction.py b/archinstall/lib/user_interaction.py
index df8668af..be01594e 100644
--- a/archinstall/lib/user_interaction.py
+++ b/archinstall/lib/user_interaction.py
@@ -1,11 +1,14 @@
-import getpass, pathlib, os, shutil, re
-import sys, time, signal
+import getpass, pathlib, os, shutil, re, time
+import sys, time, signal, ipaddress, logging
+import termios, tty, select # Used for char by char polling of sys.stdin
from .exceptions import *
from .profiles import Profile
-from .locale_helpers import search_keyboard_layout
-from .output import log, LOG_LEVELS
+from .locale_helpers import list_keyboard_languages, verify_keyboard_layout, search_keyboard_layout
+from .output import log
from .storage import storage
from .networking import list_interfaces
+from .general import sys_command
+from .hardware import AVAILABLE_GFX_DRIVERS, hasUEFI
## TODO: Some inconsistencies between the selection processes.
## Some return the keys from the options, some the values?
@@ -24,7 +27,7 @@ def check_for_correct_username(username):
return True
log(
"The username you entered is invalid. Try again",
- level=LOG_LEVELS.Warning,
+ level=logging.WARNING,
fg='red'
)
return False
@@ -93,14 +96,181 @@ def print_large_list(options, padding=5, margin_bottom=0, separator=': '):
print(f"{str(column): >{highest_index_number_length}}{separator}{options[column]}", end = spaces)
print()
-def ask_for_superuser_account(prompt='Create a required super-user with sudo privileges: ', forced=False):
+ return column, row
+
+
+def generic_multi_select(options, text="Select one or more of the options above (leave blank to continue): ", sort=True, default=None, allow_empty=False):
+ if sort:
+ options = sorted(options)
+
+ section = MiniCurses(get_terminal_width(), len(options))
+
+ selected_options = []
+
+ while True:
+ if len(selected_options) <= 0 and default and default in options:
+ selected_options.append(default)
+
+ printed_options = []
+ for option in options:
+ if option in selected_options:
+ printed_options.append(f'>> {option}')
+ else:
+ printed_options.append(f'{option}')
+
+ section.clear(0, get_terminal_height()-section._cursor_y-1)
+ x, y = print_large_list(printed_options, margin_bottom=2)
+ section._cursor_y = len(printed_options)
+ section._cursor_x = 0
+ section.write_line(text)
+ section.input_pos = section._cursor_x
+ selected_option = section.get_keyboard_input(end=None)
+
+ if selected_option is None:
+ if len(selected_options) <= 0 and default:
+ selected_options = [default]
+
+ if len(selected_options) or allow_empty is True:
+ break
+ else:
+ log('* Need to select at least one option!', fg='red')
+ continue
+
+ elif selected_option.isdigit():
+ if (selected_option := int(selected_option)) >= len(options):
+ log('* Option is out of range, please select another one!', fg='red')
+ continue
+ selected_option = options[selected_option]
+ if selected_option in selected_options:
+ selected_options.remove(selected_option)
+ else:
+ selected_options.append(selected_option)
+
+ return selected_options
+
+
+class MiniCurses():
+ def __init__(self, width, height):
+ self.width = width
+ self.height = height
+
+ self._cursor_y = 0
+ self._cursor_x = 0
+
+ self.input_pos = 0
+
+ def write_line(self, text, clear_line=True):
+ if clear_line:
+ sys.stdout.flush()
+ sys.stdout.write("\033[%dG" % 0)
+ sys.stdout.flush()
+ sys.stdout.write(" " * (get_terminal_width()-1))
+ sys.stdout.flush()
+ sys.stdout.write("\033[%dG" % 0)
+ sys.stdout.flush()
+ sys.stdout.write(text)
+ sys.stdout.flush()
+ self._cursor_x += len(text)
+
+ def clear(self, x, y):
+ if x < 0: x = 0
+ if y < 0: y = 0
+
+ #import time
+ #sys.stdout.write(f"Clearing from: {x, y}")
+ #sys.stdout.flush()
+ #time.sleep(2)
+
+ sys.stdout.flush()
+ sys.stdout.write('\033[%d;%df' % (y, x))
+ for line in range(get_terminal_height()-y-1, y):
+ sys.stdout.write(" " * (get_terminal_width()-1))
+ sys.stdout.flush()
+ sys.stdout.write('\033[%d;%df' % (y, x))
+ sys.stdout.flush()
+
+ def deal_with_control_characters(self, char):
+ mapper = {
+ '\x7f' : 'BACKSPACE',
+ '\r' : 'CR',
+ '\n' : 'NL'
+ }
+
+ if (mapped_char := mapper.get(char, None)) == 'BACKSPACE':
+ if self._cursor_x <= self.input_pos:
+ # Don't backspace futher back than the cursor start position during input
+ return True
+ # Move back to the current known position (BACKSPACE doesn't updated x-pos)
+ sys.stdout.flush()
+ sys.stdout.write("\033[%dG" % (self._cursor_x))
+ sys.stdout.flush()
+
+ # Write a blank space
+ sys.stdout.flush()
+ sys.stdout.write(" ")
+ sys.stdout.flush()
+
+ # And move back again
+ sys.stdout.flush()
+ sys.stdout.write("\033[%dG" % (self._cursor_x))
+ sys.stdout.flush()
+
+ self._cursor_x -= 1
+
+ return True
+ elif mapped_char in ('CR', 'NL'):
+ return True
+
+ return None
+
+ def get_keyboard_input(self, strip_rowbreaks=True, end='\n'):
+ assert end in ['\r', '\n', None]
+
+ poller = select.epoll()
+ response = ''
+
+ sys_fileno = sys.stdin.fileno()
+ old_settings = termios.tcgetattr(sys_fileno)
+ tty.setraw(sys_fileno)
+
+ poller.register(sys.stdin.fileno(), select.EPOLLIN)
+
+ EOF = False
+ while EOF is False:
+ for fileno, event in poller.poll(0.025):
+ char = sys.stdin.read(1)
+
+ #sys.stdout.write(f"{[char]}")
+ #sys.stdout.flush()
+
+ if (newline := (char in ('\n', '\r'))):
+ EOF = True
+
+ if not newline or strip_rowbreaks is False:
+ response += char
+
+ if self.deal_with_control_characters(char) is not True:
+ self.write_line(response[-1], clear_line=False)
+
+ termios.tcsetattr(sys_fileno, termios.TCSADRAIN, old_settings)
+
+ if end:
+ sys.stdout.write(end)
+ sys.stdout.flush()
+ self._cursor_x = 0
+ self._cursor_y += 1
+
+ if response:
+ return response
+
+def ask_for_superuser_account(prompt='Username for required superuser with sudo privileges: ', forced=False):
while 1:
new_user = input(prompt).strip(' ')
if not new_user and forced:
# TODO: make this text more generic?
# It's only used to create the first sudo user when root is disabled in guided.py
- log(' * Since root is disabled, you need to create a least one (super) user!', fg='red')
+ log(' * Since root is disabled, you need to create a least one superuser!', fg='red')
continue
elif not new_user and not forced:
raise UserError("No superuser was created.")
@@ -112,7 +282,7 @@ def ask_for_superuser_account(prompt='Create a required super-user with sudo pri
def ask_for_additional_users(prompt='Any additional users to install (leave blank for no users): '):
users = {}
- super_users = {}
+ superusers = {}
while 1:
new_user = input(prompt).strip(' ')
@@ -122,26 +292,37 @@ def ask_for_additional_users(prompt='Any additional users to install (leave blan
continue
password = get_password(prompt=f'Password for user {new_user}: ')
- if input("Should this user be a sudo (super) user (y/N): ").strip(' ').lower() in ('y', 'yes'):
- super_users[new_user] = {"!password" : password}
+ if input("Should this user be a superuser (sudoer) [y/N]: ").strip(' ').lower() in ('y', 'yes'):
+ superusers[new_user] = {"!password" : password}
else:
users[new_user] = {"!password" : password}
- return users, super_users
+ return users, superusers
def ask_for_a_timezone():
- timezone = input('Enter a valid timezone (examples: Europe/Stockholm, US/Eastern) or press enter to use UTC: ').strip()
- if timezone == '':
- timezone = 'UTC'
- if (pathlib.Path("/usr")/"share"/"zoneinfo"/timezone).exists():
- return timezone
+ while True:
+ timezone = input('Enter a valid timezone (examples: Europe/Stockholm, US/Eastern) or press enter to use UTC: ').strip().strip('*.')
+ if timezone == '':
+ timezone = 'UTC'
+ if (pathlib.Path("/usr")/"share"/"zoneinfo"/timezone).exists():
+ return timezone
+ else:
+ log(
+ f"Specified timezone {timezone} does not exist.",
+ level=logging.WARNING,
+ fg='red'
+ )
+
+def ask_for_bootloader() -> str:
+ bootloader = "systemd-bootctl"
+ if hasUEFI()==False:
+ bootloader="grub-install"
else:
- log(
- f"Time zone {timezone} does not exist, continuing with system default.",
- level=LOG_LEVELS.Warning,
- fg='red'
- )
-
+ bootloader_choice = input("Would you like to use GRUB as a bootloader instead of systemd-boot? [y/N] ").lower()
+ if bootloader_choice == "y":
+ bootloader="grub-install"
+ return bootloader
+
def ask_for_audio_selection():
audio = "pulseaudio" # Default for most desktop environments
pipewire_choice = input("Would you like to install pipewire instead of pulseaudio as the default audio server? [Y/n] ").lower()
@@ -154,27 +335,56 @@ def ask_to_configure_network():
# Optionally configure one network interface.
#while 1:
# {MAC: Ifname}
- interfaces = {'ISO-CONFIG' : 'Copy ISO network configuration to installation','NetworkManager':'Use NetworkManager to control and manage your internet connection', **list_interfaces()}
+ interfaces = {
+ 'ISO-CONFIG' : 'Copy ISO network configuration to installation',
+ 'NetworkManager':'Use NetworkManager to control and manage your internet connection',
+ **list_interfaces()
+ }
- nic = generic_select(interfaces.values(), "Select one network interface to configure (leave blank to skip): ")
+ nic = generic_select(interfaces, "Select one network interface to configure (leave blank to skip): ")
if nic and nic != 'Copy ISO network configuration to installation':
if nic == 'Use NetworkManager to control and manage your internet connection':
return {'nic': nic,'NetworkManager':True}
- mode = generic_select(['DHCP (auto detect)', 'IP (static)'], f"Select which mode to configure for {nic}: ")
- if mode == 'IP (static)':
+
+ # Current workaround:
+ # For selecting modes without entering text within brackets,
+ # printing out this part separate from options, passed in
+ # `generic_select`
+ modes = ['DHCP (auto detect)', 'IP (static)']
+ for index, mode in enumerate(modes):
+ print(f"{index}: {mode}")
+
+ mode = generic_select(['DHCP', 'IP'], f"Select which mode to configure for {nic} or leave blank for DHCP: ",
+ options_output=False)
+ if mode == 'IP':
while 1:
ip = input(f"Enter the IP and subnet for {nic} (example: 192.168.0.5/24): ").strip()
- if ip:
+ # Implemented new check for correct IP/subnet input
+ try:
+ ipaddress.ip_interface(ip)
break
- else:
+ except ValueError:
log(
"You need to enter a valid IP in IP-config mode.",
- level=LOG_LEVELS.Warning,
+ level=logging.WARNING,
fg='red'
)
- if not len(gateway := input('Enter your gateway (router) IP address or leave blank for none: ').strip()):
- gateway = None
+ # Implemented new check for correct gateway IP address
+ while 1:
+ gateway = input('Enter your gateway (router) IP address or leave blank for none: ').strip()
+ try:
+ if len(gateway) == 0:
+ gateway = None
+ else:
+ ipaddress.ip_address(gateway)
+ break
+ except ValueError:
+ log(
+ "You need to enter a valid gateway (router) IP address.",
+ level=logging.WARNING,
+ fg='red'
+ )
dns = None
if len(dns_input := input('Enter your DNS servers (space separated, blank for none): ').strip()):
@@ -190,12 +400,13 @@ def ask_to_configure_network():
def ask_for_disk_layout():
options = {
- 'keep-existing' : 'Keep existing partition layout and select which ones to use where.',
- 'format-all' : 'Format entire drive and setup a basic partition scheme.',
- 'abort' : 'Abort the installation.'
+ 'keep-existing' : 'Keep existing partition layout and select which ones to use where',
+ 'format-all' : 'Format entire drive and setup a basic partition scheme',
+ 'abort' : 'Abort the installation'
}
- value = generic_select(options.values(), "Found partitions on the selected drive, (select by number) what you want to do: ")
+ value = generic_select(options, "Found partitions on the selected drive, (select by number) what you want to do: ",
+ allow_empty_input=False, sort=True)
return next((key for key, val in options.items() if val == value), None)
def ask_for_main_filesystem_format():
@@ -206,40 +417,71 @@ def ask_for_main_filesystem_format():
'f2fs' : 'f2fs'
}
- value = generic_select(options.values(), "Select which filesystem your main partition should use (by number or name): ")
+ value = generic_select(options, "Select which filesystem your main partition should use (by number or name): ",
+ allow_empty_input=False)
return next((key for key, val in options.items() if val == value), None)
-def generic_select(options, input_text="Select one of the above by index or absolute value: ", sort=True):
+def generic_select(options, input_text="Select one of the above by index or absolute value: ", allow_empty_input=True, options_output=True, sort=False):
"""
A generic select function that does not output anything
other than the options and their indexes. As an example:
generic_select(["first", "second", "third option"])
- 1: first
- 2: second
- 3: third option
+ 0: first
+ 1: second
+ 2: third option
+
+ When the user has entered the option correctly,
+ this function returns an item from list, a string, or None
"""
- if type(options) == dict: options = list(options)
- if sort: options = sorted(list(options))
- if len(options) <= 0: raise RequirementError('generic_select() requires at least one option to operate.')
+ # Checking if options are different from `list` or `dict`
+ if type(options) not in [list, dict]:
+ log(f" * Generic select doesn't support ({type(options)}) as type of options * ", fg='red')
+ log(" * If problem persists, please create an issue on https://github.com/archlinux/archinstall/issues * ", fg='yellow')
+ raise RequirementError("generic_select() requires list or dictionary as options.")
+ # To allow only `list` and `dict`, converting values of options here.
+ # Therefore, now we can only provide the dictionary itself
+ if type(options) == dict: options = list(options.values())
+ if sort: options = sorted(options) # As we pass only list and dict (converted to list), we can skip converting to list
+ if len(options) == 0:
+ log(f" * Generic select didn't find any options to choose from * ", fg='red')
+ log(" * If problem persists, please create an issue on https://github.com/archlinux/archinstall/issues * ", fg='yellow')
+ raise RequirementError('generic_select() requires at least one option to proceed.')
+
- for index, option in enumerate(options):
- print(f"{index}: {option}")
+ # Added ability to disable the output of options items,
+ # if another function displays something different from this
+ if options_output:
+ for index, option in enumerate(options):
+ print(f"{index}: {option}")
+
+ # The new changes introduce a single while loop for all inputs processed by this function
+ # Now the try...except block handles validation for invalid input from the user
+ while True:
+ try:
+ selected_option = input(input_text)
+ if len(selected_option.strip()) == 0:
+ # `allow_empty_input` parameter handles return of None on empty input, if necessary
+ # Otherwise raise `RequirementError`
+ if allow_empty_input:
+ return None
+ raise RequirementError('Please select an option to continue')
+ # Replaced `isdigit` with` isnumeric` to discard all negative numbers
+ elif selected_option.isnumeric():
+ selected_option = int(selected_option)
+ if selected_option >= len(options):
+ raise RequirementError(f'Selected option "{selected_option}" is out of range')
+ selected_option = options[selected_option]
+ break
+ elif selected_option in options:
+ break # We gave a correct absolute value
+ else:
+ raise RequirementError(f'Selected option "{selected_option}" does not exist in available options')
+ except RequirementError as err:
+ log(f" * {err} * ", fg='red')
+ continue
- selected_option = input(input_text)
- if len(selected_option.strip()) <= 0:
- return None
- elif selected_option.isdigit():
- selected_option = int(selected_option)
- if selected_option > len(options):
- raise RequirementError(f'Selected option "{selected_option}" is out of range')
- selected_option = options[selected_option]
- elif selected_option in options:
- pass # We gave a correct absolute value
- else:
- raise RequirementError(f'Selected option "{selected_option}" does not exist in available options: {options}')
-
return selected_option
def select_disk(dict_o_disks):
@@ -257,18 +499,14 @@ def select_disk(dict_o_disks):
if len(drives) >= 1:
for index, drive in enumerate(drives):
print(f"{index}: {drive} ({dict_o_disks[drive]['size'], dict_o_disks[drive].device, dict_o_disks[drive]['label']})")
- drive = input('Select one of the above disks (by number or full path) or write /mnt to skip partitioning: ')
- if drive.strip() == '/mnt':
- return None
- elif drive.isdigit():
- drive = int(drive)
- if drive >= len(drives):
- raise DiskError(f'Selected option "{drive}" is out of range')
- drive = dict_o_disks[drives[drive]]
- elif drive in dict_o_disks:
- drive = dict_o_disks[drive]
- else:
- raise DiskError(f'Selected drive does not exist: "{drive}"')
+
+ log(f"You can skip selecting a drive and partitioning and use whatever drive-setup is mounted at /mnt (experimental)", fg="yellow")
+ drive = generic_select(drives, 'Select one of the above disks (by name or number) or leave blank to use /mnt: ',
+ options_output=False)
+ if not drive:
+ return drive
+
+ drive = dict_o_disks[drive]
return drive
raise DiskError('select_disk() requires a non-empty dictionary of disks to select from.')
@@ -293,29 +531,21 @@ def select_profile(options):
print(' -- The above list is a set of pre-programmed profiles. --')
print(' -- They might make it easier to install things like desktop environments. --')
print(' -- (Leave blank and hit enter to skip this step and continue) --')
- selected_profile = input('Enter a pre-programmed profile name if you want to install one: ')
-
- if len(selected_profile.strip()) <= 0:
- return None
-
- if selected_profile.isdigit() and (pos := int(selected_profile)) <= len(profiles)-1:
- selected_profile = profiles[pos]
- elif selected_profile in options:
- selected_profile = options[options.index(selected_profile)]
- else:
- RequirementError("Selected profile does not exist.")
- return Profile(None, selected_profile)
-
- raise RequirementError("Selecting profiles require a least one profile to be given as an option.")
+ selected_profile = generic_select(profiles, 'Enter a pre-programmed profile name if you want to install one: ',
+ options_output=False)
+ if selected_profile:
+ return Profile(None, selected_profile)
+ else:
+ raise RequirementError("Selecting profiles require a least one profile to be given as an option.")
def select_language(options, show_only_country_codes=True):
"""
Asks the user to select a language from the `options` dictionary parameter.
Usually this is combined with :ref:`archinstall.list_keyboard_languages`.
- :param options: A `dict` where keys are the language name, value should be a dict containing language information.
- :type options: dict
+ :param options: A `generator` or `list` where keys are the language name, value should be a dict containing language information.
+ :type options: generator or list
:param show_only_country_codes: Filters out languages that are not len(lang) == 2. This to limit the number of results from stuff like dvorak and x-latin1 alternatives.
:type show_only_country_codes: bool
@@ -334,35 +564,37 @@ def select_language(options, show_only_country_codes=True):
for index, language in enumerate(languages):
print(f"{index}: {language}")
- print(' -- You can enter ? or help to search for more languages, or skip to use US layout --')
- selected_language = input('Select one of the above keyboard languages (by number or full name): ')
-
- if len(selected_language.strip()) == 0:
- return DEFAULT_KEYBOARD_LANGUAGE
- elif selected_language.lower() in ('?', 'help'):
- while True:
- filter_string = input('Search for layout containing (example: "sv-"): ')
- new_options = list(search_keyboard_layout(filter_string))
-
- if len(new_options) <= 0:
- log(f"Search string '{filter_string}' yielded no results, please try another search or Ctrl+D to abort.", fg='yellow')
- continue
+ print(" -- You can choose a layout that isn't in this list, but whose name you know --")
+ print(" -- Also, you can enter '?' or 'help' to search for more languages, or skip to use US layout --")
- return select_language(new_options, show_only_country_codes=False)
+ while True:
+ selected_language = input('Select one of the above keyboard languages (by name or full name): ')
+ if not selected_language:
+ return DEFAULT_KEYBOARD_LANGUAGE
+ elif selected_language.lower() in ('?', 'help'):
+ while True:
+ filter_string = input("Search for layout containing (example: \"sv-\") or enter 'exit' to exit from search: ")
- elif selected_language.isdigit() and (pos := int(selected_language)) <= len(languages)-1:
- selected_language = languages[pos]
- return selected_language
- # I'm leaving "options" on purpose here.
- # Since languages possibly contains a filtered version of
- # all possible language layouts, and we might want to write
- # for instance sv-latin1 (if we know that exists) without having to
- # go through the search step.
- elif selected_language in options:
- selected_language = options[options.index(selected_language)]
- return selected_language
- else:
- raise RequirementError("Selected language does not exist.")
+ if filter_string.lower() == 'exit':
+ return select_language(list_keyboard_languages())
+
+ new_options = list(search_keyboard_layout(filter_string))
+
+ if len(new_options) <= 0:
+ log(f"Search string '{filter_string}' yielded no results, please try another search.", fg='yellow')
+ continue
+
+ return select_language(new_options, show_only_country_codes=False)
+ elif selected_language.isnumeric():
+ selected_language = int(selected_language)
+ if selected_language >= len(languages):
+ log(' * Selected option is out of range * ', fg='red')
+ continue
+ return languages[selected_language]
+ elif verify_keyboard_layout(selected_language):
+ return selected_language
+ else:
+ log(" * Given language wasn't found * ", fg='red')
raise RequirementError("Selecting languages require a least one language to be given as an option.")
@@ -389,23 +621,76 @@ def select_mirror_regions(mirrors, show_top_mirrors=True):
print_large_list(regions, margin_bottom=4)
print(' -- You can skip this step by leaving the option blank --')
- selected_mirror = input('Select one of the above regions to download packages from (by number or full name): ')
- if len(selected_mirror.strip()) == 0:
+ selected_mirror = generic_select(regions, 'Select one of the above regions to download packages from (by number or full name): ',
+ options_output=False)
+ if not selected_mirror:
# Returning back empty options which can be both used to
# do "if x:" logic as well as do `x.get('mirror', {}).get('sub', None)` chaining
return {}
- elif selected_mirror.isdigit() and int(selected_mirror) <= len(regions)-1:
- # I'm leaving "mirrors" on purpose here.
- # Since region possibly contains a known region of
- # all possible regions, and we might want to write
- # for instance Sweden (if we know that exists) without having to
- # go through the search step.
- region = regions[int(selected_mirror)]
- selected_mirrors[region] = mirrors[region]
- elif selected_mirror in mirrors:
- selected_mirrors[selected_mirror] = mirrors[selected_mirror]
- else:
- raise RequirementError("Selected region does not exist.")
+ # I'm leaving "mirrors" on purpose here.
+ # Since region possibly contains a known region of
+ # all possible regions, and we might want to write
+ # for instance Sweden (if we know that exists) without having to
+ # go through the search step.
+ selected_mirrors[selected_mirror] = mirrors[selected_mirror]
return selected_mirrors
+
+ raise RequirementError("Selecting mirror region require a least one region to be given as an option.")
+
+def select_driver(options=AVAILABLE_GFX_DRIVERS):
+ """
+ Some what convoluted function, which's job is simple.
+ Select a graphics driver from a pre-defined set of popular options.
+
+ (The template xorg is for beginner users, not advanced, and should
+ there for appeal to the general public first and edge cases later)
+ """
+
+ drivers = sorted(list(options))
+
+ if drivers:
+ lspci = sys_command(f'/usr/bin/lspci')
+ for line in lspci.trace_log.split(b'\r\n'):
+ if b' vga ' in line.lower():
+ if b'nvidia' in line.lower():
+ print(' ** nvidia card detected, suggested driver: nvidia **')
+ elif b'amd' in line.lower():
+ print(' ** AMD card detected, suggested driver: AMD / ATI **')
+
+ initial_option = generic_select(drivers, input_text="Select your graphics card driver: ")
+ selected_driver = options[initial_option]
+
+ if type(selected_driver) == dict:
+ driver_options = sorted(list(selected_driver))
+
+ driver_package_group = generic_select(driver_options, f'Which driver-type do you want for {initial_option}: ',
+ allow_empty_input=False)
+ driver_package_group = selected_driver[driver_package_group]
+
+ return driver_package_group
+
+ return selected_driver
+
+ raise RequirementError("Selecting drivers require a least one profile to be given as an option.")
+
+def select_kernel(options):
+ """
+ Asks the user to select a kernel for system.
+
+ :param options: A `list` with kernel options
+ :type options: list
+
+ :return: The string as a selected kernel
+ :rtype: string
+ """
+
+ DEFAULT_KERNEL = "linux"
+
+ kernels = sorted(list(options))
+
+ if kernels:
+ return generic_multi_select(kernels, f"Choose which kernel to use (leave blank for default: {DEFAULT_KERNEL}): ", default=DEFAULT_KERNEL)
+
+ raise RequirementError("Selecting kernels require a least one kernel to be given as an option.")