Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib
diff options
context:
space:
mode:
authorAnton Hvornum <anton.feeds@gmail.com>2021-05-12 14:45:04 +0200
committerAnton Hvornum <anton.feeds@gmail.com>2021-05-12 14:45:04 +0200
commit12dc55650d78dc41bbc0ab7013baa7b3ce61ec7c (patch)
tree945dd48af6c32e03f10aa14186149e92f2a120a1 /archinstall/lib
parent1708f1850d5d620e7a44ab4da4a9c5c028f5008b (diff)
parentdf6c4e77f721da2b03a510548d281992b25987b2 (diff)
Merge branch 'master' of github.com:Torxed/archinstall into torxed-fix-350
Diffstat (limited to 'archinstall/lib')
-rw-r--r--archinstall/lib/disk.py14
-rw-r--r--archinstall/lib/general.py9
-rw-r--r--archinstall/lib/hardware.py58
-rw-r--r--archinstall/lib/installer.py25
-rw-r--r--archinstall/lib/output.py2
-rw-r--r--archinstall/lib/profiles.py58
-rw-r--r--archinstall/lib/user_interaction.py113
7 files changed, 159 insertions, 120 deletions
diff --git a/archinstall/lib/disk.py b/archinstall/lib/disk.py
index ff924f62..fd08ea63 100644
--- a/archinstall/lib/disk.py
+++ b/archinstall/lib/disk.py
@@ -1,3 +1,4 @@
+from typing import Optional
import glob, re, os, json, time, hashlib
import pathlib, traceback, logging
from collections import OrderedDict
@@ -205,7 +206,7 @@ class Partition():
return f'Partition(path={self.path}, size={self.size}, fs={self.filesystem}{mount_repr})'
@property
- def uuid(self) -> str:
+ def uuid(self) -> Optional[str]:
"""
Returns the PARTUUID as returned by lsblk.
This is more reliable than relying on /dev/disk/by-partuuid as
@@ -214,7 +215,7 @@ class Partition():
lsblk = b''.join(sys_command(f'lsblk -J -o+PARTUUID {self.path}'))
for partition in json.loads(lsblk.decode('UTF-8'))['blockdevices']:
return partition.get('partuuid', None)
-
+ return None
@property
def encrypted(self):
return self._encrypted
@@ -471,7 +472,6 @@ class Filesystem():
def raw_parted(self, string:str):
x = sys_command(f'/usr/bin/parted -s {string}')
- log(f"'parted -s {string}' returned: {b''.join(x)}", level=logging.DEBUG)
return x
def parted(self, string:str):
@@ -625,3 +625,11 @@ def get_filesystem_type(path):
return b''.join(handle).strip().decode('UTF-8')
except SysCallError:
return None
+
+def disk_layouts():
+ try:
+ handle = sys_command(f"lsblk -f -o+TYPE,SIZE -J")
+ return json.loads(b''.join(handle).decode('UTF-8'))
+ except SysCallError as err:
+ log(f"Could not return disk layouts: {err}")
+ return None \ No newline at end of file
diff --git a/archinstall/lib/general.py b/archinstall/lib/general.py
index eb0c5d14..72f8677f 100644
--- a/archinstall/lib/general.py
+++ b/archinstall/lib/general.py
@@ -5,6 +5,7 @@ from subprocess import Popen, STDOUT, PIPE, check_output
from select import epoll, EPOLLIN, EPOLLHUP
from .exceptions import *
from .output import log
+from typing import Optional, Union
def gen_uid(entropy_length=256):
return hashlib.sha512(os.urandom(entropy_length)).hexdigest()
@@ -160,16 +161,15 @@ class sys_command():#Thread):
'exit_code': self.exit_code
}
- def peak(self, output :str):
+ def peak(self, output : Union[str, bytes]) -> bool:
if type(output) == bytes:
try:
output = output.decode('UTF-8')
except UnicodeDecodeError:
- return None
-
+ return False
output = output.strip('\r\n ')
if len(output) <= 0:
- return None
+ return False
if self.peak_output:
from .user_interaction import get_terminal_width
@@ -191,6 +191,7 @@ class sys_command():#Thread):
# And print the new output we're peaking on:
sys.stdout.write(output)
sys.stdout.flush()
+ return True
def run(self):
self.status = 'running'
diff --git a/archinstall/lib/hardware.py b/archinstall/lib/hardware.py
index d6cf982c..009a3a6c 100644
--- a/archinstall/lib/hardware.py
+++ b/archinstall/lib/hardware.py
@@ -3,22 +3,53 @@ from .general import sys_command
from .networking import list_interfaces, enrichIfaceTypes
from typing import Optional
+__packages__ = [
+ "mesa",
+ "xf86-video-amdgpu",
+ "xf86-video-ati",
+ "xf86-video-nouveau",
+ "xf86-video-vmware",
+ "libva-mesa-driver",
+ "libva-intel-driver",
+ "intel-media-driver",
+ "vulkan-radeon",
+ "vulkan-intel",
+ "nvidia",
+]
+
AVAILABLE_GFX_DRIVERS = {
# Sub-dicts are layer-2 options to be selected
# and lists are a list of packages to be installed
- 'AMD / ATI' : {
- 'amd' : ['xf86-video-amdgpu'],
- 'ati' : ['xf86-video-ati']
- },
- 'intel' : ['xf86-video-intel'],
- 'nvidia' : {
- 'open-source' : ['xf86-video-nouveau'],
- 'proprietary' : ['nvidia']
+ "All open-source (default)": [
+ "mesa",
+ "xf86-video-amdgpu",
+ "xf86-video-ati",
+ "xf86-video-nouveau",
+ "xf86-video-vmware",
+ "libva-mesa-driver",
+ "libva-intel-driver",
+ "intel-media-driver",
+ "vulkan-radeon",
+ "vulkan-intel",
+ ],
+ "AMD / ATI (open-source)": [
+ "mesa",
+ "xf86-video-amdgpu",
+ "xf86-video-ati",
+ "libva-mesa-driver",
+ "vulkan-radeon",
+ ],
+ "Intel (open-source)": [
+ "mesa",
+ "libva-intel-driver",
+ "intel-media-driver",
+ "vulkan-intel",
+ ],
+ "Nvidia": {
+ "open-source": ["mesa", "xf86-video-nouveau", "libva-mesa-driver"],
+ "proprietary": ["nvidia"],
},
- 'mesa' : ['mesa'],
- 'fbdev' : ['xf86-video-fbdev'],
- 'vesa' : ['xf86-video-vesa'],
- 'vmware' : ['xf86-video-vmware']
+ "VMware / VirtualBox (open-source)": ["mesa", "xf86-video-vmware"],
}
def hasWifi()->bool:
@@ -60,10 +91,11 @@ def cpuVendor()-> Optional[str]:
if info.get('field',None):
if info.get('field',None) == "Vendor ID:":
return info.get('data',None)
+ return None
def isVM() -> bool:
try:
- subprocess.check_call(["systemd-detect-virt"]) # systemd-detect-virt issues a none 0 exit code if it is not on a virtual machine
+ subprocess.check_call(["systemd-detect-virt"]) # systemd-detect-virt issues a non-zero exit code if it is not on a virtual machine
return True
except:
return False
diff --git a/archinstall/lib/installer.py b/archinstall/lib/installer.py
index 236b69d8..284a4e36 100644
--- a/archinstall/lib/installer.py
+++ b/archinstall/lib/installer.py
@@ -12,8 +12,7 @@ from .storage import storage
from .hardware import *
# Any package that the Installer() is responsible for (optional and the default ones)
-__packages__ = ["base", "base-devel", "linux", "linux-firmware", "efibootmgr", "nano", "ntp", "iwd"]
-__base_packages__ = __packages__[:6]
+__packages__ = ["base", "base-devel", "linux-firmware", "linux", "linux-lts", "linux-zen", "linux-hardened"]
class Installer():
"""
@@ -39,8 +38,7 @@ class Installer():
:type hostname: str, optional
"""
- def __init__(self, target, *, base_packages='base base-devel linux-firmware', kernels='linux'):
- kernels = kernels.split(",")
+ def __init__(self, target, *, base_packages=__packages__[:3], kernels=['linux']):
self.target = target
self.init_time = time.strftime('%Y-%m-%d_%H-%M-%S')
self.milliseconds = int(str(time.time()).split('.')[1])
@@ -53,10 +51,7 @@ class Installer():
self.base_packages = base_packages.split(' ') if type(base_packages) is str else base_packages
for kernel in kernels:
self.base_packages.append(kernel)
- if hasUEFI():
- self.base_packages.append("efibootmgr")
- else:
- self.base_packages.append("grub")
+
self.post_base_install = []
storage['session'] = self
@@ -201,6 +196,9 @@ class Installer():
return sys_command(f'/usr/bin/arch-chroot {self.target} {cmd}')
def arch_chroot(self, cmd, *args, **kwargs):
+ if 'runas' in kwargs:
+ cmd = f"su - {kwargs['runas']} -c \"{cmd}\""
+
return self.run_command(cmd)
def drop_to_shell(self):
@@ -364,12 +362,12 @@ class Installer():
boot_partition = partition
elif partition.mountpoint == self.target:
root_partition = partition
- if hasUEFI():
- self.log(f'Adding bootloader {bootloader} to {boot_partition}', level=logging.INFO)
- else:
- self.log(f'Adding bootloader {bootloader} to {root_partition}', level=logging.INFO)
+
+ self.log(f'Adding bootloader {bootloader} to {boot_partition if boot_partition else root_partition}', level=logging.INFO)
if bootloader == 'systemd-bootctl':
+ self.pacstrap('efibootmgr')
+
if not hasUEFI():
raise HardwareIncompatibilityError
# TODO: Ideally we would want to check if another config
@@ -432,7 +430,10 @@ class Installer():
raise RequirementError(f"Could not identify the UUID of {self.partition}, there for {self.target}/boot/loader/entries/arch.conf will be broken until fixed.")
elif bootloader == "grub-install":
+ self.pacstrap('grub')
+
if hasUEFI():
+ self.pacstrap('efibootmgr')
o = b''.join(sys_command(f'/usr/bin/arch-chroot {self.target} grub-install --target=x86_64-efi --efi-directory=/boot --bootloader-id=GRUB'))
sys_command('/usr/bin/arch-chroot /mnt grub-mkconfig -o /boot/grub/grub.cfg')
return True
diff --git a/archinstall/lib/output.py b/archinstall/lib/output.py
index 06d99778..d6a197f1 100644
--- a/archinstall/lib/output.py
+++ b/archinstall/lib/output.py
@@ -19,7 +19,7 @@ class journald(dict):
@abc.abstractmethod
def log(message, level=logging.DEBUG):
try:
- import systemd.journal
+ import systemd.journal # type: ignore
except ModuleNotFoundError:
return False
diff --git a/archinstall/lib/profiles.py b/archinstall/lib/profiles.py
index ad5d3bac..1feba1cd 100644
--- a/archinstall/lib/profiles.py
+++ b/archinstall/lib/profiles.py
@@ -1,3 +1,4 @@
+from typing import Optional
import os, urllib.request, urllib.parse, ssl, json, re
import importlib.util, sys, glob, hashlib, logging
from collections import OrderedDict
@@ -49,7 +50,7 @@ def list_profiles(filter_irrelevant_macs=True, subpath='', filter_top_level_prof
except urllib.error.HTTPError as err:
print(f'Error: Listing profiles on URL "{profiles_url}" resulted in:', err)
return cache
- except:
+ except json.decoder.JSONDecodeError as err:
print(f'Error: Could not decode "{profiles_url}" result as JSON:', err)
return cache
@@ -92,6 +93,9 @@ class Script():
if len(args) >= 2 and args[1]:
raise args[1]
+ if self.original_namespace:
+ self.namespace = self.original_namespace
+
def localize_path(self, profile_path):
if (url := urllib.parse.urlparse(profile_path)).scheme and url.scheme in ('https', 'http'):
if not self.converted_path:
@@ -202,54 +206,17 @@ class Profile(Script):
with open(self.path, 'r') as source:
source_data = source.read()
- # TODO: I imagine that there is probably a better way to write this.
- return 'top_level_profile = True' in source_data
-
- @property
- def packages(self) -> list:
- """
- Returns a list of packages baked into the profile definition.
- If no package definition has been done, .packages() will return None.
- """
- with open(self.path, 'r') as source:
- source_data = source.read()
-
- # Some crude safety checks, make sure the imported profile has
- # a __name__ check before importing.
- #
- # If the requirements are met, import with .py in the namespace to not
- # trigger a traditional:
- # if __name__ == 'moduleName'
- if '__name__' in source_data and '__packages__' in source_data:
+ if '__name__' in source_data and 'is_top_level_profile' in source_data:
with self.load_instructions(namespace=f"{self.namespace}.py") as imported:
- if hasattr(imported, '__packages__'):
- return imported.__packages__
- return None
-
+ if hasattr(imported, 'is_top_level_profile'):
+ return imported.is_top_level_profile
- def has_post_install(self):
- with open(self.path, 'r') as source:
- source_data = source.read()
-
- # Some crude safety checks, make sure the imported profile has
- # a __name__ check and if so, check if it's got a _prep_function()
- # we can call to ask for more user input.
- #
- # If the requirements are met, import with .py in the namespace to not
- # trigger a traditional:
- # if __name__ == 'moduleName'
- if '__name__' in source_data and '_post_install' in source_data:
- with self.load_instructions(namespace=f"{self.namespace}.py") as imported:
- if hasattr(imported, '_post_install'):
- return True
-
- def is_top_level_profile(self):
- with open(self.path, 'r') as source:
- source_data = source.read()
- return 'top_level_profile = True' in source_data
+ # Default to True if nothing is specified,
+ # since developers like less code - omitting it should assume they want to present it.
+ return True
@property
- def packages(self) -> list:
+ def packages(self) -> Optional[list]:
"""
Returns a list of packages baked into the profile definition.
If no package definition has been done, .packages() will return None.
@@ -268,7 +235,6 @@ class Profile(Script):
if hasattr(imported, '__packages__'):
return imported.__packages__
return None
-
class Application(Profile):
def __repr__(self, *args, **kwargs):
diff --git a/archinstall/lib/user_interaction.py b/archinstall/lib/user_interaction.py
index 451251cd..c76dc9a5 100644
--- a/archinstall/lib/user_interaction.py
+++ b/archinstall/lib/user_interaction.py
@@ -83,16 +83,18 @@ def get_password(prompt="Enter a password: "):
def print_large_list(options, padding=5, margin_bottom=0, separator=': '):
highest_index_number_length = len(str(len(options)))
longest_line = highest_index_number_length + len(separator) + get_longest_option(options) + padding
+ spaces_without_option = longest_line - (len(separator) + highest_index_number_length)
max_num_of_columns = get_terminal_width() // longest_line
max_options_in_cells = max_num_of_columns * (get_terminal_height()-margin_bottom)
if (len(options) > max_options_in_cells):
for index, option in enumerate(options):
print(f"{index}: {option}")
+ return 1, index
else:
for row in range(0, (get_terminal_height()-margin_bottom)):
for column in range(row, len(options), (get_terminal_height()-margin_bottom)):
- spaces = " "*(longest_line - len(options[column]))
+ spaces = " "*(spaces_without_option - len(options[column]))
print(f"{str(column): >{highest_index_number_length}}{separator}{options[column]}", end = spaces)
print()
@@ -100,6 +102,18 @@ def print_large_list(options, padding=5, margin_bottom=0, separator=': '):
def generic_multi_select(options, text="Select one or more of the options above (leave blank to continue): ", sort=True, default=None, allow_empty=False):
+ # Checking if the options are different from `list` or `dict` or if they are empty
+ if type(options) not in [list, dict]:
+ log(f" * Generic multi-select doesn't support ({type(options)}) as type of options * ", fg='red')
+ log(" * If problem persists, please create an issue on https://github.com/archlinux/archinstall/issues * ", fg='yellow')
+ raise RequirementError("generic_multi_select() requires list or dictionary as options.")
+ if not options:
+ log(f" * Generic multi-select didn't find any options to choose from * ", fg='red')
+ log(" * If problem persists, please create an issue on https://github.com/archlinux/archinstall/issues * ", fg='yellow')
+ raise RequirementError('generic_multi_select() requires at least one option to proceed.')
+ # After passing the checks, function continues to work
+ if type(options) == dict:
+ options = list(options.values())
if sort:
options = sorted(options)
@@ -108,7 +122,7 @@ def generic_multi_select(options, text="Select one or more of the options above
selected_options = []
while True:
- if len(selected_options) <= 0 and default and default in options:
+ if not selected_options and default in options:
selected_options.append(default)
printed_options = []
@@ -119,32 +133,42 @@ def generic_multi_select(options, text="Select one or more of the options above
printed_options.append(f'{option}')
section.clear(0, get_terminal_height()-section._cursor_y-1)
- x, y = print_large_list(printed_options, margin_bottom=2)
+ print_large_list(printed_options, margin_bottom=2)
section._cursor_y = len(printed_options)
section._cursor_x = 0
section.write_line(text)
section.input_pos = section._cursor_x
selected_option = section.get_keyboard_input(end=None)
-
- if selected_option is None:
- if len(selected_options) <= 0 and default:
- selected_options = [default]
-
- if len(selected_options) or allow_empty is True:
- break
- else:
- log('* Need to select at least one option!', fg='red')
- continue
-
- elif selected_option.isdigit():
- if (selected_option := int(selected_option)) >= len(options):
- log('* Option is out of range, please select another one!', fg='red')
- continue
- selected_option = options[selected_option]
- if selected_option in selected_options:
- selected_options.remove(selected_option)
+ # This string check is necessary to correct work with it
+ # Without this, Python will raise AttributeError because of stripping `None`
+ # It also allows to remove empty spaces if the user accidentally entered them.
+ if isinstance(selected_option, str):
+ selected_option = selected_option.strip()
+ try:
+ if not selected_option:
+ if not selected_options and default:
+ selected_options = [default]
+ elif selected_options or allow_empty:
+ break
+ else:
+ raise RequirementError('Please select at least one option to continue')
+ elif selected_option.isnumeric():
+ if (selected_option := int(selected_option)) >= len(options):
+ raise RequirementError(f'Selected option "{selected_option}" is out of range')
+ selected_option = options[selected_option]
+ if selected_option in selected_options:
+ selected_options.remove(selected_option)
+ else:
+ selected_options.append(selected_option)
+ elif selected_option in options:
+ if selected_option in selected_options:
+ selected_options.remove(selected_option)
+ else:
+ selected_options.append(selected_option)
else:
- selected_options.append(selected_option)
+ raise RequirementError(f'Selected option "{selected_option}" does not exist in available options')
+ except RequirementError as e:
+ log(f" * {e} * ", fg='red')
return selected_options
@@ -263,14 +287,14 @@ class MiniCurses():
if response:
return response
-def ask_for_superuser_account(prompt='Username for required super-user with sudo privileges: ', forced=False):
+def ask_for_superuser_account(prompt='Username for required superuser with sudo privileges: ', forced=False):
while 1:
new_user = input(prompt).strip(' ')
if not new_user and forced:
# TODO: make this text more generic?
# It's only used to create the first sudo user when root is disabled in guided.py
- log(' * Since root is disabled, you need to create a least one (super) user!', fg='red')
+ log(' * Since root is disabled, you need to create a least one superuser!', fg='red')
continue
elif not new_user and not forced:
raise UserError("No superuser was created.")
@@ -282,7 +306,7 @@ def ask_for_superuser_account(prompt='Username for required super-user with sudo
def ask_for_additional_users(prompt='Any additional users to install (leave blank for no users): '):
users = {}
- super_users = {}
+ superusers = {}
while 1:
new_user = input(prompt).strip(' ')
@@ -292,12 +316,12 @@ def ask_for_additional_users(prompt='Any additional users to install (leave blan
continue
password = get_password(prompt=f'Password for user {new_user}: ')
- if input("Should this user be a sudo (super) user (y/N): ").strip(' ').lower() in ('y', 'yes'):
- super_users[new_user] = {"!password" : password}
+ if input("Should this user be a superuser (sudoer) [y/N]: ").strip(' ').lower() in ('y', 'yes'):
+ superusers[new_user] = {"!password" : password}
else:
users[new_user] = {"!password" : password}
- return users, super_users
+ return users, superusers
def ask_for_a_timezone():
while True:
@@ -435,20 +459,24 @@ def generic_select(options, input_text="Select one of the above by index or abso
this function returns an item from list, a string, or None
"""
- # Checking if options are different from `list` or `dict`
+ # Checking if the options are different from `list` or `dict` or if they are empty
if type(options) not in [list, dict]:
log(f" * Generic select doesn't support ({type(options)}) as type of options * ", fg='red')
log(" * If problem persists, please create an issue on https://github.com/archlinux/archinstall/issues * ", fg='yellow')
raise RequirementError("generic_select() requires list or dictionary as options.")
- # To allow only `list` and `dict`, converting values of options here.
- # Therefore, now we can only provide the dictionary itself
- if type(options) == dict: options = list(options.values())
- if sort: options = sorted(options) # As we pass only list and dict (converted to list), we can skip converting to list
- if len(options) == 0:
+ if not options:
log(f" * Generic select didn't find any options to choose from * ", fg='red')
log(" * If problem persists, please create an issue on https://github.com/archlinux/archinstall/issues * ", fg='yellow')
raise RequirementError('generic_select() requires at least one option to proceed.')
-
+ # After passing the checks, function continues to work
+ if type(options) == dict:
+ # To allow only `list` and `dict`, converting values of options here.
+ # Therefore, now we can only provide the dictionary itself
+ options = list(options.values())
+ if sort:
+ # As we pass only list and dict (converted to list), we can skip converting to list
+ options = sorted(options)
+
# Added ability to disable the output of options items,
# if another function displays something different from this
@@ -460,8 +488,8 @@ def generic_select(options, input_text="Select one of the above by index or abso
# Now the try...except block handles validation for invalid input from the user
while True:
try:
- selected_option = input(input_text)
- if len(selected_option.strip()) == 0:
+ selected_option = input(input_text).strip()
+ if not selected_option:
# `allow_empty_input` parameter handles return of None on empty input, if necessary
# Otherwise raise `RequirementError`
if allow_empty_input:
@@ -469,8 +497,7 @@ def generic_select(options, input_text="Select one of the above by index or abso
raise RequirementError('Please select an option to continue')
# Replaced `isdigit` with` isnumeric` to discard all negative numbers
elif selected_option.isnumeric():
- selected_option = int(selected_option)
- if selected_option >= len(options):
+ if (selected_option := int(selected_option)) >= len(options):
raise RequirementError(f'Selected option "{selected_option}" is out of range')
selected_option = options[selected_option]
break
@@ -480,7 +507,6 @@ def generic_select(options, input_text="Select one of the above by index or abso
raise RequirementError(f'Selected option "{selected_option}" does not exist in available options')
except RequirementError as err:
log(f" * {err} * ", fg='red')
- continue
return selected_option
@@ -649,6 +675,7 @@ def select_driver(options=AVAILABLE_GFX_DRIVERS):
"""
drivers = sorted(list(options))
+ default_option = options["All open-source (default)"]
if drivers:
lspci = sys_command(f'/usr/bin/lspci')
@@ -660,6 +687,10 @@ def select_driver(options=AVAILABLE_GFX_DRIVERS):
print(' ** AMD card detected, suggested driver: AMD / ATI **')
initial_option = generic_select(drivers, input_text="Select your graphics card driver: ")
+
+ if not initial_option:
+ return default_option
+
selected_driver = options[initial_option]
if type(selected_driver) == dict:
@@ -691,6 +722,6 @@ def select_kernel(options):
kernels = sorted(list(options))
if kernels:
- return generic_multi_select(kernels, f"Choose which kernel to use (leave blank for default: {DEFAULT_KERNEL}): ", default=DEFAULT_KERNEL)
+ return generic_multi_select(kernels, f"Choose which kernels to use (leave blank for default: {DEFAULT_KERNEL}): ", default=DEFAULT_KERNEL, sort=False)
raise RequirementError("Selecting kernels require a least one kernel to be given as an option.")