Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib
diff options
context:
space:
mode:
authorAnton Hvornum <anton.feeds@gmail.com>2021-05-24 12:34:32 +0200
committerAnton Hvornum <anton.feeds@gmail.com>2021-05-24 12:34:32 +0200
commit9b0de26c67b6bb90b69d5b4ef874dc0bed407a77 (patch)
tree69f1b5e0b6ad48018ce9e18d6d74079bfb123540 /archinstall/lib
parent404197dc93c2efb24097772848af708d833bdd98 (diff)
parent7daaf1143fca2723bfb63ab4e3030485446da1c0 (diff)
Syncing in latest changes from master
Diffstat (limited to 'archinstall/lib')
-rw-r--r--archinstall/lib/disk.py233
-rw-r--r--archinstall/lib/exceptions.py20
-rw-r--r--archinstall/lib/general.py461
-rw-r--r--archinstall/lib/hardware.py125
-rw-r--r--archinstall/lib/installer.py339
-rw-r--r--archinstall/lib/locale_helpers.py46
-rw-r--r--archinstall/lib/luks.py43
-rw-r--r--archinstall/lib/mirrors.py37
-rw-r--r--archinstall/lib/networking.py51
-rw-r--r--archinstall/lib/output.py65
-rw-r--r--archinstall/lib/packages.py20
-rw-r--r--archinstall/lib/profiles.py57
-rw-r--r--archinstall/lib/services.py6
-rw-r--r--archinstall/lib/storage.py16
-rw-r--r--archinstall/lib/systemd.py87
-rw-r--r--archinstall/lib/tts.py0
-rw-r--r--archinstall/lib/user_interaction.py345
17 files changed, 1183 insertions, 768 deletions
diff --git a/archinstall/lib/disk.py b/archinstall/lib/disk.py
index 49bef1be..8f67111a 100644
--- a/archinstall/lib/disk.py
+++ b/archinstall/lib/disk.py
@@ -1,22 +1,26 @@
-import glob, re, os, json, time, hashlib
-import pathlib, traceback, logging
+import glob
+import pathlib
+import re
+import time
from collections import OrderedDict
-from .exceptions import DiskError
+from typing import Optional
+
from .general import *
+from .hardware import has_uefi
from .output import log
-from .storage import storage
-from .hardware import hasUEFI
ROOT_DIR_PATTERN = re.compile('^.*?/devices')
GPT = 0b00000001
MBR = 0b00000010
-#import ctypes
-#import ctypes.util
-#libc = ctypes.CDLL(ctypes.util.find_library('c'), use_errno=True)
-#libc.mount.argtypes = (ctypes.c_char_p, ctypes.c_char_p, ctypes.c_char_p, ctypes.c_ulong, ctypes.c_char_p)
-class BlockDevice():
+# import ctypes
+# import ctypes.util
+# libc = ctypes.CDLL(ctypes.util.find_library('c'), use_errno=True)
+# libc.mount.argtypes = (ctypes.c_char_p, ctypes.c_char_p, ctypes.c_char_p, ctypes.c_ulong, ctypes.c_char_p)
+
+
+class BlockDevice:
def __init__(self, path, info=None):
if not info:
# If we don't give any information, we need to auto-fill it.
@@ -50,9 +54,9 @@ class BlockDevice():
to give less/partial information for user readability.
"""
return {
- 'path' : self.path,
- 'size' : self.info['size'] if 'size' in self.info else '<unknown>',
- 'model' : self.info['model'] if 'model' in self.info else '<unknown>'
+ 'path': self.path,
+ 'size': self.info['size'] if 'size' in self.info else '<unknown>',
+ 'model': self.info['model'] if 'model' in self.info else '<unknown>'
}
def __dump__(self):
@@ -74,8 +78,9 @@ class BlockDevice():
raise DiskError(f'Could not locate backplane info for "{self.path}"')
if self.info['type'] == 'loop':
- for drive in json.loads(b''.join(sys_command(['losetup', '--json'], hide_from_log=True)).decode('UTF_8'))['loopdevices']:
- if not drive['name'] == self.path: continue
+ for drive in json.loads(b''.join(SysCommand(['losetup', '--json'])).decode('UTF_8'))['loopdevices']:
+ if not drive['name'] == self.path:
+ continue
return drive['back-file']
elif self.info['type'] == 'disk':
@@ -90,21 +95,21 @@ class BlockDevice():
else:
log(f"Unknown blockdevice type for {self.path}: {self.info['type']}", level=logging.DEBUG)
- # if not stat.S_ISBLK(os.stat(full_path).st_mode):
- # raise DiskError(f'Selected disk "{full_path}" is not a block device.')
+ # if not stat.S_ISBLK(os.stat(full_path).st_mode):
+ # raise DiskError(f'Selected disk "{full_path}" is not a block device.')
@property
def partitions(self):
- o = b''.join(sys_command(['partprobe', self.path]))
+ o = b''.join(SysCommand(['partprobe', self.path]))
- #o = b''.join(sys_command('/usr/bin/lsblk -o name -J -b {dev}'.format(dev=dev)))
- o = b''.join(sys_command(['/usr/bin/lsblk', '-J', self.path]))
+ # o = b''.join(sys_command('/usr/bin/lsblk -o name -J -b {dev}'.format(dev=dev)))
+ o = b''.join(SysCommand(['/usr/bin/lsblk', '-J', self.path]))
if b'not a block device' in o:
raise DiskError(f'Can not read partitions off something that isn\'t a block device: {self.path}')
if not o[:1] == b'{':
- raise DiskError(f'Error getting JSON output from:', f'/usr/bin/lsblk -J {self.path}')
+ raise DiskError('Error getting JSON output from:', f'/usr/bin/lsblk -J {self.path}')
r = json.loads(o.decode('UTF-8'))
if len(r['blockdevices']) and 'children' in r['blockdevices'][0]:
@@ -112,7 +117,7 @@ class BlockDevice():
for part in r['blockdevices'][0]['children']:
part_id = part['name'][len(os.path.basename(self.path)):]
if part_id not in self.part_cache:
- ## TODO: Force over-write even if in cache?
+ # TODO: Force over-write even if in cache?
if part_id not in self.part_cache or self.part_cache[part_id].size != part['size']:
self.part_cache[part_id] = Partition(root_path + part_id, self, part_id=part_id, size=part['size'])
@@ -129,13 +134,13 @@ class BlockDevice():
@property
def uuid(self):
- log(f'BlockDevice().uuid is untested!', level=logging.WARNING, fg='yellow')
+ log('BlockDevice().uuid is untested!', level=logging.WARNING, fg='yellow')
"""
Returns the disk UUID as returned by lsblk.
This is more reliable than relying on /dev/disk/by-partuuid as
it doesn't seam to be able to detect md raid partitions.
"""
- lsblk = b''.join(sys_command(f'lsblk -J -o+UUID {self.path}'))
+ lsblk = b''.join(SysCommand(f'lsblk -J -o+UUID {self.path}'))
for partition in json.loads(lsblk.decode('UTF-8'))['blockdevices']:
return partition.get('uuid', None)
@@ -151,8 +156,9 @@ class BlockDevice():
def flush_cache(self):
self.part_cache = OrderedDict()
-class Partition():
- def __init__(self, path :str, block_device :BlockDevice, part_id=None, size=-1, filesystem=None, mountpoint=None, encrypted=False, autodetect_filesystem=True):
+
+class Partition:
+ def __init__(self, path: str, block_device: BlockDevice, part_id=None, size=-1, filesystem=None, mountpoint=None, encrypted=False, autodetect_filesystem=True):
if not part_id:
part_id = os.path.basename(path)
@@ -162,24 +168,24 @@ class Partition():
self.mountpoint = mountpoint
self.target_mountpoint = mountpoint
self.filesystem = filesystem
- self.size = size # TODO: Refresh?
+ self.size = size # TODO: Refresh?
self._encrypted = None
self.encrypted = encrypted
- self.allow_formatting = False # A fail-safe for unconfigured partitions, such as windows NTFS partitions.
+ self.allow_formatting = False # A fail-safe for unconfigured partitions, such as windows NTFS partitions.
if mountpoint:
self.mount(mountpoint)
mount_information = get_mount_info(self.path)
-
+
if self.mountpoint != mount_information.get('target', None) and mountpoint:
raise DiskError(f"{self} was given a mountpoint but the actual mountpoint differs: {mount_information.get('target', None)}")
- if (target := mount_information.get('target', None)):
+ if target := mount_information.get('target', None):
self.mountpoint = target
if not self.filesystem and autodetect_filesystem:
- if (fstype := mount_information.get('fstype', get_filesystem_type(path))):
+ if fstype := mount_information.get('fstype', get_filesystem_type(path)):
self.filesystem = fstype
if self.filesystem == 'crypto_LUKS':
@@ -190,7 +196,7 @@ class Partition():
left_comparitor = left_comparitor.path
else:
left_comparitor = str(left_comparitor)
- return self.path < left_comparitor # Not quite sure the order here is correct. But /dev/nvme0n1p1 comes before /dev/nvme0n1p5 so seems correct.
+ return self.path < left_comparitor # Not quite sure the order here is correct. But /dev/nvme0n1p1 comes before /dev/nvme0n1p5 so seems correct.
def __repr__(self, *args, **kwargs):
mount_repr = ''
@@ -205,22 +211,23 @@ class Partition():
return f'Partition(path={self.path}, size={self.size}, fs={self.filesystem}{mount_repr})'
@property
- def uuid(self) -> str:
+ def uuid(self) -> Optional[str]:
"""
Returns the PARTUUID as returned by lsblk.
This is more reliable than relying on /dev/disk/by-partuuid as
it doesn't seam to be able to detect md raid partitions.
"""
- lsblk = b''.join(sys_command(f'lsblk -J -o+PARTUUID {self.path}'))
+ lsblk = b''.join(SysCommand(f'lsblk -J -o+PARTUUID {self.path}'))
for partition in json.loads(lsblk.decode('UTF-8'))['blockdevices']:
return partition.get('partuuid', None)
+ return None
@property
def encrypted(self):
return self._encrypted
@encrypted.setter
- def encrypted(self, value :bool):
+ def encrypted(self, value: bool):
self._encrypted = value
@@ -230,10 +237,10 @@ class Partition():
@property
def real_device(self):
- for blockdevice in json.loads(b''.join(sys_command('lsblk -J')).decode('UTF-8'))['blockdevices']:
- if (parent := self.find_parent_of(blockdevice, os.path.basename(self.path))):
+ for blockdevice in json.loads(b''.join(SysCommand('lsblk -J')).decode('UTF-8'))['blockdevices']:
+ if parent := self.find_parent_of(blockdevice, os.path.basename(self.path)):
return f"/dev/{parent}"
- # raise DiskError(f'Could not find appropriate parent for encrypted partition {self}')
+ # raise DiskError(f'Could not find appropriate parent for encrypted partition {self}')
return self.path
def detect_inner_filesystem(self, password):
@@ -249,16 +256,18 @@ class Partition():
def has_content(self):
if not get_filesystem_type(self.path):
return False
-
- temporary_mountpoint = '/tmp/'+hashlib.md5(bytes(f"{time.time()}", 'UTF-8')+os.urandom(12)).hexdigest()
+
+ temporary_mountpoint = '/tmp/' + hashlib.md5(bytes(f"{time.time()}", 'UTF-8') + os.urandom(12)).hexdigest()
temporary_path = pathlib.Path(temporary_mountpoint)
temporary_path.mkdir(parents=True, exist_ok=True)
- if (handle := sys_command(f'/usr/bin/mount {self.path} {temporary_mountpoint}')).exit_code != 0:
+ if (handle := SysCommand(f'/usr/bin/mount {self.path} {temporary_mountpoint}')).exit_code != 0:
raise DiskError(f'Could not mount and check for content on {self.path} because: {b"".join(handle)}')
-
+
files = len(glob.glob(f"{temporary_mountpoint}/*"))
- sys_command(f'/usr/bin/umount {temporary_mountpoint}')
+ iterations = 0
+ while SysCommand(f"/usr/bin/umount -R {temporary_mountpoint}").exit_code != 0 and (iterations := iterations + 1) < 10:
+ time.sleep(1)
temporary_path.rmdir()
@@ -321,36 +330,36 @@ class Partition():
log(f'Formatting {path} -> {filesystem}', level=logging.INFO)
if filesystem == 'btrfs':
- o = b''.join(sys_command(f'/usr/bin/mkfs.btrfs -f {path}'))
+ o = b''.join(SysCommand(f'/usr/bin/mkfs.btrfs -f {path}'))
if b'UUID' not in o:
raise DiskError(f'Could not format {path} with {filesystem} because: {o}')
self.filesystem = 'btrfs'
elif filesystem == 'vfat':
- o = b''.join(sys_command(f'/usr/bin/mkfs.vfat -F32 {path}'))
+ o = b''.join(SysCommand(f'/usr/bin/mkfs.vfat -F32 {path}'))
if (b'mkfs.fat' not in o and b'mkfs.vfat' not in o) or b'command not found' in o:
raise DiskError(f'Could not format {path} with {filesystem} because: {o}')
self.filesystem = 'vfat'
elif filesystem == 'ext4':
- if (handle := sys_command(f'/usr/bin/mkfs.ext4 -F {path}')).exit_code != 0:
+ if (handle := SysCommand(f'/usr/bin/mkfs.ext4 -F {path}')).exit_code != 0:
raise DiskError(f'Could not format {path} with {filesystem} because: {b"".join(handle)}')
self.filesystem = 'ext4'
elif filesystem == 'xfs':
- if (handle := sys_command(f'/usr/bin/mkfs.xfs -f {path}')).exit_code != 0:
+ if (handle := SysCommand(f'/usr/bin/mkfs.xfs -f {path}')).exit_code != 0:
raise DiskError(f'Could not format {path} with {filesystem} because: {b"".join(handle)}')
self.filesystem = 'xfs'
elif filesystem == 'f2fs':
- if (handle := sys_command(f'/usr/bin/mkfs.f2fs -f {path}')).exit_code != 0:
+ if (handle := SysCommand(f'/usr/bin/mkfs.f2fs -f {path}')).exit_code != 0:
raise DiskError(f'Could not format {path} with {filesystem} because: {b"".join(handle)}')
self.filesystem = 'f2fs'
elif filesystem == 'crypto_LUKS':
- # from .luks import luks2
- # encrypted_partition = luks2(self, None, None)
- # encrypted_partition.format(path)
+ # from .luks import luks2
+ # encrypted_partition = luks2(self, None, None)
+ # encrypted_partition.format(path)
self.filesystem = 'crypto_LUKS'
else:
@@ -368,36 +377,40 @@ class Partition():
return parent
elif 'children' in data:
for child in data['children']:
- if (parent := self.find_parent_of(child, name, parent=data['name'])):
+ if parent := self.find_parent_of(child, name, parent=data['name']):
return parent
def mount(self, target, fs=None, options=''):
if not self.mountpoint:
log(f'Mounting {self} to {target}', level=logging.INFO)
if not fs:
- if not self.filesystem: raise DiskError(f'Need to format (or define) the filesystem on {self} before mounting.')
+ if not self.filesystem:
+ raise DiskError(f'Need to format (or define) the filesystem on {self} before mounting.')
fs = self.filesystem
pathlib.Path(target).mkdir(parents=True, exist_ok=True)
try:
- sys_command(f'/usr/bin/mount {self.path} {target}')
+ if options:
+ SysCommand(f'/usr/bin/mount -o {options} {self.path} {target}')
+ else:
+ SysCommand(f'/usr/bin/mount {self.path} {target}')
except SysCallError as err:
raise err
-
+
self.mountpoint = target
return True
def unmount(self):
try:
- exit_code = sys_command(f'/usr/bin/umount {self.path}').exit_code
+ exit_code = SysCommand(f'/usr/bin/umount {self.path}').exit_code
except SysCallError as err:
exit_code = err.exit_code
# Without to much research, it seams that low error codes are errors.
# And above 8k is indicators such as "/dev/x not mounted.".
# So anything in between 0 and 8k are errors (?).
- if exit_code > 0 and exit_code < 8000:
+ if 0 < exit_code < 8000:
raise err
self.mountpoint = None
@@ -410,22 +423,23 @@ class Partition():
"""
The support for a filesystem (this partition) is tested by calling
partition.format() with a path set to '/dev/null' which returns two exceptions:
- 1. SysCallError saying that /dev/null is not formattable - but the filesystem is supported
- 2. UnknownFilesystemFormat that indicates that we don't support the given filesystem type
+ 1. SysCallError saying that /dev/null is not formattable - but the filesystem is supported
+ 2. UnknownFilesystemFormat that indicates that we don't support the given filesystem type
"""
try:
self.format(self.filesystem, '/dev/null', log_formatting=False, allow_formatting=True)
- except SysCallError:
- pass # We supported it, but /dev/null is not formatable as expected so the mkfs call exited with an error code
+ except (SysCallError, DiskError):
+ pass # We supported it, but /dev/null is not formatable as expected so the mkfs call exited with an error code
except UnknownFilesystemFormat as err:
raise err
return True
-class Filesystem():
+
+class Filesystem:
# TODO:
# When instance of a HDD is selected, check all usages and gracefully unmount them
# as well as close any crypto handles.
- def __init__(self, blockdevice,mode):
+ def __init__(self, blockdevice, mode):
self.blockdevice = blockdevice
self.mode = mode
@@ -437,15 +451,15 @@ class Filesystem():
self.blockdevice.flush_cache()
return self
else:
- raise DiskError(f'Problem setting the partition format to GPT:', f'/usr/bin/parted -s {self.blockdevice.device} mklabel gpt')
+ raise DiskError('Problem setting the partition format to GPT:', f'/usr/bin/parted -s {self.blockdevice.device} mklabel gpt')
elif self.mode == MBR:
- if sys_command(f'/usr/bin/parted -s {self.blockdevice.device} mklabel msdos').exit_code == 0:
+ if SysCommand(f'/usr/bin/parted -s {self.blockdevice.device} mklabel msdos').exit_code == 0:
return self
else:
- raise DiskError(f'Problem setting the partition format to GPT:', f'/usr/bin/parted -s {self.blockdevice.device} mklabel msdos')
+ raise DiskError('Problem setting the partition format to GPT:', f'/usr/bin/parted -s {self.blockdevice.device} mklabel msdos')
else:
raise DiskError(f'Unknown mode selected to format in: {self.mode}')
-
+
# TODO: partition_table_type is hardcoded to GPT at the moment. This has to be changed.
elif self.mode == self.blockdevice.partition_table_type:
log(f'Kept partition format {self.mode} for {self.blockdevice}', level=logging.DEBUG)
@@ -461,7 +475,7 @@ class Filesystem():
# TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
if len(args) >= 2 and args[1]:
raise args[1]
- b''.join(sys_command(f'sync'))
+ b''.join(SysCommand('sync'))
return True
def find_partition(self, mountpoint):
@@ -469,11 +483,11 @@ class Filesystem():
if partition.target_mountpoint == mountpoint or partition.mountpoint == mountpoint:
return partition
- def raw_parted(self, string:str):
- x = sys_command(f'/usr/bin/parted -s {string}')
+ def raw_parted(self, string: str):
+ x = SysCommand(f'/usr/bin/parted -s {string}')
return x
- def parted(self, string:str):
+ def parted(self, string: str):
"""
Performs a parted execution of the given string
@@ -484,8 +498,8 @@ class Filesystem():
def use_entire_disk(self, root_filesystem_type='ext4'):
log(f"Using and formatting the entire {self.blockdevice}.", level=logging.DEBUG)
- if hasUEFI():
- self.add_partition('primary', start='1MiB', end='513MiB', format='fat32')
+ if has_uefi():
+ self.add_partition('primary', start='1MiB', end='513MiB', partition_format='fat32')
self.set_name(0, 'EFI')
self.set(0, 'boot on')
# TODO: Probably redundant because in GPT mode 'esp on' is an alias for "boot on"?
@@ -503,39 +517,40 @@ class Filesystem():
self.blockdevice.partition[0].allow_formatting = True
self.blockdevice.partition[1].allow_formatting = True
else:
- #we don't need a seprate boot partition it would be a waste of space
+ # we don't need a seprate boot partition it would be a waste of space
self.add_partition('primary', start='1MB', end='100%')
- self.blockdevice.partition[0].filesystem=root_filesystem_type
+ self.blockdevice.partition[0].filesystem = root_filesystem_type
log(f"Set the root partition {self.blockdevice.partition[0]} to use filesystem {root_filesystem_type}.", level=logging.DEBUG)
self.blockdevice.partition[0].target_mountpoint = '/'
self.blockdevice.partition[0].allow_formatting = True
- def add_partition(self, type, start, end, format=None):
+ def add_partition(self, partition_type, start, end, partition_format=None):
log(f'Adding partition to {self.blockdevice}', level=logging.INFO)
-
+
previous_partitions = self.blockdevice.partitions
if self.mode == MBR:
- if len(self.blockdevice.partitions)>3:
+ if len(self.blockdevice.partitions) > 3:
DiskError("Too many partitions on disk, MBR disks can only have 3 parimary partitions")
- if format:
- partitioning = self.parted(f'{self.blockdevice.device} mkpart {type} {format} {start} {end}') == 0
+ if partition_format:
+ partitioning = self.parted(f'{self.blockdevice.device} mkpart {partition_type} {partition_format} {start} {end}') == 0
else:
- partitioning = self.parted(f'{self.blockdevice.device} mkpart {type} {start} {end}') == 0
+ partitioning = self.parted(f'{self.blockdevice.device} mkpart {partition_type} {start} {end}') == 0
if partitioning:
start_wait = time.time()
while previous_partitions == self.blockdevice.partitions:
- time.sleep(0.025) # Let the new partition come up in the kernel
+ time.sleep(0.025) # Let the new partition come up in the kernel
if time.time() - start_wait > 10:
raise DiskError(f"New partition never showed up after adding new partition on {self} (timeout 10 seconds).")
return True
- def set_name(self, partition:int, name:str):
- return self.parted(f'{self.blockdevice.device} name {partition+1} "{name}"') == 0
+ def set_name(self, partition: int, name: str):
+ return self.parted(f'{self.blockdevice.device} name {partition + 1} "{name}"') == 0
+
+ def set(self, partition: int, string: str):
+ return self.parted(f'{self.blockdevice.device} set {partition + 1} {string}') == 0
- def set(self, partition:int, string:str):
- return self.parted(f'{self.blockdevice.device} set {partition+1} {string}') == 0
def device_state(name, *args, **kwargs):
# Based out of: https://askubuntu.com/questions/528690/how-to-get-list-of-all-non-removable-disk-device-names-ssd-hdd-and-sata-ide-onl/528709#528709
@@ -554,28 +569,32 @@ def device_state(name, *args, **kwargs):
return
return True
+
# lsblk --json -l -n -o path
def all_disks(*args, **kwargs):
kwargs.setdefault("partitions", False)
drives = OrderedDict()
- #for drive in json.loads(sys_command(f'losetup --json', *args, **lkwargs, hide_from_log=True)).decode('UTF_8')['loopdevices']:
- for drive in json.loads(b''.join(sys_command(f'lsblk --json -l -n -o path,size,type,mountpoint,label,pkname,model', *args, **kwargs, hide_from_log=True)).decode('UTF_8'))['blockdevices']:
- if not kwargs['partitions'] and drive['type'] == 'part': continue
+ # for drive in json.loads(sys_command(f'losetup --json', *args, **lkwargs, hide_from_log=True)).decode('UTF_8')['loopdevices']:
+ for drive in json.loads(b''.join(SysCommand('lsblk --json -l -n -o path,size,type,mountpoint,label,pkname,model')).decode('UTF_8'))['blockdevices']:
+ if not kwargs['partitions'] and drive['type'] == 'part':
+ continue
drives[drive['path']] = BlockDevice(drive['path'], drive)
return drives
+
def convert_to_gigabytes(string):
unit = string.strip()[-1]
size = float(string.strip()[:-1])
if unit == 'M':
- size = size/1024
+ size = size / 1024
elif unit == 'T':
- size = size*1024
+ size = size * 1024
return size
+
def harddrive(size=None, model=None, fuzzy=False):
collection = all_disks()
for drive in collection:
@@ -586,13 +605,18 @@ def harddrive(size=None, model=None, fuzzy=False):
return collection[drive]
-def get_mount_info(path):
+
+def get_mount_info(path) -> dict:
try:
- output = b''.join(sys_command(f'/usr/bin/findmnt --json {path}'))
+ output = SysCommand(f'/usr/bin/findmnt --json {path}')
except SysCallError:
return {}
output = output.decode('UTF-8')
+
+ if not output:
+ return {}
+
output = json.loads(output)
if 'filesystems' in output:
if len(output['filesystems']) > 1:
@@ -600,15 +624,20 @@ def get_mount_info(path):
return output['filesystems'][0]
-def get_partitions_in_use(mountpoint):
+
+def get_partitions_in_use(mountpoint) -> list:
try:
- output = b''.join(sys_command(f'/usr/bin/findmnt --json -R {mountpoint}'))
+ output = SysCommand(f'/usr/bin/findmnt --json -R {mountpoint}')
except SysCallError:
- return {}
+ return []
mounts = []
output = output.decode('UTF-8')
+
+ if not output:
+ return []
+
output = json.loads(output)
for target in output.get('filesystems', []):
mounts.append(Partition(target['source'], None, filesystem=target.get('fstype', None), mountpoint=target['target']))
@@ -618,9 +647,19 @@ def get_partitions_in_use(mountpoint):
return mounts
+
def get_filesystem_type(path):
try:
- handle = sys_command(f"blkid -o value -s TYPE {path}")
+ handle = SysCommand(f"blkid -o value -s TYPE {path}")
return b''.join(handle).strip().decode('UTF-8')
except SysCallError:
return None
+
+
+def disk_layouts():
+ try:
+ handle = SysCommand("lsblk -f -o+TYPE,SIZE -J")
+ return json.loads(b''.join(handle).decode('UTF-8'))
+ except SysCallError as err:
+ log(f"Could not return disk layouts: {err}")
+ return None
diff --git a/archinstall/lib/exceptions.py b/archinstall/lib/exceptions.py
index 49913980..147b239b 100644
--- a/archinstall/lib/exceptions.py
+++ b/archinstall/lib/exceptions.py
@@ -1,23 +1,37 @@
class RequirementError(BaseException):
pass
+
+
class DiskError(BaseException):
pass
+
+
class UnknownFilesystemFormat(BaseException):
pass
+
+
class ProfileError(BaseException):
pass
+
+
class SysCallError(BaseException):
def __init__(self, message, exit_code):
super(SysCallError, self).__init__(message)
self.message = message
self.exit_code = exit_code
+
+
class ProfileNotFound(BaseException):
pass
+
+
class HardwareIncompatibilityError(BaseException):
pass
-class PermissionError(BaseException):
- pass
+
+
class UserError(BaseException):
pass
+
+
class ServiceException(BaseException):
- pass \ No newline at end of file
+ pass
diff --git a/archinstall/lib/general.py b/archinstall/lib/general.py
index eb0c5d14..3b62c891 100644
--- a/archinstall/lib/general.py
+++ b/archinstall/lib/general.py
@@ -1,16 +1,26 @@
-import os, json, hashlib, shlex, sys
-import time, pty, logging
+import hashlib
+import json
+import logging
+import os
+import pty
+import shlex
+import subprocess
+import sys
+import time
from datetime import datetime, date
-from subprocess import Popen, STDOUT, PIPE, check_output
from select import epoll, EPOLLIN, EPOLLHUP
+from typing import Union
+
from .exceptions import *
from .output import log
+
def gen_uid(entropy_length=256):
return hashlib.sha512(os.urandom(entropy_length)).hexdigest()
+
def multisplit(s, splitters):
- s = [s,]
+ s = [s, ]
for key in splitters:
ns = []
for obj in s:
@@ -18,38 +28,41 @@ def multisplit(s, splitters):
for index, part in enumerate(x):
if len(part):
ns.append(part)
- if index < len(x)-1:
+ if index < len(x) - 1:
ns.append(key)
s = ns
return s
+
def locate_binary(name):
for PATH in os.environ['PATH'].split(':'):
for root, folders, files in os.walk(PATH):
for file in files:
if file == name:
return os.path.join(root, file)
- break # Don't recurse
+ break # Don't recurse
+
+ raise RequirementError(f"Binary {name} does not exist.")
+
-class JSON_Encoder:
+class JsonEncoder:
def _encode(obj):
if isinstance(obj, dict):
- ## We'll need to iterate not just the value that default() usually gets passed
- ## But also iterate manually over each key: value pair in order to trap the keys.
-
+ # We'll need to iterate not just the value that default() usually gets passed
+ # But also iterate manually over each key: value pair in order to trap the keys.
+
copy = {}
for key, val in list(obj.items()):
if isinstance(val, dict):
- val = json.loads(json.dumps(val, cls=JSON)) # This, is a EXTREMELY ugly hack..
- # But it's the only quick way I can think of to
- # trigger a encoding of sub-dictionaries.
+ # This, is a EXTREMELY ugly hack.. but it's the only quick way I can think of to trigger a encoding of sub-dictionaries.
+ val = json.loads(json.dumps(val, cls=JSON))
else:
- val = JSON_Encoder._encode(val)
-
+ val = JsonEncoder._encode(val)
+
if type(key) == str and key[0] == '!':
- copy[JSON_Encoder._encode(key)] = '******'
+ copy[JsonEncoder._encode(key)] = '******'
else:
- copy[JSON_Encoder._encode(key)] = val
+ copy[JsonEncoder._encode(key)] = val
return copy
elif hasattr(obj, 'json'):
return obj.json()
@@ -65,113 +78,134 @@ class JSON_Encoder:
else:
return obj
+
class JSON(json.JSONEncoder, json.JSONDecoder):
def _encode(self, obj):
- return JSON_Encoder._encode(obj)
+ return JsonEncoder._encode(obj)
def encode(self, obj):
return super(JSON, self).encode(self._encode(obj))
-class sys_command():#Thread):
- """
- Stolen from archinstall_gui
- """
- def __init__(self, cmd, callback=None, start_callback=None, peak_output=False, environment_vars={}, *args, **kwargs):
- kwargs.setdefault("worker_id", gen_uid())
- kwargs.setdefault("emulate", False)
- kwargs.setdefault("suppress_errors", False)
- self.log = kwargs.get('log', log)
+class SysCommandWorker:
+ def __init__(self, cmd, callbacks=None, peak_output=False, environment_vars=None, logfile=None, working_directory='./'):
+ if not callbacks:
+ callbacks = {}
+ if not environment_vars:
+ environment_vars = {}
- if kwargs['emulate']:
- self.log(f"Starting command '{cmd}' in emulation mode.", level=logging.DEBUG)
+ if type(cmd) is str:
+ cmd = shlex.split(cmd)
- if type(cmd) is list:
- # if we get a list of arguments
- self.raw_cmd = shlex.join(cmd)
- self.cmd = cmd
- else:
- # else consider it a single shell string
- # this should only be used if really necessary
- self.raw_cmd = cmd
- try:
- self.cmd = shlex.split(cmd)
- except Exception as e:
- raise ValueError(f'Incorrect string to split: {cmd}\n{e}')
+ if cmd[0][0] != '/' and cmd[0][:2] != './':
+ # "which" doesn't work as it's a builtin to bash.
+ # It used to work, but for whatever reason it doesn't anymore.
+ # We there for fall back on manual lookup in os.PATH
+ cmd[0] = locate_binary(cmd[0])
- self.args = args
- self.kwargs = kwargs
+ self.cmd = cmd
+ self.callbacks = callbacks
self.peak_output = peak_output
self.environment_vars = environment_vars
+ self.logfile = logfile
+ self.working_directory = working_directory
- self.kwargs.setdefault("worker", None)
- self.callback = callback
- self.pid = None
self.exit_code = None
- self.started = time.time()
+ self._trace_log = b''
+ self._trace_log_pos = 0
+ self.poll_object = epoll()
+ self.child_fd = None
+ self.started = None
self.ended = None
- self.worker_id = kwargs['worker_id']
- self.trace_log = b''
- self.status = 'starting'
- user_catalogue = os.path.expanduser('~')
+ def __contains__(self, key: bytes):
+ """
+ Contains will also move the current buffert position forward.
+ This is to avoid re-checking the same data when looking for output.
+ """
+ assert type(key) == bytes
- if (workdir := kwargs.get('workdir', None)):
- self.cwd = workdir
- self.exec_dir = workdir
- else:
- self.cwd = f"{user_catalogue}/.cache/archinstall/workers/{kwargs['worker_id']}/"
- self.exec_dir = f'{self.cwd}/{os.path.basename(self.cmd[0])}_workingdir'
+ if (contains := key in self._trace_log[self._trace_log_pos:]):
+ self._trace_log_pos += self._trace_log[self._trace_log_pos:].find(key) + len(key)
- if not self.cmd[0][0] == '/':
- # "which" doesn't work as it's a builtin to bash.
- # It used to work, but for whatever reason it doesn't anymore. So back to square one..
+ return contains
- #self.log('Worker command is not executed with absolute path, trying to find: {}'.format(self.cmd[0]), origin='spawn', level=5)
- #self.log('This is the binary {} for {}'.format(o.decode('UTF-8'), self.cmd[0]), origin='spawn', level=5)
- self.cmd[0] = locate_binary(self.cmd[0])
+ def __iter__(self, *args, **kwargs):
+ for line in self._trace_log[self._trace_log_pos:self._trace_log.rfind(b'\n')].split(b'\n'):
+ if line:
+ yield line + b'\n'
- if not os.path.isdir(self.exec_dir):
- os.makedirs(self.exec_dir)
+ self._trace_log_pos = self._trace_log.rfind(b'\n')
- if start_callback:
- start_callback(self, *args, **kwargs)
- self.run()
+ def __repr__(self):
+ self.make_sure_we_are_executing()
+ return str(self._trace_log)
- def __iter__(self, *args, **kwargs):
- for line in self.trace_log.split(b'\n'):
- yield line
+ def __enter__(self):
+ return self
- def __repr__(self, *args, **kwargs):
- return f"{self.cmd, self.trace_log}"
+ def __exit__(self, *args):
+ # b''.join(sys_command('sync')) # No need to, since the underlying fs() object will call sync.
+ # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
- def decode(self, fmt='UTF-8'):
- return self.trace_log.decode(fmt)
+ if self.child_fd:
+ try:
+ os.close(self.child_fd)
+ except:
+ pass
- def dump(self):
- return {
- 'status': self.status,
- 'worker_id': self.worker_id,
- 'worker_result': self.trace_log.decode('UTF-8'),
- 'started': self.started,
- 'ended': self.ended,
- 'started_pprint': '{}-{}-{} {}:{}:{}'.format(*time.localtime(self.started)),
- 'ended_pprint': '{}-{}-{} {}:{}:{}'.format(*time.localtime(self.ended)) if self.ended else None,
- 'exit_code': self.exit_code
- }
+ if self.peak_output:
+ # To make sure any peaked output didn't leave us hanging
+ # on the same line we were on.
+ sys.stdout.write("\n")
+ sys.stdout.flush()
- def peak(self, output :str):
- if type(output) == bytes:
- try:
- output = output.decode('UTF-8')
- except UnicodeDecodeError:
- return None
+ if len(args) >= 2 and args[1]:
+ log(args[1], level=logging.ERROR, fg='red')
+
+ if self.exit_code != 0:
+ raise SysCallError(f"{self.cmd} exited with abnormal exit code: {self.exit_code}")
+
+ def is_alive(self):
+ self.poll()
+
+ if self.started and self.ended is None:
+ return True
+
+ return False
+
+ def write(self, data: bytes, line_ending=True):
+ assert type(data) == bytes # TODO: Maybe we can support str as well and encode it
+
+ self.make_sure_we_are_executing()
- output = output.strip('\r\n ')
- if len(output) <= 0:
- return None
+ os.write(self.child_fd, data + (b'\n' if line_ending else b''))
+ def make_sure_we_are_executing(self):
+ if not self.started:
+ return self.execute()
+
+ def tell(self) -> int:
+ self.make_sure_we_are_executing()
+ return self._trace_log_pos
+
+ def seek(self, pos):
+ self.make_sure_we_are_executing()
+ # Safety check to ensure 0 < pos < len(tracelog)
+ self._trace_log_pos = min(max(0, pos), len(self._trace_log))
+
+ def peak(self, output: Union[str, bytes]) -> bool:
if self.peak_output:
+ if type(output) == bytes:
+ try:
+ output = output.decode('UTF-8')
+ except UnicodeDecodeError:
+ return False
+
+ output = output.strip('\r\n ')
+ if len(output) <= 0:
+ return False
+
from .user_interaction import get_terminal_width
# Move back to the beginning of the terminal
@@ -191,124 +225,133 @@ class sys_command():#Thread):
# And print the new output we're peaking on:
sys.stdout.write(output)
sys.stdout.flush()
+ return True
- def run(self):
- self.status = 'running'
- old_dir = os.getcwd()
- os.chdir(self.exec_dir)
- self.pid, child_fd = pty.fork()
- if not self.pid: # Child process
- # Replace child process with our main process
- if not self.kwargs['emulate']:
- try:
- os.execve(self.cmd[0], self.cmd, {**os.environ, **self.environment_vars})
- except FileNotFoundError:
- self.status = 'done'
- self.log(f"{self.cmd[0]} does not exist.", level=logging.DEBUG)
- self.exit_code = 1
- return False
-
- os.chdir(old_dir)
-
- poller = epoll()
- poller.register(child_fd, EPOLLIN | EPOLLHUP)
-
- if 'events' in self.kwargs and 'debug' in self.kwargs:
- self.log(f'[D] Using triggers for command: {self.cmd}', level=logging.DEBUG)
- self.log(json.dumps(self.kwargs['events']), level=logging.DEBUG)
+ def poll(self):
+ self.make_sure_we_are_executing()
- alive = True
- last_trigger_pos = 0
- while alive and not self.kwargs['emulate']:
- for fileno, event in poller.poll(0.1):
- try:
- output = os.read(child_fd, 8192)
- self.peak(output)
- self.trace_log += output
- except OSError:
- alive = False
- break
-
- if 'debug' in self.kwargs and self.kwargs['debug'] and len(output):
- self.log(self.cmd, 'gave:', output.decode('UTF-8'), level=logging.DEBUG)
-
- if 'on_output' in self.kwargs:
- self.kwargs['on_output'](self.kwargs['worker'], output)
-
- lower = output.lower()
- broke = False
- if 'events' in self.kwargs:
- for trigger in list(self.kwargs['events']):
- if type(trigger) != bytes:
- original = trigger
- trigger = bytes(original, 'UTF-8')
- self.kwargs['events'][trigger] = self.kwargs['events'][original]
- del(self.kwargs['events'][original])
- if type(self.kwargs['events'][trigger]) != bytes:
- self.kwargs['events'][trigger] = bytes(self.kwargs['events'][trigger], 'UTF-8')
-
- if trigger.lower() in self.trace_log[last_trigger_pos:].lower():
- trigger_pos = self.trace_log[last_trigger_pos:].lower().find(trigger.lower())
-
- if 'debug' in self.kwargs and self.kwargs['debug']:
- self.log(f"Writing to subprocess {self.cmd[0]}: {self.kwargs['events'][trigger].decode('UTF-8')}", level=logging.DEBUG)
- self.log(f"Writing to subprocess {self.cmd[0]}: {self.kwargs['events'][trigger].decode('UTF-8')}", level=logging.DEBUG)
-
- last_trigger_pos = trigger_pos
- os.write(child_fd, self.kwargs['events'][trigger])
- del(self.kwargs['events'][trigger])
- broke = True
- break
-
- if broke:
- continue
-
- ## Adding a exit trigger:
- if len(self.kwargs['events']) == 0:
- if 'debug' in self.kwargs and self.kwargs['debug']:
- self.log(f"Waiting for last command {self.cmd[0]} to finish.", level=logging.DEBUG)
-
- if bytes(f']$'.lower(), 'UTF-8') in self.trace_log[0-len(f']$')-5:].lower():
- if 'debug' in self.kwargs and self.kwargs['debug']:
- self.log(f"{self.cmd[0]} has finished.", level=logging.DEBUG)
- alive = False
- break
-
- self.status = 'done'
-
- if 'debug' in self.kwargs and self.kwargs['debug']:
- self.log(f"{self.cmd[0]} waiting for exit code.", level=logging.DEBUG)
-
- if not self.kwargs['emulate']:
+ got_output = False
+ for fileno, event in self.poll_object.poll(0.1):
+ try:
+ output = os.read(self.child_fd, 8192)
+ got_output = True
+ self.peak(output)
+ self._trace_log += output
+ except OSError as err:
+ self.ended = time.time()
+ break
+
+ if self.ended or (got_output is False and pid_exists(self.pid) is False):
+ self.ended = time.time()
try:
self.exit_code = os.waitpid(self.pid, 0)[1]
except ChildProcessError:
try:
- self.exit_code = os.waitpid(child_fd, 0)[1]
+ self.exit_code = os.waitpid(self.child_fd, 0)[1]
except ChildProcessError:
self.exit_code = 1
- else:
- self.exit_code = 0
- if 'debug' in self.kwargs and self.kwargs['debug']:
- self.log(f"{self.cmd[0]} got exit code: {self.exit_code}", level=logging.DEBUG)
+ def execute(self) -> bool:
+ if (old_dir := os.getcwd()) != self.working_directory:
+ os.chdir(self.working_directory)
+
+ # Note: If for any reason, we get a Python exception between here
+ # and until os.close(), the traceback will get locked inside
+ # stdout of the child_fd object. `os.read(self.child_fd, 8192)` is the
+ # only way to get the traceback without loosing it.
+ self.pid, self.child_fd = pty.fork()
+ os.chdir(old_dir)
+
+ if not self.pid:
+ try:
+ os.execve(self.cmd[0], self.cmd, {**os.environ, **self.environment_vars})
+ except FileNotFoundError:
+ log(f"{self.cmd[0]} does not exist.", level=logging.ERROR, fg="red")
+ self.exit_code = 1
+ return False
+
+ self.started = time.time()
+ self.poll_object.register(self.child_fd, EPOLLIN | EPOLLHUP)
+
+ return True
+
+ def decode(self, encoding='UTF-8'):
+ return self._trace_log.decode(encoding)
+
+
+class SysCommand:
+ def __init__(self, cmd, callback=None, start_callback=None, peak_output=False, environment_vars=None, working_directory='./'):
+ _callbacks = {}
+ if callback:
+ _callbacks['on_end'] = callback
+ if start_callback:
+ _callbacks['on_start'] = start_callback
+
+ self.cmd = cmd
+ self._callbacks = _callbacks
+ self.peak_output = peak_output
+ self.environment_vars = environment_vars
+ self.working_directory = working_directory
+
+ self.session = None
+ self.create_session()
+
+ def __enter__(self):
+ return self.session
- if 'ignore_errors' in self.kwargs:
- self.exit_code = 0
+ def __exit__(self, *args, **kwargs):
+ # b''.join(sys_command('sync')) # No need to, since the underlying fs() object will call sync.
+ # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
- if self.exit_code != 0 and not self.kwargs['suppress_errors']:
- #self.log(self.trace_log.decode('UTF-8'), level=logging.DEBUG)
- #self.log(f"'{self.raw_cmd}' did not exit gracefully, exit code {self.exit_code}.", level=logging.ERROR)
- raise SysCallError(message=f"{self.trace_log.decode('UTF-8')}\n'{self.raw_cmd}' did not exit gracefully (trace log above), exit code: {self.exit_code}", exit_code=self.exit_code)
+ if len(args) >= 2 and args[1]:
+ log(args[1], level=logging.ERROR, fg='red')
- self.ended = time.time()
- with open(f'{self.cwd}/trace.log', 'wb') as fh:
- fh.write(self.trace_log)
+ def __iter__(self, *args, **kwargs):
+
+ for line in self.session:
+ yield line
+
+ def __repr__(self, *args, **kwargs):
+ return self.session._trace_log.decode('UTF-8')
+
+ def __json__(self):
+ return {
+ 'cmd': self.cmd,
+ 'callbacks': self._callbacks,
+ 'peak': self.peak_output,
+ 'environment_vars': self.environment_vars,
+ 'session': True if self.session else False
+ }
+
+ def create_session(self):
+ if self.session:
+ return True
try:
- os.close(child_fd)
- except:
- pass
+ self.session = SysCommandWorker(self.cmd, callbacks=self._callbacks, peak_output=self.peak_output, environment_vars=self.environment_vars)
+
+ while self.session.ended is None:
+ self.session.poll()
+
+ if self.peak_output:
+ sys.stdout.write('\n')
+ sys.stdout.flush()
+
+ except SysCallError:
+ return False
+
+ return True
+
+ def decode(self, fmt='UTF-8'):
+ return self.session._trace_log.decode(fmt)
+
+ @property
+ def exit_code(self):
+ return self.session.exit_code
+
+ @property
+ def trace_log(self):
+ return self.session._trace_log
def prerequisite_check():
@@ -317,5 +360,23 @@ def prerequisite_check():
return True
+
def reboot():
- o = b''.join(sys_command("/usr/bin/reboot"))
+ o = b''.join(SysCommand("/usr/bin/reboot"))
+
+
+def pid_exists(pid: int):
+ try:
+ return any(subprocess.check_output(['/usr/bin/ps', '--no-headers', '-o', 'pid', '-p', str(pid)]).strip())
+ except subprocess.CalledProcessError:
+ return False
+
+
+def run_custom_user_commands(commands, installation):
+ for index, command in enumerate(commands):
+ log(f'Executing custom command "{command}" ...', fg='yellow')
+ with open(f"{installation.target}/var/tmp/user-command.{index}.sh", "w") as temp_script:
+ temp_script.write(command)
+ execution_output = SysCommand(f"arch-chroot {installation.target} bash /var/tmp/user-command.{index}.sh")
+ log(execution_output)
+ os.unlink(f"{installation.target}/var/tmp/user-command.{index}.sh")
diff --git a/archinstall/lib/hardware.py b/archinstall/lib/hardware.py
index e9c63e41..7c164096 100644
--- a/archinstall/lib/hardware.py
+++ b/archinstall/lib/hardware.py
@@ -1,73 +1,120 @@
-import os, subprocess, json
-from .general import sys_command
-from .networking import list_interfaces, enrichIfaceTypes
+import json
+import os
+import subprocess
from typing import Optional
-__packages__ = ['xf86-video-amdgpu', 'xf86-video-ati', 'xf86-video-intel', 'xf86-video-nouveau', 'xf86-video-fbdev', 'xf86-video-vesa', 'xf86-video-vmware', 'nvidia', 'mesa']
+from .general import SysCommand
+from .networking import list_interfaces, enrich_iface_types
+
+__packages__ = [
+ "mesa",
+ "xf86-video-amdgpu",
+ "xf86-video-ati",
+ "xf86-video-nouveau",
+ "xf86-video-vmware",
+ "libva-mesa-driver",
+ "libva-intel-driver",
+ "intel-media-driver",
+ "vulkan-radeon",
+ "vulkan-intel",
+ "nvidia",
+]
AVAILABLE_GFX_DRIVERS = {
# Sub-dicts are layer-2 options to be selected
# and lists are a list of packages to be installed
- 'AMD / ATI' : {
- 'amd' : ['xf86-video-amdgpu'],
- 'ati' : ['xf86-video-ati']
- },
- 'intel' : ['xf86-video-intel'],
- 'nvidia' : {
- 'open-source' : ['xf86-video-nouveau'],
- 'proprietary' : ['nvidia']
+ "All open-source (default)": [
+ "mesa",
+ "xf86-video-amdgpu",
+ "xf86-video-ati",
+ "xf86-video-nouveau",
+ "xf86-video-vmware",
+ "libva-mesa-driver",
+ "libva-intel-driver",
+ "intel-media-driver",
+ "vulkan-radeon",
+ "vulkan-intel",
+ ],
+ "AMD / ATI (open-source)": [
+ "mesa",
+ "xf86-video-amdgpu",
+ "xf86-video-ati",
+ "libva-mesa-driver",
+ "vulkan-radeon",
+ ],
+ "Intel (open-source)": [
+ "mesa",
+ "libva-intel-driver",
+ "intel-media-driver",
+ "vulkan-intel",
+ ],
+ "Nvidia": {
+ "open-source": ["mesa", "xf86-video-nouveau", "libva-mesa-driver"],
+ "proprietary": ["nvidia"],
},
- 'mesa' : ['mesa'],
- 'fbdev' : ['xf86-video-fbdev'],
- 'vesa' : ['xf86-video-vesa'],
- 'vmware / virtualbox' : ['xf86-video-vmware']
+ "VMware / VirtualBox (open-source)": ["mesa", "xf86-video-vmware"],
}
-def hasWifi()->bool:
- return 'WIRELESS' in enrichIfaceTypes(list_interfaces().values()).values()
-def hasAMDCPU()->bool:
+def has_wifi() -> bool:
+ return 'WIRELESS' in enrich_iface_types(list_interfaces().values()).values()
+
+
+def has_amd_cpu() -> bool:
if subprocess.check_output("lscpu | grep AMD", shell=True).strip().decode():
return True
return False
-def hasIntelCPU()->bool:
+
+
+def has_intel_cpu() -> bool:
if subprocess.check_output("lscpu | grep Intel", shell=True).strip().decode():
return True
return False
-def hasUEFI()->bool:
+
+def has_uefi() -> bool:
return os.path.isdir('/sys/firmware/efi')
-def graphicsDevices()->dict:
+
+def graphics_devices() -> dict:
cards = {}
- for line in sys_command(f"lspci"):
+ for line in SysCommand("lspci"):
if b' VGA ' in line:
- _, identifier = line.split(b': ',1)
+ _, identifier = line.split(b': ', 1)
cards[identifier.strip().lower().decode('UTF-8')] = line
return cards
-def hasNvidiaGraphics()->bool:
- return any('nvidia' in x for x in graphicsDevices())
-def hasAmdGraphics()->bool:
- return any('amd' in x for x in graphicsDevices())
+def has_nvidia_graphics() -> bool:
+ return any('nvidia' in x for x in graphics_devices())
-def hasIntelGraphics()->bool:
- return any('intel' in x for x in graphicsDevices())
+def has_amd_graphics() -> bool:
+ return any('amd' in x for x in graphics_devices())
+
+
+def has_intel_graphics() -> bool:
+ return any('intel' in x for x in graphics_devices())
+
+
+def cpu_vendor() -> Optional[str]:
+ cpu_info_raw = SysCommand("lscpu -J")
+ cpu_info = json.loads(b"".join(cpu_info_raw).decode('UTF-8'))['lscpu']
-def cpuVendor()-> Optional[str]:
- cpu_info = json.loads(subprocess.check_output("lscpu -J", shell=True).decode('utf-8'))['lscpu']
for info in cpu_info:
- if info.get('field',None):
- if info.get('field',None) == "Vendor ID:":
- return info.get('data',None)
+ if info.get('field', None) == "Vendor ID:":
+ return info.get('data', None)
+ return None
-def isVM() -> bool:
+
+def is_vm() -> bool:
try:
- subprocess.check_call(["systemd-detect-virt"]) # systemd-detect-virt issues a none 0 exit code if it is not on a virtual machine
- return True
+ # systemd-detect-virt issues a non-zero exit code if it is not on a virtual machine
+ if b"".join(SysCommand("systemd-detect-virt")).lower() != b"none":
+ return True
except:
- return False
+ pass
+
+ return False
# TODO: Add more identifiers
diff --git a/archinstall/lib/installer.py b/archinstall/lib/installer.py
index 51539d2c..dff05b0a 100644
--- a/archinstall/lib/installer.py
+++ b/archinstall/lib/installer.py
@@ -1,54 +1,54 @@
-import os, stat, time, shutil, pathlib
-import subprocess, logging
-from .exceptions import *
from .disk import *
-from .general import *
-from .user_interaction import *
-from .profiles import Profile
+from .hardware import *
+from .locale_helpers import verify_keyboard_layout, verify_x11_keyboard_layout
from .mirrors import *
-from .systemd import Networkd
-from .output import log
from .storage import storage
-from .hardware import *
from .plugins import plugins
+from .user_interaction import *
# Any package that the Installer() is responsible for (optional and the default ones)
__packages__ = ["base", "base-devel", "linux-firmware", "linux", "linux-lts", "linux-zen", "linux-hardened"]
-class Installer():
+
+class Installer:
"""
`Installer()` is the wrapper for most basic installation steps.
It also wraps :py:func:`~archinstall.Installer.pacstrap` among other things.
:param partition: Requires a partition as the first argument, this is
- so that the installer can mount to `mountpoint` and strap packages there.
+ so that the installer can mount to `mountpoint` and strap packages there.
:type partition: class:`archinstall.Partition`
:param boot_partition: There's two reasons for needing a boot partition argument,
- The first being so that `mkinitcpio` can place the `vmlinuz` kernel at the right place
- during the `pacstrap` or `linux` and the base packages for a minimal installation.
- The second being when :py:func:`~archinstall.Installer.add_bootloader` is called,
- A `boot_partition` must be known to the installer before this is called.
+ The first being so that `mkinitcpio` can place the `vmlinuz` kernel at the right place
+ during the `pacstrap` or `linux` and the base packages for a minimal installation.
+ The second being when :py:func:`~archinstall.Installer.add_bootloader` is called,
+ A `boot_partition` must be known to the installer before this is called.
:type boot_partition: class:`archinstall.Partition`
:param profile: A profile to install, this is optional and can be called later manually.
- This just simplifies the process by not having to call :py:func:`~archinstall.Installer.install_profile` later on.
+ This just simplifies the process by not having to call :py:func:`~archinstall.Installer.install_profile` later on.
:type profile: str, optional
:param hostname: The given /etc/hostname for the machine.
:type hostname: str, optional
"""
- def __init__(self, target, *, base_packages=__packages__[:3], kernels=['linux']):
+
+ def __init__(self, target, *, base_packages=None, kernels=None):
+ if base_packages is None:
+ base_packages = __packages__[:3]
+ if kernels is None:
+ kernels = ['linux']
self.target = target
self.init_time = time.strftime('%Y-%m-%d_%H-%M-%S')
self.milliseconds = int(str(time.time()).split('.')[1])
self.helper_flags = {
- 'base' : False,
- 'bootloader' : False
+ 'base': False,
+ 'bootloader': False
}
-
+
self.base_packages = base_packages.split(' ') if type(base_packages) is str else base_packages
for kernel in kernels:
self.base_packages.append(kernel)
@@ -58,6 +58,12 @@ class Installer():
storage['session'] = self
self.partitions = get_partitions_in_use(self.target)
+ self.MODULES = []
+ self.BINARIES = []
+ self.FILES = []
+ self.HOOKS = ["base", "udev", "autodetect", "keyboard", "keymap", "modconf", "block", "filesystems", "fsck"]
+ self.KERNEL_PARAMS = []
+
def log(self, *args, level=logging.DEBUG, **kwargs):
"""
installer.log() wraps output.log() mainly to set a default log-level for this install session.
@@ -69,11 +75,10 @@ class Installer():
return self
def __exit__(self, *args, **kwargs):
- # b''.join(sys_command(f'sync')) # No need to, since the underlying fs() object will call sync.
+ # b''.join(sys_command('sync')) # No need to, since the underlying fs() object will call sync.
# TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
if len(args) >= 2 and args[1]:
- #self.log(self.trace_log.decode('UTF-8'), level=logging.DEBUG)
self.log(args[1], level=logging.ERROR, fg='red')
self.sync_log_to_install_medium()
@@ -81,7 +86,7 @@ class Installer():
# We avoid printing /mnt/<log path> because that might confuse people if they note it down
# and then reboot, and a identical log file will be found in the ISO medium anyway.
print(f"[!] A log file has been created here: {os.path.join(storage['LOG_PATH'], storage['LOG_FILE'])}")
- print(f" Please submit this issue (and file) to https://github.com/archlinux/archinstall/issues")
+ print(" Please submit this issue (and file) to https://github.com/archlinux/archinstall/issues")
raise args[1]
self.genfstab()
@@ -95,10 +100,10 @@ class Installer():
self.log('Some required steps were not successfully installed/configured before leaving the installer:', fg='red', level=logging.WARNING)
for step in missing_steps:
self.log(f' - {step}', fg='red', level=logging.WARNING)
-
+
self.log(f"Detailed error logs can be found at: {storage['LOG_PATH']}", level=logging.WARNING)
- self.log(f"Submit this zip file as an issue to https://github.com/archlinux/archinstall/issues", level=logging.WARNING)
-
+ self.log("Submit this zip file as an issue to https://github.com/archlinux/archinstall/issues", level=logging.WARNING)
+
self.sync_log_to_install_medium()
return False
@@ -106,12 +111,12 @@ class Installer():
# Copy over the install log (if there is one) to the install medium if
# at least the base has been strapped in, otherwise we won't have a filesystem/structure to copy to.
if self.helper_flags.get('base-strapped', False) is True:
- if (filename := storage.get('LOG_FILE', None)):
+ if filename := storage.get('LOG_FILE', None):
absolute_logfile = os.path.join(storage.get('LOG_PATH', './'), filename)
if not os.path.isdir(f"{self.target}/{os.path.dirname(absolute_logfile)}"):
os.makedirs(f"{self.target}/{os.path.dirname(absolute_logfile)}")
-
+
shutil.copy2(absolute_logfile, f"{self.target}/{absolute_logfile}")
return True
@@ -119,14 +124,15 @@ class Installer():
def mount(self, partition, mountpoint, create_mountpoint=True):
if create_mountpoint and not os.path.isdir(f'{self.target}{mountpoint}'):
os.makedirs(f'{self.target}{mountpoint}')
-
+
partition.mount(f'{self.target}{mountpoint}')
def post_install_check(self, *args, **kwargs):
return [step for step, flag in self.helper_flags.items() if flag is False]
def pacstrap(self, *packages, **kwargs):
- if type(packages[0]) in (list, tuple): packages = packages[0]
+ if type(packages[0]) in (list, tuple):
+ packages = packages[0]
for plugin in plugins.values():
if hasattr(plugin, 'on_pacstrap'):
@@ -135,8 +141,8 @@ class Installer():
self.log(f'Installing packages: {packages}', level=logging.INFO)
- if (sync_mirrors := sys_command('/usr/bin/pacman -Syy')).exit_code == 0:
- if (pacstrap := sys_command(f'/usr/bin/pacstrap {self.target} {" ".join(packages)}', **kwargs)).exit_code == 0:
+ if (sync_mirrors := SysCommand('/usr/bin/pacman -Syy')).exit_code == 0:
+ if (pacstrap := SysCommand(f'/usr/bin/pacstrap {self.target} {" ".join(packages)}', peak_output=True)).exit_code == 0:
return True
else:
self.log(f'Could not strap in packages: {pacstrap.exit_code}', level=logging.INFO)
@@ -148,37 +154,39 @@ class Installer():
def genfstab(self, flags='-pU'):
self.log(f"Updating {self.target}/etc/fstab", level=logging.INFO)
-
- fstab = sys_command(f'/usr/bin/genfstab {flags} {self.target}').trace_log
- with open(f"{self.target}/etc/fstab", 'ab') as fstab_fh:
- fstab_fh.write(fstab)
+
+ with open(f"{self.target}/etc/fstab", 'a') as fstab_fh:
+ fstab_fh.write(SysCommand(f'/usr/bin/genfstab {flags} {self.target}').decode())
if not os.path.isfile(f'{self.target}/etc/fstab'):
raise RequirementError(f'Could not generate fstab, strapping in packages most likely failed (disk out of space?)\n{fstab}')
return True
- def set_hostname(self, hostname :str, *args, **kwargs):
+ def set_hostname(self, hostname: str, *args, **kwargs):
with open(f'{self.target}/etc/hostname', 'w') as fh:
fh.write(hostname + '\n')
def set_locale(self, locale, encoding='UTF-8', *args, **kwargs):
- if not len(locale): return True
+ if not len(locale):
+ return True
with open(f'{self.target}/etc/locale.gen', 'a') as fh:
fh.write(f'{locale}.{encoding} {encoding}\n')
with open(f'{self.target}/etc/locale.conf', 'w') as fh:
fh.write(f'LANG={locale}.{encoding}\n')
- return True if sys_command(f'/usr/bin/arch-chroot {self.target} locale-gen').exit_code == 0 else False
+ return True if SysCommand(f'/usr/bin/arch-chroot {self.target} locale-gen').exit_code == 0 else False
def set_timezone(self, zone, *args, **kwargs):
- if not zone: return True
- if not len(zone): return True # Redundant
+ if not zone:
+ return True
+ if not len(zone):
+ return True # Redundant
- if (pathlib.Path("/usr")/"share"/"zoneinfo"/zone).exists():
- (pathlib.Path(self.target)/"etc"/"localtime").unlink(missing_ok=True)
- sys_command(f'/usr/bin/arch-chroot {self.target} ln -s /usr/share/zoneinfo/{zone} /etc/localtime')
+ if (pathlib.Path("/usr") / "share" / "zoneinfo" / zone).exists():
+ (pathlib.Path(self.target) / "etc" / "localtime").unlink(missing_ok=True)
+ SysCommand(f'/usr/bin/arch-chroot {self.target} ln -s /usr/share/zoneinfo/{zone} /etc/localtime')
return True
else:
self.log(
@@ -188,7 +196,7 @@ class Installer():
)
def activate_ntp(self):
- self.log(f'Installing and activating NTP.', level=logging.INFO)
+ self.log('Installing and activating NTP.', level=logging.INFO)
if self.pacstrap('ntp'):
if self.enable_service('ntpd'):
return True
@@ -200,18 +208,20 @@ class Installer():
raise ServiceException(f"Unable to start service {service}: {output}")
def run_command(self, cmd, *args, **kwargs):
- return sys_command(f'/usr/bin/arch-chroot {self.target} {cmd}')
+ return SysCommand(f'/usr/bin/arch-chroot {self.target} {cmd}')
def arch_chroot(self, cmd, *args, **kwargs):
if 'runas' in kwargs:
cmd = f"su - {kwargs['runas']} -c \"{cmd}\""
-
+
return self.run_command(cmd)
def drop_to_shell(self):
subprocess.check_call(f"/usr/bin/arch-chroot {self.target}", shell=True)
def configure_nic(self, nic, dhcp=True, ip=None, gateway=None, dns=None, *args, **kwargs):
+ from .systemd import Networkd
+
if dhcp:
conf = Networkd(Match={"Name": nic}, Network={"DHCP": "yes"})
else:
@@ -225,14 +235,14 @@ class Installer():
network["DNS"] = dns
conf = Networkd(Match={"Name": nic}, Network=network)
-
+
with open(f"{self.target}/etc/systemd/network/10-{nic}.network", "a") as netconf:
netconf.write(str(conf))
- def copy_ISO_network_config(self, enable_services=False):
+ def copy_iso_network_config(self, enable_services=False):
# Copy (if any) iwd password and config files
if os.path.isdir('/var/lib/iwd/'):
- if (psk_files := glob.glob('/var/lib/iwd/*.psk')):
+ if psk_files := glob.glob('/var/lib/iwd/*.psk'):
if not os.path.isdir(f"{self.target}/var/lib/iwd"):
os.makedirs(f"{self.target}/var/lib/iwd")
@@ -240,6 +250,7 @@ class Installer():
# If we haven't installed the base yet (function called pre-maturely)
if self.helper_flags.get('base', False) is False:
self.base_packages.append('iwd')
+
# This function will be called after minimal_installation()
# as a hook for post-installs. This hook is only needed if
# base is not installed yet.
@@ -257,7 +268,7 @@ class Installer():
shutil.copy2(psk, f"{self.target}/var/lib/iwd/{os.path.basename(psk)}")
# Copy (if any) systemd-networkd config files
- if (netconfigurations := glob.glob('/etc/systemd/network/*')):
+ if netconfigurations := glob.glob('/etc/systemd/network/*'):
if not os.path.isdir(f"{self.target}/etc/systemd/network/"):
os.makedirs(f"{self.target}/etc/systemd/network/")
@@ -267,38 +278,43 @@ class Installer():
if enable_services:
# If we haven't installed the base yet (function called pre-maturely)
if self.helper_flags.get('base', False) is False:
+
def post_install_enable_networkd_resolved(*args, **kwargs):
self.enable_service('systemd-networkd', 'systemd-resolved')
+
self.post_base_install.append(post_install_enable_networkd_resolved)
# Otherwise, we can go ahead and enable the services
else:
self.enable_service('systemd-networkd', 'systemd-resolved')
-
return True
def detect_encryption(self, partition):
+ part = Partition(partition.parent, None, autodetect_filesystem=True)
if partition.encrypted:
return partition
- elif partition.parent not in partition.path and Partition(partition.parent, None, autodetect_filesystem=True).filesystem == 'crypto_LUKS':
- return Partition(partition.parent, None, autodetect_filesystem=True)
-
+ elif partition.parent not in partition.path and part.filesystem == 'crypto_LUKS':
+ return part
+
return False
- def minimal_installation(self):
- ## Add necessary packages if encrypting the drive
- ## (encrypted partitions default to btrfs for now, so we need btrfs-progs)
- ## TODO: Perhaps this should be living in the function which dictates
- ## the partitioning. Leaving here for now.
+ def mkinitcpio(self, *flags):
+ with open(f'{self.target}/etc/mkinitcpio.conf', 'w') as mkinit:
+ mkinit.write(f"MODULES=({' '.join(self.MODULES)})\n")
+ mkinit.write(f"BINARIES=({' '.join(self.BINARIES)})\n")
+ mkinit.write(f"FILES=({' '.join(self.FILES)})\n")
+ mkinit.write(f"HOOKS=({' '.join(self.HOOKS)})\n")
+ SysCommand(f'/usr/bin/arch-chroot {self.target} mkinitcpio {" ".join(flags)}')
- MODULES = []
- BINARIES = []
- FILES = []
- HOOKS = ["base", "udev", "autodetect", "keyboard", "keymap", "modconf", "block", "filesystems", "fsck"]
+ def minimal_installation(self):
+ # Add necessary packages if encrypting the drive
+ # (encrypted partitions default to btrfs for now, so we need btrfs-progs)
+ # TODO: Perhaps this should be living in the function which dictates
+ # the partitioning. Leaving here for now.
for partition in self.partitions:
if partition.filesystem == 'btrfs':
- #if partition.encrypted:
+ # if partition.encrypted:
self.base_packages.append('btrfs-progs')
if partition.filesystem == 'xfs':
self.base_packages.append('xfsprogs')
@@ -307,50 +323,48 @@ class Installer():
# Configure mkinitcpio to handle some specific use cases.
if partition.filesystem == 'btrfs':
- if 'btrfs' not in MODULES:
- MODULES.append('btrfs')
- if '/usr/bin/btrfs-progs' not in BINARIES:
- BINARIES.append('/usr/bin/btrfs')
+ if 'btrfs' not in self.MODULES:
+ self.MODULES.append('btrfs')
+ if '/usr/bin/btrfs-progs' not in self.BINARIES:
+ self.BINARIES.append('/usr/bin/btrfs')
if self.detect_encryption(partition):
- if 'encrypt' not in HOOKS:
- HOOKS.insert(HOOKS.index('filesystems'), 'encrypt')
+ if 'encrypt' not in self.HOOKS:
+ self.HOOKS.insert(self.HOOKS.index('filesystems'), 'encrypt')
- if not(hasUEFI()): # TODO: Allow for grub even on EFI
+ if not has_uefi():
self.base_packages.append('grub')
-
- self.pacstrap(self.base_packages)
- self.helper_flags['base-strapped'] = True
- #self.genfstab()
- if not isVM():
- vendor = cpuVendor()
- if vendor == "AuthenticAMD":
+
+ if not is_vm():
+ vendor = cpu_vendor()
+ if vendor == "AuthenticAMD":
self.base_packages.append("amd-ucode")
+ if (ucode := pathlib.Path(f"{self.target}/boot/amd-ucode.img")).exists():
+ ucode.unlink()
elif vendor == "GenuineIntel":
self.base_packages.append("intel-ucode")
+ if (ucode := pathlib.Path(f"{self.target}/boot/intel-ucode.img")).exists():
+ ucode.unlink()
else:
- self.log("Unknown cpu vendor not installing ucode")
+ self.log(f"Unknown CPU vendor '{vendor}' detected. Archinstall won't install any ucode.", level=logging.DEBUG)
+
+ self.pacstrap(self.base_packages)
+ self.helper_flags['base-strapped'] = True
+
with open(f"{self.target}/etc/fstab", "a") as fstab:
- fstab.write(
- "\ntmpfs /tmp tmpfs defaults,noatime,mode=1777 0 0\n"
- ) # Redundant \n at the start? who knows?
-
- ## TODO: Support locale and timezone
- #os.remove(f'{self.target}/etc/localtime')
- #sys_command(f'/usr/bin/arch-chroot {self.target} ln -s /usr/share/zoneinfo/{localtime} /etc/localtime')
- #sys_command('/usr/bin/arch-chroot /mnt hwclock --hctosys --localtime')
+ fstab.write("\ntmpfs /tmp tmpfs defaults,noatime,mode=1777 0 0\n") # Redundant \n at the start? who knows?
+
+ # TODO: Support locale and timezone
+ # os.remove(f'{self.target}/etc/localtime')
+ # sys_command(f'/usr/bin/arch-chroot {self.target} ln -s /usr/share/zoneinfo/{localtime} /etc/localtime')
+ # sys_command('/usr/bin/arch-chroot /mnt hwclock --hctosys --localtime')
self.set_hostname('archinstall')
self.set_locale('en_US')
# TODO: Use python functions for this
- sys_command(f'/usr/bin/arch-chroot {self.target} chmod 700 /root')
+ SysCommand(f'/usr/bin/arch-chroot {self.target} chmod 700 /root')
- with open(f'{self.target}/etc/mkinitcpio.conf', 'w') as mkinit:
- mkinit.write(f"MODULES=({' '.join(MODULES)})\n")
- mkinit.write(f"BINARIES=({' '.join(BINARIES)})\n")
- mkinit.write(f"FILES=({' '.join(FILES)})\n")
- mkinit.write(f"HOOKS=({' '.join(HOOKS)})\n")
- sys_command(f'/usr/bin/arch-chroot {self.target} mkinitcpio -P')
+ self.mkinitcpio('-P')
self.helper_flags['base'] = True
@@ -365,7 +379,7 @@ class Installer():
boot_partition = None
root_partition = None
for partition in self.partitions:
- if partition.mountpoint == self.target+'/boot':
+ if partition.mountpoint == self.target + '/boot':
boot_partition = partition
elif partition.mountpoint == self.target:
root_partition = partition
@@ -375,14 +389,16 @@ class Installer():
if bootloader == 'systemd-bootctl':
self.pacstrap('efibootmgr')
- if not hasUEFI():
+ if not has_uefi():
raise HardwareIncompatibilityError
# TODO: Ideally we would want to check if another config
# points towards the same disk and/or partition.
# And in which case we should do some clean up.
# Install the boot loader
- sys_command(f'/usr/bin/arch-chroot {self.target} bootctl --no-variables --path=/boot install')
+ if SysCommand(f'/usr/bin/arch-chroot {self.target} bootctl --path=/boot install').exit_code != 0:
+ # Fallback, try creating the boot loader without touching the EFI variables
+ SysCommand(f'/usr/bin/arch-chroot {self.target} bootctl --no-variables --path=/boot install')
# Modify or create a loader.conf
if os.path.isfile(f'{self.target}/boot/loader/loader.conf'):
@@ -391,65 +407,68 @@ class Installer():
else:
loader_data = [
f"default {self.init_time}",
- f"timeout 5"
+ "timeout 5"
]
-
+
with open(f'{self.target}/boot/loader/loader.conf', 'w') as loader:
for line in loader_data:
if line[:8] == 'default ':
loader.write(f'default {self.init_time}\n')
+ elif line[:8] == '#timeout' and 'timeout 5' not in loader_data:
+ # We add in the default timeout to support dual-boot
+ loader.write(f"{line[1:]}\n")
else:
- loader.write(f"{line}")
+ loader.write(f"{line}\n")
- ## For some reason, blkid and /dev/disk/by-uuid are not getting along well.
- ## And blkid is wrong in terms of LUKS.
- #UUID = sys_command('blkid -s PARTUUID -o value {drive}{partition_2}'.format(**args)).decode('UTF-8').strip()
+ # For some reason, blkid and /dev/disk/by-uuid are not getting along well.
+ # And blkid is wrong in terms of LUKS.
+ # UUID = sys_command('blkid -s PARTUUID -o value {drive}{partition_2}'.format(**args)).decode('UTF-8').strip()
# Setup the loader entry
with open(f'{self.target}/boot/loader/entries/{self.init_time}.conf', 'w') as entry:
- entry.write(f'# Created by: archinstall\n')
+ entry.write('# Created by: archinstall\n')
entry.write(f'# Created on: {self.init_time}\n')
- entry.write(f'title Arch Linux\n')
- entry.write(f'linux /vmlinuz-linux\n')
- if not isVM():
- vendor = cpuVendor()
- if vendor == "AuthenticAMD":
+ entry.write('title Arch Linux\n')
+ entry.write('linux /vmlinuz-linux\n')
+ if not is_vm():
+ vendor = cpu_vendor()
+ if vendor == "AuthenticAMD":
entry.write("initrd /amd-ucode.img\n")
elif vendor == "GenuineIntel":
entry.write("initrd /intel-ucode.img\n")
else:
self.log("unknow cpu vendor, not adding ucode to systemd-boot config")
- entry.write(f'initrd /initramfs-linux.img\n')
- ## blkid doesn't trigger on loopback devices really well,
- ## so we'll use the old manual method until we get that sorted out.
-
+ entry.write('initrd /initramfs-linux.img\n')
+ # blkid doesn't trigger on loopback devices really well,
+ # so we'll use the old manual method until we get that sorted out.
- if (real_device := self.detect_encryption(root_partition)):
+ if real_device := self.detect_encryption(root_partition):
# TODO: We need to detect if the encrypted device is a whole disk encryption,
# or simply a partition encryption. Right now we assume it's a partition (and we always have)
log(f"Identifying root partition by PART-UUID on {real_device}: '{real_device.uuid}'.", level=logging.DEBUG)
- entry.write(f'options cryptdevice=PARTUUID={real_device.uuid}:luksdev root=/dev/mapper/luksdev rw intel_pstate=no_hwp\n')
+ entry.write(f'options cryptdevice=PARTUUID={real_device.uuid}:luksdev root=/dev/mapper/luksdev rw intel_pstate=no_hwp {" ".join(self.KERNEL_PARAMS)}\n')
else:
log(f"Identifying root partition by PART-UUID on {root_partition}, looking for '{root_partition.uuid}'.", level=logging.DEBUG)
- entry.write(f'options root=PARTUUID={root_partition.uuid} rw intel_pstate=no_hwp\n')
+ entry.write(f'options root=PARTUUID={root_partition.uuid} rw intel_pstate=no_hwp {" ".join(self.KERNEL_PARAMS)}\n')
self.helper_flags['bootloader'] = bootloader
return True
- raise RequirementError(f"Could not identify the UUID of {self.partition}, there for {self.target}/boot/loader/entries/arch.conf will be broken until fixed.")
elif bootloader == "grub-install":
self.pacstrap('grub')
- if hasUEFI():
- o = b''.join(sys_command(f'/usr/bin/arch-chroot {self.target} grub-install --target=x86_64-efi --efi-directory=/boot --bootloader-id=GRUB'))
- sys_command('/usr/bin/arch-chroot /mnt grub-mkconfig -o /boot/grub/grub.cfg')
+ if has_uefi():
+ self.pacstrap('efibootmgr')
+ o = b''.join(SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --target=x86_64-efi --efi-directory=/boot --bootloader-id=GRUB'))
+ SysCommand('/usr/bin/arch-chroot /mnt grub-mkconfig -o /boot/grub/grub.cfg')
+ self.helper_flags['bootloader'] = True
return True
else:
- root_device = subprocess.check_output(f'basename "$(readlink -f /sys/class/block/{root_partition.path.replace("/dev/","")}/..)"', shell=True).decode().strip()
+ root_device = subprocess.check_output(f'basename "$(readlink -f /sys/class/block/{root_partition.path.replace("/dev/", "")}/..)"', shell=True).decode().strip()
if root_device == "block":
root_device = f"{root_partition.path}"
- o = b''.join(sys_command(f'/usr/bin/arch-chroot {self.target} grub-install --target=i386-pc /dev/{root_device}'))
- sys_command('/usr/bin/arch-chroot /mnt grub-mkconfig -o /boot/grub/grub.cfg')
- self.helper_flags['bootloader'] = bootloader
+ o = b''.join(SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --target=i386-pc /dev/{root_device}'))
+ SysCommand('/usr/bin/arch-chroot /mnt grub-mkconfig -o /boot/grub/grub.cfg')
+ self.helper_flags['bootloader'] = True
return True
else:
raise RequirementError(f"Unknown (or not yet implemented) bootloader requested: {bootloader}")
@@ -458,14 +477,7 @@ class Installer():
return self.pacstrap(*packages)
def install_profile(self, profile):
- # TODO: Replace this with a import archinstall.session instead in the profiles.
- # The tricky thing with doing the import archinstall.session instead is that
- # profiles might be run from a different chroot, and there's no way we can
- # guarantee file-path safety when accessing the installer object that way.
- # Doing the __builtins__ replacement, ensures that the global variable "installation"
- # is always kept up to date. It's considered a nasty hack - but it's a safe way
- # of ensuring 100% accuracy of archinstall session variables.
- __builtins__['installation'] = self
+ storage['installation_session'] = self
if type(profile) == str:
profile = Profile(self, profile)
@@ -473,21 +485,23 @@ class Installer():
self.log(f'Installing network profile {profile}', level=logging.INFO)
return profile.install()
- def enable_sudo(self, entity :str, group=False):
+ def enable_sudo(self, entity: str, group=False):
self.log(f'Enabling sudo permissions for {entity}.', level=logging.INFO)
with open(f'{self.target}/etc/sudoers', 'a') as sudoers:
sudoers.write(f'{"%" if group else ""}{entity} ALL=(ALL) ALL\n')
return True
- def user_create(self, user :str, password=None, groups=[], sudo=False):
+ def user_create(self, user: str, password=None, groups=None, sudo=False):
+ if groups is None:
+ groups = []
self.log(f'Creating user {user}', level=logging.INFO)
- o = b''.join(sys_command(f'/usr/bin/arch-chroot {self.target} useradd -m -G wheel {user}'))
+ o = b''.join(SysCommand(f'/usr/bin/arch-chroot {self.target} useradd -m -G wheel {user}'))
if password:
self.user_set_pw(user, password)
if groups:
for group in groups:
- o = b''.join(sys_command(f'/usr/bin/arch-chroot {self.target} gpasswd -a {user} {group}'))
+ o = b''.join(SysCommand(f'/usr/bin/arch-chroot {self.target} gpasswd -a {user} {group}'))
if sudo and self.enable_sudo(user):
self.helper_flags['user'] = True
@@ -499,20 +513,55 @@ class Installer():
# This means the root account isn't locked/disabled with * in /etc/passwd
self.helper_flags['user'] = True
- o = b''.join(sys_command(f"/usr/bin/arch-chroot {self.target} sh -c \"echo '{user}:{password}' | chpasswd\""))
+ o = b''.join(SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c \"echo '{user}:{password}' | chpasswd\""))
pass
-
+
def user_set_shell(self, user, shell):
self.log(f'Setting shell for {user} to {shell}', level=logging.INFO)
- o = b''.join(sys_command(f"/usr/bin/arch-chroot {self.target} sh -c \"chsh -s {shell} {user}\""))
+ o = b''.join(SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c \"chsh -s {shell} {user}\""))
pass
- def set_keyboard_language(self, language):
+ def set_keyboard_language(self, language: str) -> bool:
+ if len(language.strip()):
+ if not verify_keyboard_layout(language):
+ self.log(f"Invalid keyboard language specified: {language}", fg="red", level=logging.ERROR)
+ return False
+
+ # In accordance with https://github.com/archlinux/archinstall/issues/107#issuecomment-841701968
+ # Setting an empty keymap first, allows the subsequent call to set layout for both console and x11.
+ from .systemd import Boot
+
+ with Boot(self) as session:
+ session.SysCommand(["localectl", "set-keymap", '""'])
+
+ if (output := session.SysCommand(["localectl", "set-keymap", language])).exit_code != 0:
+ raise ServiceException(f"Unable to set locale '{language}' for console: {output}")
+
+ self.log(f"Keyboard language for this installation is now set to: {language}")
+ else:
+ self.log('Keyboard language was not changed from default (no language specified).', fg="yellow", level=logging.INFO)
+
+ return True
+
+ def set_x11_keyboard_language(self, language: str) -> bool:
+ """
+ A fallback function to set x11 layout specifically and separately from console layout.
+ This isn't strictly necessary since .set_keyboard_language() does this as well.
+ """
if len(language.strip()):
- with open(f'{self.target}/etc/vconsole.conf', 'w') as vconsole:
- vconsole.write(f'KEYMAP={language}\n')
- vconsole.write(f'FONT=lat9w-16\n')
+ if not verify_x11_keyboard_layout(language):
+ self.log(f"Invalid x11-keyboard language specified: {language}", fg="red", level=logging.ERROR)
+ return False
+
+ from .systemd import Boot
+
+ with Boot(self) as session:
+ session.SysCommand(["localectl", "set-x11-keymap", '""'])
+
+ if (output := session.SysCommand(["localectl", "set-x11-keymap", language])).exit_code != 0:
+ raise ServiceException(f"Unable to set locale '{language}' for X11: {output}")
else:
- self.log(f'Keyboard language was not changed from default (no language specified).', fg="yellow", level=logging.INFO)
+ self.log(f'X11-Keyboard language was not changed from default (no language specified).', fg="yellow", level=logging.INFO)
+
return True
diff --git a/archinstall/lib/locale_helpers.py b/archinstall/lib/locale_helpers.py
index 3c373bc6..36228edc 100644
--- a/archinstall/lib/locale_helpers.py
+++ b/archinstall/lib/locale_helpers.py
@@ -1,20 +1,19 @@
-import subprocess
-import os
+import logging
+
+from .exceptions import ServiceException
+from .general import SysCommand
+from .output import log
-from .exceptions import *
-# from .general import sys_command
def list_keyboard_languages():
- locale_dir = '/usr/share/kbd/keymaps/'
+ for line in SysCommand("localectl --no-pager list-keymaps", environment_vars={'SYSTEMD_COLORS': '0'}):
+ yield line.decode('UTF-8').strip()
- if not os.path.isdir(locale_dir):
- raise RequirementError(f'Directory containing locales does not exist: {locale_dir}')
- for root, folders, files in os.walk(locale_dir):
+def list_x11_keyboard_languages():
+ for line in SysCommand("localectl --no-pager list-x11-keymap-layouts", environment_vars={'SYSTEMD_COLORS': '0'}):
+ yield line.decode('UTF-8').strip()
- for file in files:
- if os.path.splitext(file)[1] == '.gz':
- yield file.strip('.gz').strip('.map')
def verify_keyboard_layout(layout):
for language in list_keyboard_languages():
@@ -22,10 +21,29 @@ def verify_keyboard_layout(layout):
return True
return False
-def search_keyboard_layout(filter):
+
+def verify_x11_keyboard_layout(layout):
+ for language in list_x11_keyboard_languages():
+ if layout.lower() == language.lower():
+ return True
+ return False
+
+
+def search_keyboard_layout(layout):
for language in list_keyboard_languages():
- if filter.lower() in language.lower():
+ if layout.lower() in language.lower():
yield language
+
def set_keyboard_language(locale):
- return subprocess.call(['loadkeys', locale]) == 0
+ if len(locale.strip()):
+ if not verify_keyboard_layout(locale):
+ log(f"Invalid keyboard locale specified: {locale}", fg="red", level=logging.ERROR)
+ return False
+
+ if (output := SysCommand(f'localectl set-keymap {locale}')).exit_code != 0:
+ raise ServiceException(f"Unable to set locale '{locale}' for console: {output}")
+
+ return True
+
+ return False
diff --git a/archinstall/lib/luks.py b/archinstall/lib/luks.py
index 7f8485e6..b910bfb2 100644
--- a/archinstall/lib/luks.py
+++ b/archinstall/lib/luks.py
@@ -1,15 +1,11 @@
-import os
-import shlex
-import time
import pathlib
-import logging
-from .exceptions import *
-from .general import *
+
from .disk import Partition
+from .general import *
from .output import log
-from .storage import storage
-class luks2():
+
+class luks2:
def __init__(self, partition, mountpoint, password, key_file=None, auto_unmount=False, *args, **kwargs):
self.password = password
self.partition = partition
@@ -22,12 +18,12 @@ class luks2():
self.mapdev = None
def __enter__(self):
- #if self.partition.allow_formatting:
- # self.key_file = self.encrypt(self.partition, *self.args, **self.kwargs)
- #else:
+ # if self.partition.allow_formatting:
+ # self.key_file = self.encrypt(self.partition, *self.args, **self.kwargs)
+ # else:
if not self.key_file:
self.key_file = f"/tmp/{os.path.basename(self.partition.path)}.disk_pw" # TODO: Make disk-pw-file randomly unique?
-
+
if type(self.password) != bytes:
self.password = bytes(self.password, 'UTF-8')
@@ -47,7 +43,7 @@ class luks2():
def encrypt(self, partition, password=None, key_size=512, hash_type='sha512', iter_time=10000, key_file=None):
if not self.partition.allow_formatting:
- raise DiskError(f'Could not encrypt volume {self.partition} due to it having a formatting lock.')
+ raise DiskError(f'Could not encrypt volume {partition} due to it having a formatting lock.')
log(f'Encrypting {partition} (This might take a while)', level=logging.INFO)
@@ -82,7 +78,7 @@ class luks2():
try:
# Try to setup the crypt-device
- cmd_handle = sys_command(cryptsetup_args)
+ cmd_handle = SysCommand(cryptsetup_args)
except SysCallError as err:
if err.exit_code == 256:
log(f'{partition} is being used, trying to unmount and crypt-close the device and running one more attempt at encrypting the device.', level=logging.DEBUG)
@@ -91,7 +87,7 @@ class luks2():
# Get crypt-information about the device by doing a reverse lookup starting with the partition path
# For instance: /dev/sda
- devinfo = json.loads(b''.join(sys_command(f"lsblk --fs -J {partition.path}")).decode('UTF-8'))['blockdevices'][0]
+ devinfo = json.loads(b''.join(SysCommand(f"lsblk --fs -J {partition.path}")).decode('UTF-8'))['blockdevices'][0]
# For each child (sub-partition/sub-device)
if len(children := devinfo.get('children', [])):
@@ -99,20 +95,20 @@ class luks2():
# Unmount the child location
if child_mountpoint := child.get('mountpoint', None):
log(f'Unmounting {child_mountpoint}', level=logging.DEBUG)
- sys_command(f"umount -R {child_mountpoint}")
+ SysCommand(f"umount -R {child_mountpoint}")
# And close it if possible.
log(f"Closing crypt device {child['name']}", level=logging.DEBUG)
- sys_command(f"cryptsetup close {child['name']}")
+ SysCommand(f"cryptsetup close {child['name']}")
# Then try again to set up the crypt-device
- cmd_handle = sys_command(cryptsetup_args)
+ cmd_handle = SysCommand(cryptsetup_args)
else:
raise err
if cmd_handle.exit_code != 0:
- raise DiskError(f'Could not encrypt volume "{partition.path}": {cmd_output}')
-
+ raise DiskError(f'Could not encrypt volume "{partition.path}": {b"".join(cmd_handle)}')
+
return key_file
def unlock(self, partition, mountpoint, key_file):
@@ -124,6 +120,7 @@ class luks2():
:type mountpoint: str
"""
from .disk import get_filesystem_type
+
if '/' in mountpoint:
os.path.basename(mountpoint) # TODO: Raise exception instead?
@@ -131,7 +128,7 @@ class luks2():
while pathlib.Path(partition.path).exists() is False and time.time() - wait_timer < 10:
time.sleep(0.025)
- sys_command(f'/usr/bin/cryptsetup open {partition.path} {mountpoint} --key-file {os.path.abspath(key_file)} --type luks2')
+ SysCommand(f'/usr/bin/cryptsetup open {partition.path} {mountpoint} --key-file {os.path.abspath(key_file)} --type luks2')
if os.path.islink(f'/dev/mapper/{mountpoint}'):
self.mapdev = f'/dev/mapper/{mountpoint}'
unlocked_partition = Partition(self.mapdev, None, encrypted=True, filesystem=get_filesystem_type(self.mapdev), autodetect_filesystem=False)
@@ -142,9 +139,9 @@ class luks2():
if not mountpoint:
mountpoint = self.mapdev
- sys_command(f'/usr/bin/cryptsetup close {self.mapdev}')
+ SysCommand(f'/usr/bin/cryptsetup close {self.mapdev}')
return os.path.islink(self.mapdev) is False
def format(self, path):
- if (handle := sys_command(f"/usr/bin/cryptsetup -q -v luksErase {path}")).exit_code != 0:
+ if (handle := SysCommand(f"/usr/bin/cryptsetup -q -v luksErase {path}")).exit_code != 0:
raise DiskError(f'Could not format {path} with {self.filesystem} because: {b"".join(handle)}')
diff --git a/archinstall/lib/mirrors.py b/archinstall/lib/mirrors.py
index ae6c6422..ccfc2808 100644
--- a/archinstall/lib/mirrors.py
+++ b/archinstall/lib/mirrors.py
@@ -1,28 +1,30 @@
-import urllib.request, logging
+import urllib.error
+import urllib.request
-from .exceptions import *
from .general import *
from .output import log
-from .storage import storage
-def filter_mirrors_by_region(regions, destination='/etc/pacman.d/mirrorlist', tmp_dir='/root', *args, **kwargs):
+
+def filter_mirrors_by_region(regions, destination='/etc/pacman.d/mirrorlist', *args, **kwargs):
"""
This function will change the active mirrors on the live medium by
filtering which regions are active based on `regions`.
- :param region: A series of country codes separated by `,`. For instance `SE,US` for sweden and United States.
- :type region: str
+ :param regions: A series of country codes separated by `,`. For instance `SE,US` for sweden and United States.
+ :type regions: str
"""
region_list = []
for region in regions.split(','):
region_list.append(f'country={region}')
- o = b''.join(sys_command((f"/usr/bin/wget 'https://archlinux.org/mirrorlist/?{'&'.join(region_list)}&protocol=https&ip_version=4&ip_version=6&use_mirror_status=on' -O {tmp_dir}/mirrorlist")))
- o = b''.join(sys_command((f"/usr/bin/sed -i 's/#Server/Server/' {tmp_dir}/mirrorlist")))
- o = b''.join(sys_command((f"/usr/bin/mv {tmp_dir}/mirrorlist {destination}")))
-
+ response = urllib.request.urlopen(urllib.request.Request(f"https://archlinux.org/mirrorlist/?{'&'.join(region_list)}&protocol=https&ip_version=4&ip_version=6&use_mirror_status=on'", headers={'User-Agent': 'ArchInstall'}))
+ new_list = response.read().replace(b"#Server", b"Server")
+ with open(destination, "wb") as mirrorlist:
+ mirrorlist.write(new_list)
+
return True
-def add_custom_mirrors(mirrors:list, *args, **kwargs):
+
+def add_custom_mirrors(mirrors: list, *args, **kwargs):
"""
This will append custom mirror definitions in pacman.conf
@@ -37,6 +39,7 @@ def add_custom_mirrors(mirrors:list, *args, **kwargs):
return True
+
def insert_mirrors(mirrors, *args, **kwargs):
"""
This function will insert a given mirror-list at the top of `/etc/pacman.d/mirrorlist`.
@@ -58,7 +61,8 @@ def insert_mirrors(mirrors, *args, **kwargs):
return True
-def use_mirrors(regions :dict, destination='/etc/pacman.d/mirrorlist'):
+
+def use_mirrors(regions: dict, destination='/etc/pacman.d/mirrorlist'):
log(f'A new package mirror-list has been created: {destination}', level=logging.INFO)
for region, mirrors in regions.items():
with open(destination, 'w') as mirrorlist:
@@ -67,13 +71,15 @@ def use_mirrors(regions :dict, destination='/etc/pacman.d/mirrorlist'):
mirrorlist.write(f'Server = {mirror}\n')
return True
+
def re_rank_mirrors(top=10, *positionals, **kwargs):
- if sys_command((f'/usr/bin/rankmirrors -n {top} /etc/pacman.d/mirrorlist > /etc/pacman.d/mirrorlist')).exit_code == 0:
+ if SysCommand(f'/usr/bin/rankmirrors -n {top} /etc/pacman.d/mirrorlist > /etc/pacman.d/mirrorlist').exit_code == 0:
return True
return False
+
def list_mirrors():
- url = f"https://archlinux.org/mirrorlist/?protocol=https&ip_version=4&ip_version=6&use_mirror_status=on"
+ url = "https://archlinux.org/mirrorlist/?protocol=https&ip_version=4&ip_version=6&use_mirror_status=on"
regions = {}
try:
@@ -82,7 +88,6 @@ def list_mirrors():
log(f'Could not fetch an active mirror-list: {err}', level=logging.WARNING, fg="yellow")
return regions
-
region = 'Unknown region'
for line in response.readlines():
if len(line.strip()) == 0:
@@ -97,4 +102,4 @@ def list_mirrors():
url = line.lstrip('#Server = ')
regions[region][url] = True
- return regions \ No newline at end of file
+ return regions
diff --git a/archinstall/lib/networking.py b/archinstall/lib/networking.py
index 2dc8be9b..0643c9cf 100644
--- a/archinstall/lib/networking.py
+++ b/archinstall/lib/networking.py
@@ -1,28 +1,43 @@
-import os
import fcntl
+import logging
+import os
import socket
import struct
from collections import OrderedDict
+
from .exceptions import *
-from .general import sys_command
+from .general import SysCommand
+from .output import log
from .storage import storage
-def getHwAddr(ifname):
+
+def get_hw_addr(ifname):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- info = fcntl.ioctl(s.fileno(), 0x8927, struct.pack('256s', bytes(ifname, 'utf-8')[:15]))
+ info = fcntl.ioctl(s.fileno(), 0x8927, struct.pack('256s', bytes(ifname, 'utf-8')[:15]))
return ':'.join('%02x' % b for b in info[18:24])
-
+
+
def list_interfaces(skip_loopback=True):
interfaces = OrderedDict()
for index, iface in socket.if_nameindex():
if skip_loopback and iface == "lo":
continue
- mac = getHwAddr(iface).replace(':', '-').lower()
+ mac = get_hw_addr(iface).replace(':', '-').lower()
interfaces[mac] = iface
return interfaces
-def enrichIfaceTypes(interfaces :dict):
+
+def check_mirror_reachable():
+ if (exit_code := SysCommand("pacman -Sy").exit_code) == 0:
+ return True
+ elif exit_code == 256:
+ log("check_mirror_reachable() uses 'pacman -Sy' which requires root.", level=logging.ERROR, fg="red")
+
+ return False
+
+
+def enrich_iface_types(interfaces: dict):
result = {}
for iface in interfaces:
if os.path.isdir(f"/sys/class/net/{iface}/bridge/"):
@@ -39,30 +54,34 @@ def enrichIfaceTypes(interfaces :dict):
result[iface] = 'UNKNOWN'
return result
+
def get_interface_from_mac(mac):
return list_interfaces().get(mac.lower(), None)
-def wirelessScan(interface):
- interfaces = enrichIfaceTypes(list_interfaces().values())
+
+def wireless_scan(interface):
+ interfaces = enrich_iface_types(list_interfaces().values())
if interfaces[interface] != 'WIRELESS':
raise HardwareIncompatibilityError(f"Interface {interface} is not a wireless interface: {interfaces}")
- sys_command(f"iwctl station {interface} scan")
+ SysCommand(f"iwctl station {interface} scan")
- if not '_WIFI' in storage:
+ if '_WIFI' not in storage:
storage['_WIFI'] = {}
- if not interface in storage['_WIFI']:
+ if interface not in storage['_WIFI']:
storage['_WIFI'][interface] = {}
storage['_WIFI'][interface]['scanning'] = True
+
# TODO: Full WiFi experience might get evolved in the future, pausing for now 2021-01-25
-def getWirelessNetworks(interface):
+def get_wireless_networks(interface):
# TODO: Make this oneliner pritter to check if the interface is scanning or not.
- if not '_WIFI' in storage or interface not in storage['_WIFI'] or storage['_WIFI'][interface].get('scanning', False) is False:
+ if '_WIFI' not in storage or interface not in storage['_WIFI'] or storage['_WIFI'][interface].get('scanning', False) is False:
import time
- wirelessScan(interface)
+
+ wireless_scan(interface)
time.sleep(5)
- for line in sys_command(f"iwctl station {interface} get-networks"):
+ for line in SysCommand(f"iwctl station {interface} get-networks"):
print(line)
diff --git a/archinstall/lib/output.py b/archinstall/lib/output.py
index 06d99778..595e9693 100644
--- a/archinstall/lib/output.py
+++ b/archinstall/lib/output.py
@@ -1,45 +1,48 @@
import abc
+import logging
import os
import sys
-import logging
from pathlib import Path
+
from .storage import storage
+
# TODO: use logging's built in levels instead.
# Although logging is threaded and I wish to avoid that.
# It's more Pythonistic or w/e you want to call it.
-class LOG_LEVELS:
+class LogLevels:
Critical = 0b001
Error = 0b010
Warning = 0b011
Info = 0b101
Debug = 0b111
-class journald(dict):
+
+class Journald(dict):
@abc.abstractmethod
def log(message, level=logging.DEBUG):
try:
- import systemd.journal
+ import systemd.journal # type: ignore
except ModuleNotFoundError:
return False
- # For backwards compability, convert old style log-levels
+ # For backwards compatibility, convert old style log-levels
# to logging levels (and warn about deprecated usage)
# There's some code re-usage here but that should be fine.
# TODO: Remove these in a few versions:
- if level == LOG_LEVELS.Critical:
+ if level == LogLevels.Critical:
log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
level = logging.CRITICAL
- elif level == LOG_LEVELS.Error:
+ elif level == LogLevels.Error:
log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
level = logging.ERROR
- elif level == LOG_LEVELS.Warning:
+ elif level == LogLevels.Warning:
log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
level = logging.WARNING
- elif level == LOG_LEVELS.Info:
+ elif level == LogLevels.Info:
log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
level = logging.INFO
- elif level == LOG_LEVELS.Debug:
+ elif level == LogLevels.Debug:
log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
level = logging.DEBUG
@@ -49,14 +52,16 @@ class journald(dict):
log_ch.setFormatter(log_fmt)
log_adapter.addHandler(log_ch)
log_adapter.setLevel(logging.DEBUG)
-
+
log_adapter.log(level, message)
+
# TODO: Replace log() for session based logging.
-class SessionLogging():
+class SessionLogging:
def __init__(self):
pass
+
# Found first reference here: https://stackoverflow.com/questions/7445658/how-to-detect-if-the-console-does-support-ansi-escape-codes-in-python
# And re-used this: https://github.com/django/django/blob/master/django/core/management/color.py#L12
def supports_color():
@@ -70,18 +75,19 @@ def supports_color():
is_a_tty = hasattr(sys.stdout, 'isatty') and sys.stdout.isatty()
return supported_platform and is_a_tty
+
# Heavily influenced by: https://github.com/django/django/blob/ae8338daf34fd746771e0678081999b656177bae/django/utils/termcolors.py#L13
# Color options here: https://askubuntu.com/questions/528928/how-to-do-underline-bold-italic-strikethrough-color-background-and-size-i
-def stylize_output(text :str, *opts, **kwargs):
- opt_dict = {'bold': '1', 'italic' : '3', 'underscore': '4', 'blink': '5', 'reverse': '7', 'conceal': '8'}
+def stylize_output(text: str, *opts, **kwargs):
+ opt_dict = {'bold': '1', 'italic': '3', 'underscore': '4', 'blink': '5', 'reverse': '7', 'conceal': '8'}
color_names = ('black', 'red', 'green', 'yellow', 'blue', 'magenta', 'cyan', 'white')
foreground = {color_names[x]: '3%s' % x for x in range(8)}
background = {color_names[x]: '4%s' % x for x in range(8)}
- RESET = '0'
+ reset = '0'
code_list = []
if text == '' and len(opts) == 1 and opts[0] == 'reset':
- return '\x1b[%sm' % RESET
+ return '\x1b[%sm' % reset
for k, v in kwargs.items():
if k == 'fg':
code_list.append(foreground[v])
@@ -91,9 +97,10 @@ def stylize_output(text :str, *opts, **kwargs):
if o in opt_dict:
code_list.append(opt_dict[o])
if 'noreset' not in opts:
- text = '%s\x1b[%sm' % (text or '', RESET)
+ text = '%s\x1b[%sm' % (text or '', reset)
return '%s%s' % (('\x1b[%sm' % ';'.join(code_list)), text or '')
+
def log(*args, **kwargs):
string = orig_string = ' '.join([str(x) for x in args])
@@ -105,7 +112,7 @@ def log(*args, **kwargs):
# If a logfile is defined in storage,
# we use that one to output everything
- if (filename := storage.get('LOG_FILE', None)):
+ if filename := storage.get('LOG_FILE', None):
absolute_logfile = os.path.join(storage.get('LOG_PATH', './'), filename)
try:
@@ -114,8 +121,8 @@ def log(*args, **kwargs):
log_file.write("")
except PermissionError:
# Fallback to creating the log file in the current folder
- err_string = f"Not enough permission to place log file at {absolute_logfile}, creating it in {Path('./').absolute()/filename} instead."
- absolute_logfile = Path('./').absolute()/filename
+ err_string = f"Not enough permission to place log file at {absolute_logfile}, creating it in {Path('./').absolute() / filename} instead."
+ absolute_logfile = Path('./').absolute() / filename
absolute_logfile.parents[0].mkdir(exist_ok=True)
absolute_logfile = str(absolute_logfile)
storage['LOG_PATH'] = './'
@@ -128,35 +135,35 @@ def log(*args, **kwargs):
# Unless the level is higher than we've decided to output interactively.
# (Remember, log files still get *ALL* the output despite level restrictions)
if 'level' in kwargs:
- # For backwards compability, convert old style log-levels
+ # For backwards compatibility, convert old style log-levels
# to logging levels (and warn about deprecated usage)
# There's some code re-usage here but that should be fine.
# TODO: Remove these in a few versions:
- if kwargs['level'] == LOG_LEVELS.Critical:
+ if kwargs['level'] == LogLevels.Critical:
log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
kwargs['level'] = logging.CRITICAL
- elif kwargs['level'] == LOG_LEVELS.Error:
+ elif kwargs['level'] == LogLevels.Error:
log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
kwargs['level'] = logging.ERROR
- elif kwargs['level'] == LOG_LEVELS.Warning:
+ elif kwargs['level'] == LogLevels.Warning:
log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
kwargs['level'] = logging.WARNING
- elif kwargs['level'] == LOG_LEVELS.Info:
+ elif kwargs['level'] == LogLevels.Info:
log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
kwargs['level'] = logging.INFO
- elif kwargs['level'] == LOG_LEVELS.Debug:
+ elif kwargs['level'] == LogLevels.Debug:
log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
kwargs['level'] = logging.DEBUG
- if kwargs['level'] > storage.get('LOG_LEVEL', logging.INFO) and not 'force' in kwargs:
+ if kwargs['level'] < storage.get('LOG_LEVEL', logging.INFO) and 'force' not in kwargs:
# Level on log message was Debug, but output level is set to Info.
# In that case, we'll drop it.
return None
try:
- journald.log(string, level=kwargs.get('level', logging.INFO))
+ Journald.log(string, level=kwargs.get('level', logging.INFO))
except ModuleNotFoundError:
- pass # Ignore writing to journald
+ pass # Ignore writing to journald
# Finally, print the log unless we skipped it based on level.
# We use sys.stdout.write()+flush() instead of print() to try and
diff --git a/archinstall/lib/packages.py b/archinstall/lib/packages.py
index 4f6b6c61..0ea195d2 100644
--- a/archinstall/lib/packages.py
+++ b/archinstall/lib/packages.py
@@ -1,10 +1,15 @@
-import urllib.request, urllib.parse
-import ssl, json
+import json
+import ssl
+import urllib.error
+import urllib.parse
+import urllib.request
+
from .exceptions import *
BASE_URL = 'https://archlinux.org/packages/search/json/?name={package}'
BASE_GROUP_URL = 'https://archlinux.org/groups/x86_64/{group}/'
+
def find_group(name):
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
@@ -16,11 +21,12 @@ def find_group(name):
return False
else:
raise err
-
+
# Just to be sure some code didn't slip through the exception
if response.code == 200:
return True
+
def find_package(name):
"""
Finds a specific package via the package database.
@@ -33,6 +39,7 @@ def find_package(name):
data = response.read().decode('UTF-8')
return json.loads(data)
+
def find_packages(*names):
"""
This function returns the search results for many packages.
@@ -44,7 +51,8 @@ def find_packages(*names):
result[package] = find_package(package)
return result
-def validate_package_list(packages :list):
+
+def validate_package_list(packages: list):
"""
Validates a list of given packages.
Raises `RequirementError` if one or more packages are not found.
@@ -53,8 +61,8 @@ def validate_package_list(packages :list):
for package in packages:
if not find_package(package)['results'] and not find_group(package):
invalid_packages.append(package)
-
+
if invalid_packages:
raise RequirementError(f"Invalid package names: {invalid_packages}")
- return True \ No newline at end of file
+ return True
diff --git a/archinstall/lib/profiles.py b/archinstall/lib/profiles.py
index 06237c1c..d4913e7e 100644
--- a/archinstall/lib/profiles.py
+++ b/archinstall/lib/profiles.py
@@ -1,20 +1,28 @@
-import os, urllib.request, urllib.parse, ssl, json, re
-import importlib.util, sys, glob, hashlib, logging
-from collections import OrderedDict
-from .general import multisplit, sys_command
-from .exceptions import *
+import hashlib
+import importlib.util
+import json
+import re
+import ssl
+import sys
+import urllib.error
+import urllib.parse
+import urllib.request
+from typing import Optional
+
+from .general import multisplit
from .networking import *
-from .output import log
from .storage import storage
+
def grab_url_data(path):
- safe_path = path[:path.find(':')+1]+''.join([item if item in ('/', '?', '=', '&') else urllib.parse.quote(item) for item in multisplit(path[path.find(':')+1:], ('/', '?', '=', '&'))])
+ safe_path = path[: path.find(':') + 1] + ''.join([item if item in ('/', '?', '=', '&') else urllib.parse.quote(item) for item in multisplit(path[path.find(':') + 1:], ('/', '?', '=', '&'))])
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
- ssl_context.verify_mode=ssl.CERT_NONE
+ ssl_context.verify_mode = ssl.CERT_NONE
response = urllib.request.urlopen(safe_path, context=ssl_context)
return response.read()
+
def list_profiles(filter_irrelevant_macs=True, subpath='', filter_top_level_profiles=False):
# TODO: Grab from github page as well, not just local static files
if filter_irrelevant_macs:
@@ -23,8 +31,10 @@ def list_profiles(filter_irrelevant_macs=True, subpath='', filter_top_level_prof
cache = {}
# Grab all local profiles found in PROFILE_PATH
for PATH_ITEM in storage['PROFILE_PATH']:
- for root, folders, files in os.walk(os.path.abspath(os.path.expanduser(PATH_ITEM+subpath))):
+ for root, folders, files in os.walk(os.path.abspath(os.path.expanduser(PATH_ITEM + subpath))):
for file in files:
+ if file == '__init__.py':
+ continue
if os.path.splitext(file)[1] == '.py':
tailored = False
if len(mac := re.findall('(([a-zA-z0-9]{2}[-:]){5}([a-zA-z0-9]{2}))', file)):
@@ -35,24 +45,24 @@ def list_profiles(filter_irrelevant_macs=True, subpath='', filter_top_level_prof
description = ''
with open(os.path.join(root, file), 'r') as fh:
first_line = fh.readline()
- if first_line[0] == '#':
+ if len(first_line) and first_line[0] == '#':
description = first_line[1:].strip()
- cache[file[:-3]] = {'path' : os.path.join(root, file), 'description' : description, 'tailored' : tailored}
+ cache[file[:-3]] = {'path': os.path.join(root, file), 'description': description, 'tailored': tailored}
break
# Grab profiles from upstream URL
if storage['PROFILE_DB']:
- profiles_url = os.path.join(storage["UPSTREAM_URL"]+subpath, storage['PROFILE_DB'])
+ profiles_url = os.path.join(storage["UPSTREAM_URL"] + subpath, storage['PROFILE_DB'])
try:
profile_list = json.loads(grab_url_data(profiles_url))
except urllib.error.HTTPError as err:
print(f'Error: Listing profiles on URL "{profiles_url}" resulted in:', err)
return cache
- except:
+ except json.decoder.JSONDecodeError as err:
print(f'Error: Could not decode "{profiles_url}" result as JSON:', err)
return cache
-
+
for profile in profile_list:
if os.path.splitext(profile)[1] == '.py':
tailored = False
@@ -61,16 +71,17 @@ def list_profiles(filter_irrelevant_macs=True, subpath='', filter_top_level_prof
continue
tailored = True
- cache[profile[:-3]] = {'path' : os.path.join(storage["UPSTREAM_URL"]+subpath, profile), 'description' : profile_list[profile], 'tailored' : tailored}
+ cache[profile[:-3]] = {'path': os.path.join(storage["UPSTREAM_URL"] + subpath, profile), 'description': profile_list[profile], 'tailored': tailored}
if filter_top_level_profiles:
for profile in list(cache.keys()):
if Profile(None, profile).is_top_level_profile() is False:
- del(cache[profile])
+ del cache[profile]
return cache
-class Script():
+
+class Script:
def __init__(self, profile, installer=None):
# profile: https://hvornum.se/something.py
# profile: desktop
@@ -144,19 +155,22 @@ class Script():
return self
def execute(self):
- if not self.namespace in sys.modules or self.spec is None:
+ if self.namespace not in sys.modules or self.spec is None:
self.load_instructions()
self.spec.loader.exec_module(sys.modules[self.namespace])
return sys.modules[self.namespace]
+
class Profile(Script):
- def __init__(self, installer, path, args={}):
+ def __init__(self, installer, path, args=None):
super(Profile, self).__init__(path, installer)
+ if args is None:
+ args = {}
def __dump__(self, *args, **kwargs):
- return {'path' : self.path}
+ return {'path': self.path}
def __repr__(self, *args, **kwargs):
return f'Profile({os.path.basename(self.profile)})'
@@ -215,7 +229,7 @@ class Profile(Script):
return True
@property
- def packages(self) -> list:
+ def packages(self) -> Optional[list]:
"""
Returns a list of packages baked into the profile definition.
If no package definition has been done, .packages() will return None.
@@ -235,6 +249,7 @@ class Profile(Script):
return imported.__packages__
return None
+
class Application(Profile):
def __repr__(self, *args, **kwargs):
return f'Application({os.path.basename(self.profile)})'
diff --git a/archinstall/lib/services.py b/archinstall/lib/services.py
index bb6f64f2..6f8f2a87 100644
--- a/archinstall/lib/services.py
+++ b/archinstall/lib/services.py
@@ -1,12 +1,10 @@
-import os
-
-from .exceptions import *
from .general import *
+
def service_state(service_name: str):
if os.path.splitext(service_name)[1] != '.service':
service_name += '.service' # Just to be safe
- state = b''.join(sys_command(f'systemctl show --no-pager -p SubState --value {service_name}', environment_vars={'SYSTEMD_COLORS' : '0'}))
+ state = b''.join(SysCommand(f'systemctl show --no-pager -p SubState --value {service_name}', environment_vars={'SYSTEMD_COLORS': '0'}))
return state.strip().decode('UTF-8')
diff --git a/archinstall/lib/storage.py b/archinstall/lib/storage.py
index 43d088bb..4e19e4d4 100644
--- a/archinstall/lib/storage.py
+++ b/archinstall/lib/storage.py
@@ -8,15 +8,15 @@ import os
#
# And Keeping this in dict ensures that variables are shared across imports.
storage = {
- 'PROFILE_PATH' : [
+ 'PROFILE_PATH': [
'./profiles',
'~/.config/archinstall/profiles',
- os.path.join(os.path.dirname(os.path.abspath(__file__)), 'profiles'),
- #os.path.abspath(f'{os.path.dirname(__file__)}/../examples')
+ os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'profiles'),
+ # os.path.abspath(f'{os.path.dirname(__file__)}/../examples')
],
- 'UPSTREAM_URL' : 'https://raw.githubusercontent.com/archlinux/archinstall/master/profiles',
- 'PROFILE_DB' : None, # Used in cases when listing profiles is desired, not mandatory for direct profile grabing.
- 'LOG_PATH' : '/var/log/archinstall',
- 'LOG_FILE' : 'install.log',
- 'MOUNT_POINT' : '/mnt'
+ 'UPSTREAM_URL': 'https://raw.githubusercontent.com/archlinux/archinstall/master/profiles',
+ 'PROFILE_DB': None, # Used in cases when listing profiles is desired, not mandatory for direct profile grabing.
+ 'LOG_PATH': '/var/log/archinstall',
+ 'LOG_FILE': 'install.log',
+ 'MOUNT_POINT': '/mnt',
}
diff --git a/archinstall/lib/systemd.py b/archinstall/lib/systemd.py
index f2b7c9b3..383f1f17 100644
--- a/archinstall/lib/systemd.py
+++ b/archinstall/lib/systemd.py
@@ -1,4 +1,12 @@
-class Ini():
+import logging
+
+from .general import SysCommand, SysCommandWorker, locate_binary
+from .installer import Installer
+from .output import log
+from .storage import storage
+
+
+class Ini:
def __init__(self, *args, **kwargs):
"""
Limited INI handler for now.
@@ -25,12 +33,89 @@ class Ini():
return result
+
class Systemd(Ini):
"""
Placeholder class to do systemd specific setups.
"""
+
class Networkd(Systemd):
"""
Placeholder class to do systemd-network specific setups.
"""
+
+
+class Boot:
+ def __init__(self, installation: Installer):
+ self.instance = installation
+ self.container_name = 'archinstall'
+ self.session = None
+ self.ready = False
+
+ def __enter__(self):
+ if (existing_session := storage.get('active_boot', None)) and existing_session.instance != self.instance:
+ raise KeyError("Archinstall only supports booting up one instance, and a active session is already active and it is not this one.")
+
+ if existing_session:
+ self.session = existing_session.session
+ self.ready = existing_session.ready
+ else:
+ self.session = SysCommandWorker([
+ '/usr/bin/systemd-nspawn',
+ '-D', self.instance.target,
+ '-b',
+ '--machine', self.container_name
+ ])
+
+ if not self.ready:
+ while self.session.is_alive():
+ if b' login:' in self.session:
+ self.ready = True
+ break
+
+ storage['active_boot'] = self
+ return self
+
+ def __exit__(self, *args, **kwargs):
+ # b''.join(sys_command('sync')) # No need to, since the underlying fs() object will call sync.
+ # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
+
+ if len(args) >= 2 and args[1]:
+ log(args[1], level=logging.ERROR, fg='red')
+ log(f"The error above occured in a temporary boot-up of the installation {self.instance}", level=logging.ERROR, fg="red")
+
+ SysCommand(f'machinectl shell {self.container_name} /bin/bash -c "shutdown now"')
+
+ def __iter__(self):
+ if self.session:
+ for value in self.session:
+ yield value
+
+ def __contains__(self, key: bytes):
+ if self.session is None:
+ return False
+
+ return key in self.session
+
+ def is_alive(self):
+ if self.session is None:
+ return False
+
+ return self.session.is_alive()
+
+ def SysCommand(self, cmd: list, *args, **kwargs):
+ if cmd[0][0] != '/' and cmd[0][:2] != './':
+ # This check is also done in SysCommand & SysCommandWorker.
+ # However, that check is done for `machinectl` and not for our chroot command.
+ # So this wrapper for SysCommand will do this additionally.
+
+ cmd[0] = locate_binary(cmd[0])
+
+ return SysCommand(["machinectl", "shell", self.container_name, *cmd], *args, **kwargs)
+
+ def SysCommandWorker(self, cmd: list, *args, **kwargs):
+ if cmd[0][0] != '/' and cmd[0][:2] != './':
+ cmd[0] = locate_binary(cmd[0])
+
+ return SysCommandWorker(["machinectl", "shell", self.container_name, *cmd], *args, **kwargs)
diff --git a/archinstall/lib/tts.py b/archinstall/lib/tts.py
deleted file mode 100644
index e69de29b..00000000
--- a/archinstall/lib/tts.py
+++ /dev/null
diff --git a/archinstall/lib/user_interaction.py b/archinstall/lib/user_interaction.py
index 451251cd..0a4cd0f9 100644
--- a/archinstall/lib/user_interaction.py
+++ b/archinstall/lib/user_interaction.py
@@ -1,27 +1,41 @@
-import getpass, pathlib, os, shutil, re, time
-import sys, time, signal, ipaddress, logging
-import termios, tty, select # Used for char by char polling of sys.stdin
+import getpass
+import ipaddress
+import logging
+import pathlib
+import re
+import select # Used for char by char polling of sys.stdin
+import shutil
+import signal
+import sys
+import termios
+import time
+import tty
+
from .exceptions import *
-from .profiles import Profile
+from .general import SysCommand
+from .hardware import AVAILABLE_GFX_DRIVERS, has_uefi
from .locale_helpers import list_keyboard_languages, verify_keyboard_layout, search_keyboard_layout
-from .output import log
-from .storage import storage
from .networking import list_interfaces
-from .general import sys_command
-from .hardware import AVAILABLE_GFX_DRIVERS, hasUEFI
+from .output import log
+from .profiles import Profile, list_profiles
+
+
+# TODO: Some inconsistencies between the selection processes.
+# Some return the keys from the options, some the values?
-## TODO: Some inconsistencies between the selection processes.
-## Some return the keys from the options, some the values?
def get_terminal_height():
return shutil.get_terminal_size().lines
+
def get_terminal_width():
return shutil.get_terminal_size().columns
+
def get_longest_option(options):
return max([len(x) for x in options])
+
def check_for_correct_username(username):
if re.match(r'^[a-z_][a-z0-9_-]*\$?$', username) and len(username) <= 32:
return True
@@ -32,8 +46,10 @@ def check_for_correct_username(username):
)
return False
+
def do_countdown():
SIG_TRIGGER = False
+
def kill_handler(sig, frame):
print()
exit(0)
@@ -67,8 +83,9 @@ def do_countdown():
signal.signal(signal.SIGINT, original_sigint_handler)
return True
+
def get_password(prompt="Enter a password: "):
- while (passwd := getpass.getpass(prompt)):
+ while passwd := getpass.getpass(prompt):
passwd_verification = getpass.getpass(prompt='And one more time for verification: ')
if passwd != passwd_verification:
log(' * Passwords did not match * ', fg='red')
@@ -80,26 +97,41 @@ def get_password(prompt="Enter a password: "):
return passwd
return None
+
def print_large_list(options, padding=5, margin_bottom=0, separator=': '):
highest_index_number_length = len(str(len(options)))
longest_line = highest_index_number_length + len(separator) + get_longest_option(options) + padding
+ spaces_without_option = longest_line - (len(separator) + highest_index_number_length)
max_num_of_columns = get_terminal_width() // longest_line
- max_options_in_cells = max_num_of_columns * (get_terminal_height()-margin_bottom)
+ max_options_in_cells = max_num_of_columns * (get_terminal_height() - margin_bottom)
- if (len(options) > max_options_in_cells):
+ if len(options) > max_options_in_cells:
for index, option in enumerate(options):
print(f"{index}: {option}")
+ return 1, index
else:
- for row in range(0, (get_terminal_height()-margin_bottom)):
- for column in range(row, len(options), (get_terminal_height()-margin_bottom)):
- spaces = " "*(longest_line - len(options[column]))
- print(f"{str(column): >{highest_index_number_length}}{separator}{options[column]}", end = spaces)
+ for row in range(0, (get_terminal_height() - margin_bottom)):
+ for column in range(row, len(options), (get_terminal_height() - margin_bottom)):
+ spaces = " " * (spaces_without_option - len(options[column]))
+ print(f"{str(column): >{highest_index_number_length}}{separator}{options[column]}", end=spaces)
print()
return column, row
def generic_multi_select(options, text="Select one or more of the options above (leave blank to continue): ", sort=True, default=None, allow_empty=False):
+ # Checking if the options are different from `list` or `dict` or if they are empty
+ if type(options) not in [list, dict]:
+ log(f" * Generic multi-select doesn't support ({type(options)}) as type of options * ", fg='red')
+ log(" * If problem persists, please create an issue on https://github.com/archlinux/archinstall/issues * ", fg='yellow')
+ raise RequirementError("generic_multi_select() requires list or dictionary as options.")
+ if not options:
+ log(" * Generic multi-select didn't find any options to choose from * ", fg='red')
+ log(" * If problem persists, please create an issue on https://github.com/archlinux/archinstall/issues * ", fg='yellow')
+ raise RequirementError('generic_multi_select() requires at least one option to proceed.')
+ # After passing the checks, function continues to work
+ if type(options) == dict:
+ options = list(options.values())
if sort:
options = sorted(options)
@@ -108,7 +140,7 @@ def generic_multi_select(options, text="Select one or more of the options above
selected_options = []
while True:
- if len(selected_options) <= 0 and default and default in options:
+ if not selected_options and default in options:
selected_options.append(default)
printed_options = []
@@ -118,38 +150,48 @@ def generic_multi_select(options, text="Select one or more of the options above
else:
printed_options.append(f'{option}')
- section.clear(0, get_terminal_height()-section._cursor_y-1)
- x, y = print_large_list(printed_options, margin_bottom=2)
+ section.clear(0, get_terminal_height() - section._cursor_y - 1)
+ print_large_list(printed_options, margin_bottom=2)
section._cursor_y = len(printed_options)
section._cursor_x = 0
section.write_line(text)
section.input_pos = section._cursor_x
selected_option = section.get_keyboard_input(end=None)
-
- if selected_option is None:
- if len(selected_options) <= 0 and default:
- selected_options = [default]
-
- if len(selected_options) or allow_empty is True:
- break
- else:
- log('* Need to select at least one option!', fg='red')
- continue
-
- elif selected_option.isdigit():
- if (selected_option := int(selected_option)) >= len(options):
- log('* Option is out of range, please select another one!', fg='red')
- continue
- selected_option = options[selected_option]
- if selected_option in selected_options:
- selected_options.remove(selected_option)
+ # This string check is necessary to correct work with it
+ # Without this, Python will raise AttributeError because of stripping `None`
+ # It also allows to remove empty spaces if the user accidentally entered them.
+ if isinstance(selected_option, str):
+ selected_option = selected_option.strip()
+ try:
+ if not selected_option:
+ if not selected_options and default:
+ selected_options = [default]
+ elif selected_options or allow_empty:
+ break
+ else:
+ raise RequirementError('Please select at least one option to continue')
+ elif selected_option.isnumeric():
+ if (selected_option := int(selected_option)) >= len(options):
+ raise RequirementError(f'Selected option "{selected_option}" is out of range')
+ selected_option = options[selected_option]
+ if selected_option in selected_options:
+ selected_options.remove(selected_option)
+ else:
+ selected_options.append(selected_option)
+ elif selected_option in options:
+ if selected_option in selected_options:
+ selected_options.remove(selected_option)
+ else:
+ selected_options.append(selected_option)
else:
- selected_options.append(selected_option)
+ raise RequirementError(f'Selected option "{selected_option}" does not exist in available options')
+ except RequirementError as e:
+ log(f" * {e} * ", fg='red')
return selected_options
-class MiniCurses():
+class MiniCurses:
def __init__(self, width, height):
self.width = width
self.height = height
@@ -164,7 +206,7 @@ class MiniCurses():
sys.stdout.flush()
sys.stdout.write("\033[%dG" % 0)
sys.stdout.flush()
- sys.stdout.write(" " * (get_terminal_width()-1))
+ sys.stdout.write(" " * (get_terminal_width() - 1))
sys.stdout.flush()
sys.stdout.write("\033[%dG" % 0)
sys.stdout.flush()
@@ -173,36 +215,38 @@ class MiniCurses():
self._cursor_x += len(text)
def clear(self, x, y):
- if x < 0: x = 0
- if y < 0: y = 0
+ if x < 0:
+ x = 0
+ if y < 0:
+ y = 0
- #import time
- #sys.stdout.write(f"Clearing from: {x, y}")
- #sys.stdout.flush()
- #time.sleep(2)
+ # import time
+ # sys.stdout.write(f"Clearing from: {x, y}")
+ # sys.stdout.flush()
+ # time.sleep(2)
sys.stdout.flush()
sys.stdout.write('\033[%d;%df' % (y, x))
- for line in range(get_terminal_height()-y-1, y):
- sys.stdout.write(" " * (get_terminal_width()-1))
+ for line in range(get_terminal_height() - y - 1, y):
+ sys.stdout.write(" " * (get_terminal_width() - 1))
sys.stdout.flush()
sys.stdout.write('\033[%d;%df' % (y, x))
sys.stdout.flush()
def deal_with_control_characters(self, char):
mapper = {
- '\x7f' : 'BACKSPACE',
- '\r' : 'CR',
- '\n' : 'NL'
+ '\x7f': 'BACKSPACE',
+ '\r': 'CR',
+ '\n': 'NL'
}
if (mapped_char := mapper.get(char, None)) == 'BACKSPACE':
if self._cursor_x <= self.input_pos:
- # Don't backspace futher back than the cursor start position during input
+ # Don't backspace further back than the cursor start position during input
return True
# Move back to the current known position (BACKSPACE doesn't updated x-pos)
sys.stdout.flush()
- sys.stdout.write("\033[%dG" % (self._cursor_x))
+ sys.stdout.write("\033[%dG" % self._cursor_x)
sys.stdout.flush()
# Write a blank space
@@ -212,7 +256,7 @@ class MiniCurses():
# And move back again
sys.stdout.flush()
- sys.stdout.write("\033[%dG" % (self._cursor_x))
+ sys.stdout.write("\033[%dG" % self._cursor_x)
sys.stdout.flush()
self._cursor_x -= 1
@@ -235,16 +279,16 @@ class MiniCurses():
poller.register(sys.stdin.fileno(), select.EPOLLIN)
- EOF = False
- while EOF is False:
+ eof = False
+ while eof is False:
for fileno, event in poller.poll(0.025):
char = sys.stdin.read(1)
- #sys.stdout.write(f"{[char]}")
- #sys.stdout.flush()
+ # sys.stdout.write(f"{[char]}")
+ # sys.stdout.flush()
- if (newline := (char in ('\n', '\r'))):
- EOF = True
+ if newline := (char in ('\n', '\r')):
+ eof = True
if not newline or strip_rowbreaks is False:
response += char
@@ -263,14 +307,15 @@ class MiniCurses():
if response:
return response
-def ask_for_superuser_account(prompt='Username for required super-user with sudo privileges: ', forced=False):
+
+def ask_for_superuser_account(prompt='Username for required superuser with sudo privileges: ', forced=False):
while 1:
new_user = input(prompt).strip(' ')
if not new_user and forced:
# TODO: make this text more generic?
# It's only used to create the first sudo user when root is disabled in guided.py
- log(' * Since root is disabled, you need to create a least one (super) user!', fg='red')
+ log(' * Since root is disabled, you need to create a least one superuser!', fg='red')
continue
elif not new_user and not forced:
raise UserError("No superuser was created.")
@@ -278,11 +323,12 @@ def ask_for_superuser_account(prompt='Username for required super-user with sudo
continue
password = get_password(prompt=f'Password for user {new_user}: ')
- return {new_user: {"!password" : password}}
+ return {new_user: {"!password": password}}
+
def ask_for_additional_users(prompt='Any additional users to install (leave blank for no users): '):
users = {}
- super_users = {}
+ superusers = {}
while 1:
new_user = input(prompt).strip(' ')
@@ -291,20 +337,21 @@ def ask_for_additional_users(prompt='Any additional users to install (leave blan
if not check_for_correct_username(new_user):
continue
password = get_password(prompt=f'Password for user {new_user}: ')
-
- if input("Should this user be a sudo (super) user (y/N): ").strip(' ').lower() in ('y', 'yes'):
- super_users[new_user] = {"!password" : password}
+
+ if input("Should this user be a superuser (sudoer) [y/N]: ").strip(' ').lower() in ('y', 'yes'):
+ superusers[new_user] = {"!password": password}
else:
- users[new_user] = {"!password" : password}
+ users[new_user] = {"!password": password}
+
+ return users, superusers
- return users, super_users
def ask_for_a_timezone():
while True:
timezone = input('Enter a valid timezone (examples: Europe/Stockholm, US/Eastern) or press enter to use UTC: ').strip().strip('*.')
if timezone == '':
timezone = 'UTC'
- if (pathlib.Path("/usr")/"share"/"zoneinfo"/timezone).exists():
+ if (pathlib.Path("/usr") / "share" / "zoneinfo" / timezone).exists():
return timezone
else:
log(
@@ -313,38 +360,41 @@ def ask_for_a_timezone():
fg='red'
)
+
def ask_for_bootloader() -> str:
bootloader = "systemd-bootctl"
- if hasUEFI()==False:
- bootloader="grub-install"
+ if not has_uefi():
+ bootloader = "grub-install"
else:
bootloader_choice = input("Would you like to use GRUB as a bootloader instead of systemd-boot? [y/N] ").lower()
if bootloader_choice == "y":
- bootloader="grub-install"
+ bootloader = "grub-install"
return bootloader
+
def ask_for_audio_selection():
- audio = "pulseaudio" # Default for most desktop environments
+ audio = "pulseaudio" # Default for most desktop environments
pipewire_choice = input("Would you like to install pipewire instead of pulseaudio as the default audio server? [Y/n] ").lower()
if pipewire_choice in ("y", ""):
audio = "pipewire"
return audio
+
def ask_to_configure_network():
# Optionally configure one network interface.
- #while 1:
+ # while 1:
# {MAC: Ifname}
interfaces = {
- 'ISO-CONFIG' : 'Copy ISO network configuration to installation',
- 'NetworkManager':'Use NetworkManager to control and manage your internet connection',
+ 'ISO-CONFIG': 'Copy ISO network configuration to installation',
+ 'NetworkManager': 'Use NetworkManager to control and manage your internet connection',
**list_interfaces()
}
nic = generic_select(interfaces, "Select one network interface to configure (leave blank to skip): ")
if nic and nic != 'Copy ISO network configuration to installation':
if nic == 'Use NetworkManager to control and manage your internet connection':
- return {'nic': nic,'NetworkManager':True}
+ return {'nic': nic, 'NetworkManager': True}
# Current workaround:
# For selecting modes without entering text within brackets,
@@ -354,8 +404,7 @@ def ask_to_configure_network():
for index, mode in enumerate(modes):
print(f"{index}: {mode}")
- mode = generic_select(['DHCP', 'IP'], f"Select which mode to configure for {nic} or leave blank for DHCP: ",
- options_output=False)
+ mode = generic_select(['DHCP', 'IP'], f"Select which mode to configure for {nic} or leave blank for DHCP: ", options_output=False)
if mode == 'IP':
while 1:
ip = input(f"Enter the IP and subnet for {nic} (example: 192.168.0.5/24): ").strip()
@@ -390,7 +439,7 @@ def ask_to_configure_network():
if len(dns_input := input('Enter your DNS servers (space separated, blank for none): ').strip()):
dns = dns_input.split(' ')
- return {'nic': nic, 'dhcp': False, 'ip': ip, 'gateway' : gateway, 'dns' : dns}
+ return {'nic': nic, 'dhcp': False, 'ip': ip, 'gateway': gateway, 'dns': dns}
else:
return {'nic': nic}
elif nic:
@@ -398,29 +447,30 @@ def ask_to_configure_network():
return {}
+
def ask_for_disk_layout():
options = {
- 'keep-existing' : 'Keep existing partition layout and select which ones to use where',
- 'format-all' : 'Format entire drive and setup a basic partition scheme',
- 'abort' : 'Abort the installation'
+ 'keep-existing': 'Keep existing partition layout and select which ones to use where',
+ 'format-all': 'Format entire drive and setup a basic partition scheme',
+ 'abort': 'Abort the installation',
}
- value = generic_select(options, "Found partitions on the selected drive, (select by number) what you want to do: ",
- allow_empty_input=False, sort=True)
+ value = generic_select(options, "Found partitions on the selected drive, (select by number) what you want to do: ", allow_empty_input=False, sort=True)
return next((key for key, val in options.items() if val == value), None)
+
def ask_for_main_filesystem_format():
options = {
- 'btrfs' : 'btrfs',
- 'ext4' : 'ext4',
- 'xfs' : 'xfs',
- 'f2fs' : 'f2fs'
+ 'btrfs': 'btrfs',
+ 'ext4': 'ext4',
+ 'xfs': 'xfs',
+ 'f2fs': 'f2fs'
}
- value = generic_select(options, "Select which filesystem your main partition should use (by number or name): ",
- allow_empty_input=False)
+ value = generic_select(options, "Select which filesystem your main partition should use (by number or name): ", allow_empty_input=False)
return next((key for key, val in options.items() if val == value), None)
+
def generic_select(options, input_text="Select one of the above by index or absolute value: ", allow_empty_input=True, options_output=True, sort=False):
"""
A generic select function that does not output anything
@@ -435,20 +485,23 @@ def generic_select(options, input_text="Select one of the above by index or abso
this function returns an item from list, a string, or None
"""
- # Checking if options are different from `list` or `dict`
+ # Checking if the options are different from `list` or `dict` or if they are empty
if type(options) not in [list, dict]:
log(f" * Generic select doesn't support ({type(options)}) as type of options * ", fg='red')
log(" * If problem persists, please create an issue on https://github.com/archlinux/archinstall/issues * ", fg='yellow')
raise RequirementError("generic_select() requires list or dictionary as options.")
- # To allow only `list` and `dict`, converting values of options here.
- # Therefore, now we can only provide the dictionary itself
- if type(options) == dict: options = list(options.values())
- if sort: options = sorted(options) # As we pass only list and dict (converted to list), we can skip converting to list
- if len(options) == 0:
- log(f" * Generic select didn't find any options to choose from * ", fg='red')
+ if not options:
+ log(" * Generic select didn't find any options to choose from * ", fg='red')
log(" * If problem persists, please create an issue on https://github.com/archlinux/archinstall/issues * ", fg='yellow')
raise RequirementError('generic_select() requires at least one option to proceed.')
-
+ # After passing the checks, function continues to work
+ if type(options) == dict:
+ # To allow only `list` and `dict`, converting values of options here.
+ # Therefore, now we can only provide the dictionary itself
+ options = list(options.values())
+ if sort:
+ # As we pass only list and dict (converted to list), we can skip converting to list
+ options = sorted(options)
# Added ability to disable the output of options items,
# if another function displays something different from this
@@ -460,8 +513,8 @@ def generic_select(options, input_text="Select one of the above by index or abso
# Now the try...except block handles validation for invalid input from the user
while True:
try:
- selected_option = input(input_text)
- if len(selected_option.strip()) == 0:
+ selected_option = input(input_text).strip()
+ if not selected_option:
# `allow_empty_input` parameter handles return of None on empty input, if necessary
# Otherwise raise `RequirementError`
if allow_empty_input:
@@ -469,21 +522,20 @@ def generic_select(options, input_text="Select one of the above by index or abso
raise RequirementError('Please select an option to continue')
# Replaced `isdigit` with` isnumeric` to discard all negative numbers
elif selected_option.isnumeric():
- selected_option = int(selected_option)
- if selected_option >= len(options):
+ if (selected_option := int(selected_option)) >= len(options):
raise RequirementError(f'Selected option "{selected_option}" is out of range')
selected_option = options[selected_option]
break
elif selected_option in options:
- break # We gave a correct absolute value
+ break # We gave a correct absolute value
else:
raise RequirementError(f'Selected option "{selected_option}" does not exist in available options')
except RequirementError as err:
log(f" * {err} * ", fg='red')
- continue
return selected_option
+
def select_disk(dict_o_disks):
"""
Asks the user to select a harddrive from the `dict_o_disks` selection.
@@ -499,47 +551,44 @@ def select_disk(dict_o_disks):
if len(drives) >= 1:
for index, drive in enumerate(drives):
print(f"{index}: {drive} ({dict_o_disks[drive]['size'], dict_o_disks[drive].device, dict_o_disks[drive]['label']})")
-
- log(f"You can skip selecting a drive and partitioning and use whatever drive-setup is mounted at /mnt (experimental)", fg="yellow")
- drive = generic_select(drives, 'Select one of the above disks (by name or number) or leave blank to use /mnt: ',
- options_output=False)
+
+ log("You can skip selecting a drive and partitioning and use whatever drive-setup is mounted at /mnt (experimental)", fg="yellow")
+ drive = generic_select(drives, 'Select one of the above disks (by name or number) or leave blank to use /mnt: ', options_output=False)
if not drive:
return drive
-
+
drive = dict_o_disks[drive]
return drive
raise DiskError('select_disk() requires a non-empty dictionary of disks to select from.')
-def select_profile(options):
- """
- Asks the user to select a profile from the `options` dictionary parameter.
- Usually this is combined with :ref:`archinstall.list_profiles`.
- :param options: A `dict` where keys are the profile name, value should be a dict containing profile information.
- :type options: dict
+def select_profile():
+ """
+ Asks the user to select a profile from the available profiles.
:return: The name/dictionary key of the selected profile
:rtype: str
"""
- profiles = sorted(list(options))
+ shown_profiles = sorted(list(list_profiles(filter_top_level_profiles=True)))
+ actual_profiles_raw = shown_profiles + sorted([profile for profile in list_profiles() if profile not in shown_profiles])
- if len(profiles) >= 1:
- for index, profile in enumerate(profiles):
+ if len(shown_profiles) >= 1:
+ for index, profile in enumerate(shown_profiles):
print(f"{index}: {profile}")
print(' -- The above list is a set of pre-programmed profiles. --')
print(' -- They might make it easier to install things like desktop environments. --')
print(' -- (Leave blank and hit enter to skip this step and continue) --')
- selected_profile = generic_select(profiles, 'Enter a pre-programmed profile name if you want to install one: ',
- options_output=False)
+ selected_profile = generic_select(actual_profiles_raw, 'Enter a pre-programmed profile name if you want to install one: ', options_output=False)
if selected_profile:
return Profile(None, selected_profile)
else:
raise RequirementError("Selecting profiles require a least one profile to be given as an option.")
-def select_language(options, show_only_country_codes=True):
+
+def select_language(options, show_only_country_codes=True, input_text='Select one of the above keyboard languages (by number or full name): '):
"""
Asks the user to select a language from the `options` dictionary parameter.
Usually this is combined with :ref:`archinstall.list_keyboard_languages`.
@@ -553,24 +602,23 @@ def select_language(options, show_only_country_codes=True):
:return: The language/dictionary key of the selected language
:rtype: str
"""
- DEFAULT_KEYBOARD_LANGUAGE = 'us'
-
+ default_keyboard_language = 'us'
+
if show_only_country_codes:
languages = sorted([language for language in list(options) if len(language) == 2])
else:
languages = sorted(list(options))
if len(languages) >= 1:
- for index, language in enumerate(languages):
- print(f"{index}: {language}")
+ print_large_list(languages, margin_bottom=4)
print(" -- You can choose a layout that isn't in this list, but whose name you know --")
- print(" -- Also, you can enter '?' or 'help' to search for more languages, or skip to use US layout --")
+ print(f" -- Also, you can enter '?' or 'help' to search for more languages, or skip to use {default_keyboard_language} layout --")
while True:
- selected_language = input('Select one of the above keyboard languages (by name or full name): ')
+ selected_language = input(input_text)
if not selected_language:
- return DEFAULT_KEYBOARD_LANGUAGE
+ return default_keyboard_language
elif selected_language.lower() in ('?', 'help'):
while True:
filter_string = input("Search for layout containing (example: \"sv-\") or enter 'exit' to exit from search: ")
@@ -598,6 +646,7 @@ def select_language(options, show_only_country_codes=True):
raise RequirementError("Selecting languages require a least one language to be given as an option.")
+
def select_mirror_regions(mirrors, show_top_mirrors=True):
"""
Asks the user to select a mirror or region from the `mirrors` dictionary parameter.
@@ -621,8 +670,7 @@ def select_mirror_regions(mirrors, show_top_mirrors=True):
print_large_list(regions, margin_bottom=4)
print(' -- You can skip this step by leaving the option blank --')
- selected_mirror = generic_select(regions, 'Select one of the above regions to download packages from (by number or full name): ',
- options_output=False)
+ selected_mirror = generic_select(regions, 'Select one of the above regions to download packages from (by number or full name): ', options_output=False)
if not selected_mirror:
# Returning back empty options which can be both used to
# do "if x:" logic as well as do `x.get('mirror', {}).get('sub', None)` chaining
@@ -639,20 +687,21 @@ def select_mirror_regions(mirrors, show_top_mirrors=True):
raise RequirementError("Selecting mirror region require a least one region to be given as an option.")
+
def select_driver(options=AVAILABLE_GFX_DRIVERS):
"""
- Some what convoluted function, which's job is simple.
+ Some what convoluted function, whose job is simple.
Select a graphics driver from a pre-defined set of popular options.
(The template xorg is for beginner users, not advanced, and should
there for appeal to the general public first and edge cases later)
"""
-
+
drivers = sorted(list(options))
-
+ default_option = options["All open-source (default)"]
+
if drivers:
- lspci = sys_command(f'/usr/bin/lspci')
- for line in lspci.trace_log.split(b'\r\n'):
+ for line in SysCommand('/usr/bin/lspci'):
if b' vga ' in line.lower():
if b'nvidia' in line.lower():
print(' ** nvidia card detected, suggested driver: nvidia **')
@@ -660,13 +709,16 @@ def select_driver(options=AVAILABLE_GFX_DRIVERS):
print(' ** AMD card detected, suggested driver: AMD / ATI **')
initial_option = generic_select(drivers, input_text="Select your graphics card driver: ")
+
+ if not initial_option:
+ return default_option
+
selected_driver = options[initial_option]
if type(selected_driver) == dict:
driver_options = sorted(list(selected_driver))
- driver_package_group = generic_select(driver_options, f'Which driver-type do you want for {initial_option}: ',
- allow_empty_input=False)
+ driver_package_group = generic_select(driver_options, f'Which driver-type do you want for {initial_option}: ', allow_empty_input=False)
driver_package_group = selected_driver[driver_package_group]
return driver_package_group
@@ -675,6 +727,7 @@ def select_driver(options=AVAILABLE_GFX_DRIVERS):
raise RequirementError("Selecting drivers require a least one profile to be given as an option.")
+
def select_kernel(options):
"""
Asks the user to select a kernel for system.
@@ -685,12 +738,12 @@ def select_kernel(options):
:return: The string as a selected kernel
:rtype: string
"""
-
- DEFAULT_KERNEL = "linux"
-
+
+ default_kernel = "linux"
+
kernels = sorted(list(options))
-
+
if kernels:
- return generic_multi_select(kernels, f"Choose which kernel to use (leave blank for default: {DEFAULT_KERNEL}): ", default=DEFAULT_KERNEL)
-
+ return generic_multi_select(kernels, f"Choose which kernels to use (leave blank for default: {default_kernel}): ", default=default_kernel, sort=False)
+
raise RequirementError("Selecting kernels require a least one kernel to be given as an option.")