Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'archinstall/__init__.py')
-rw-r--r--archinstall/__init__.py318
1 files changed, 175 insertions, 143 deletions
diff --git a/archinstall/__init__.py b/archinstall/__init__.py
index 9de4a3ec..1c980390 100644
--- a/archinstall/__init__.py
+++ b/archinstall/__init__.py
@@ -1,59 +1,70 @@
"""Arch Linux installer - guided, templates etc."""
-import typing
+import importlib
+import os
+import sys
+import time
+import traceback
from argparse import ArgumentParser, Namespace
-
-from .lib.disk import *
-from .lib.exceptions import *
-from .lib.general import *
-from .lib.hardware import *
-from .lib.installer import __packages__, Installer, accessibility_tools_in_use
-from .lib.locale_helpers import *
-from .lib.luks import *
-from .lib.mirrors import *
-from .lib.models.network_configuration import NetworkConfigurationHandler
-from .lib.models.users import User
-from .lib.networking import *
-from .lib.output import *
-from .lib.models.dataclasses import (
- VersionDef,
- PackageSearchResult,
- PackageSearch,
- LocalPackage
-)
-from .lib.packages.packages import (
- group_search,
- package_search,
- find_package,
- find_packages,
- installed_package,
- validate_package_list,
-)
-from .lib.profiles import *
-from .lib.services import *
-from .lib.storage import *
-from .lib.systemd import *
-from .lib.user_interaction import *
-from .lib.menu import Menu
-from .lib.menu.list_manager import ListManager
-from .lib.menu.text_input import TextInput
-from .lib.menu.global_menu import GlobalMenu
-from .lib.menu.abstract_menu import (
- Selector,
- AbstractMenu
+from pathlib import Path
+from typing import TYPE_CHECKING, Any, Dict, Union
+
+from .lib import disk
+from .lib import menu
+from .lib import models
+from .lib import packages
+from .lib import exceptions
+from .lib import luks
+from .lib import locale
+from .lib import mirrors
+from .lib import networking
+from .lib import profile
+from .lib import interactions
+from . import default_profiles
+
+from .lib.hardware import SysInfo, GfxDriver
+from .lib.installer import Installer, accessibility_tools_in_use
+from .lib.output import FormattedOutput, log, error, debug, warn, info
+from .lib.pacman import Pacman
+from .lib.storage import storage
+from .lib.global_menu import GlobalMenu
+from .lib.boot import Boot
+from .lib.translationhandler import TranslationHandler, Language, DeferredTranslation
+from .lib.plugins import plugins, load_plugin
+from .lib.configuration import ConfigurationOutput
+
+from .lib.general import (
+ generate_password, locate_binary, clear_vt100_escape_codes,
+ JSON, UNSAFE_JSON, SysCommandWorker, SysCommand,
+ run_custom_user_commands, json_stream_to_structure, secret
)
-from .lib.translationhandler import TranslationHandler, DeferredTranslation
-from .lib.plugins import plugins, load_plugin # This initiates the plugin loading ceremony
-from .lib.configuration import *
-from .lib.udev import udevadm_info
-parser = ArgumentParser()
-__version__ = "2.5.4"
+if TYPE_CHECKING:
+ _: Any
+
+
+__version__ = "2.8.0"
storage['__version__'] = __version__
-# add the custome _ as a builtin, it can now be used anywhere in the
+# add the custom _ as a builtin, it can now be used anywhere in the
# project to mark strings as translatable with _('translate me')
DeferredTranslation.install()
+# Log various information about hardware before starting the installation. This might assist in troubleshooting
+debug(f"Hardware model detected: {SysInfo.sys_vendor()} {SysInfo.product_name()}; UEFI mode: {SysInfo.has_uefi()}")
+debug(f"Processor model detected: {SysInfo.cpu_model()}")
+debug(f"Memory statistics: {SysInfo.mem_available()} available out of {SysInfo.mem_total()} total installed")
+debug(f"Virtualization detected: {SysInfo.virtualization()}; is VM: {SysInfo.is_vm()}")
+debug(f"Graphics devices detected: {SysInfo._graphics_devices().keys()}")
+
+# For support reasons, we'll log the disk layout pre installation to match against post-installation layout
+debug(f"Disk states before installing: {disk.disk_layouts()}")
+
+if 'sphinx' not in sys.modules and os.getuid() != 0:
+ print(_("Archinstall requires root privileges to run. See --help for more."))
+ exit(1)
+
+parser = ArgumentParser()
+
def define_arguments():
"""
@@ -66,20 +77,25 @@ def define_arguments():
parser.add_argument("-v", "--version", action="version", version="%(prog)s " + __version__)
parser.add_argument("--config", nargs="?", help="JSON configuration file or URL")
parser.add_argument("--creds", nargs="?", help="JSON credentials configuration file")
- parser.add_argument("--disk_layouts","--disk_layout","--disk-layouts","--disk-layout",nargs="?",
- help="JSON disk layout file")
parser.add_argument("--silent", action="store_true",
help="WARNING: Disables all prompts for input and confirmation. If no configuration is provided, this is ignored")
parser.add_argument("--dry-run", "--dry_run", action="store_true",
help="Generates a configuration file and then exits instead of performing an installation")
parser.add_argument("--script", default="guided", nargs="?", help="Script to run for installation", type=str)
- parser.add_argument("--mount-point","--mount_point", nargs="?", type=str, help="Define an alternate mount point for installation")
+ parser.add_argument("--mount-point", "--mount_point", nargs="?", type=str,
+ help="Define an alternate mount point for installation")
+ parser.add_argument("--skip-ntp", action="store_true", help="Disables NTP checks during installation", default=False)
parser.add_argument("--debug", action="store_true", default=False, help="Adds debug info into the log")
- parser.add_argument("--offline", action="store_true", default=False, help="Disabled online upstream services such as package search and key-ring auto update.")
- parser.add_argument("--no-pkg-lookups", action="store_true", default=False, help="Disabled package validation specifically prior to starting installation.")
+ parser.add_argument("--offline", action="store_true", default=False,
+ help="Disabled online upstream services such as package search and key-ring auto update.")
+ parser.add_argument("--no-pkg-lookups", action="store_true", default=False,
+ help="Disabled package validation specifically prior to starting installation.")
parser.add_argument("--plugin", nargs="?", type=str)
+ parser.add_argument("--skip-version-check", action="store_true",
+ help="Skip the version check when running archinstall")
+
-def parse_unspecified_argument_list(unknowns :list, multiple :bool = False, error :bool = False) -> dict:
+def parse_unspecified_argument_list(unknowns: list, multiple: bool = False, err: bool = False) -> dict:
"""We accept arguments not defined to the parser. (arguments "ad hoc").
Internally argparse return to us a list of words so we have to parse its contents, manually.
We accept following individual syntax for each argument
@@ -95,51 +111,52 @@ def parse_unspecified_argument_list(unknowns :list, multiple :bool = False, erro
argument value value ...
which isn't am error if multiple is specified
"""
- tmp_list = unknowns[:] # wastes a few bytes, but avoids any collateral effect of the destructive nature of the pop method()
+ tmp_list = unknowns[:] # wastes a few bytes, but avoids any collateral effect of the destructive nature of the pop method()
config = {}
key = None
last_key = None
while tmp_list:
- element = tmp_list.pop(0) # retrieve an element of the list
- if element.startswith('--'): # is an argument ?
- if '=' in element: # uses the arg=value syntax ?
+ element = tmp_list.pop(0) # retrieve an element of the list
+ if element.startswith('--'): # is an argument ?
+ if '=' in element: # uses the arg=value syntax ?
key, value = [x.strip() for x in element[2:].split('=', 1)]
config[key] = value
- last_key = key # for multiple handling
- key = None # we have the kwy value pair we need
+ last_key = key # for multiple handling
+ key = None # we have the kwy value pair we need
else:
key = element[2:]
- config[key] = True # every argument starts its lifecycle as boolean
+ config[key] = True # every argument starts its lifecycle as boolean
else:
if element == '=':
continue
if key:
config[key] = element
- last_key = key # multiple
+ last_key = key # multiple
key = None
else:
if multiple and last_key:
- if isinstance(config[last_key],str):
- config[last_key] = [config[last_key],element]
+ if isinstance(config[last_key], str):
+ config[last_key] = [config[last_key], element]
else:
config[last_key].append(element)
- elif error:
+ elif err:
raise ValueError(f"Entry {element} is not related to any argument")
else:
print(f" We ignore the entry {element} as it isn't related to any argument")
return config
-def cleanup_empty_args(args :typing.Union[Namespace, dict]) -> dict:
+
+def cleanup_empty_args(args: Union[Namespace, Dict]) -> Dict:
"""
Takes arguments (dictionary or argparse Namespace) and removes any
None values. This ensures clean mergers during dict.update(args)
"""
- if type(args) == Namespace:
+ if type(args) is Namespace:
args = vars(args)
clean_args = {}
for key, val in args.items():
- if type(val) == dict:
+ if isinstance(val, dict):
val = cleanup_empty_args(val)
if val is not None:
@@ -147,6 +164,7 @@ def cleanup_empty_args(args :typing.Union[Namespace, dict]) -> dict:
return clean_args
+
def get_arguments() -> Dict[str, Any]:
""" The handling of parameters from the command line
Is done on following steps:
@@ -161,7 +179,7 @@ def get_arguments() -> Dict[str, Any]:
3) Amend
Change whatever is needed on the configuration dictionary (it could be done in post_process_arguments but this ougth to be left to changes anywhere else in the code, not in the arguments dictionary
"""
- config = {}
+ config: Dict[str, Any] = {}
args, unknowns = parser.parse_known_args()
# preprocess the JSON files.
# TODO Expand the url access to the other JSON file arguments ?
@@ -174,15 +192,15 @@ def get_arguments() -> Dict[str, Any]:
exit(1)
# load the parameters. first the known, then the unknowns
- args = cleanup_empty_args(args)
- config.update(args)
+ clean_args = cleanup_empty_args(args)
+ config.update(clean_args)
config.update(parse_unspecified_argument_list(unknowns))
# amend the parameters (check internal consistency)
# Installation can't be silent if config is not passed
- if args.get('config') is None:
+ if clean_args.get('config') is None:
config["silent"] = False
else:
- config["silent"] = args.get('silent')
+ config["silent"] = clean_args.get('silent')
# avoiding a compatibility issue
if 'dry-run' in config:
@@ -190,60 +208,52 @@ def get_arguments() -> Dict[str, Any]:
return config
+
def load_config():
"""
refine and set some arguments. Formerly at the scripts
"""
from .lib.models import NetworkConfiguration
+ arguments['locale_config'] = locale.LocaleConfiguration.parse_arg(arguments)
+
if (archinstall_lang := arguments.get('archinstall-language', None)) is not None:
arguments['archinstall-language'] = TranslationHandler().get_language_by_name(archinstall_lang)
- if arguments.get('harddrives', None) is not None:
- if type(arguments['harddrives']) is str:
- arguments['harddrives'] = arguments['harddrives'].split(',')
- arguments['harddrives'] = [BlockDevice(BlockDev) for BlockDev in arguments['harddrives']]
- # Temporarily disabling keep_partitions if config file is loaded
- # Temporary workaround to make Desktop Environments work
+ if disk_config := arguments.get('disk_config', {}):
+ arguments['disk_config'] = disk.DiskLayoutConfiguration.parse_arg(disk_config)
- if arguments.get('profile', None) is not None:
- if type(arguments.get('profile', None)) is dict:
- arguments['profile'] = Profile(None, arguments.get('profile', None)['path'])
- else:
- arguments['profile'] = Profile(None, arguments.get('profile', None))
+ if profile_config := arguments.get('profile_config', None):
+ arguments['profile_config'] = profile.ProfileConfiguration.parse_arg(profile_config)
- storage['_desktop_profile'] = arguments.get('desktop-environment', None)
-
- if arguments.get('mirror-region', None) is not None:
- if type(arguments.get('mirror-region', None)) is dict:
- arguments['mirror-region'] = arguments.get('mirror-region', None)
- else:
- selected_region = arguments.get('mirror-region', None)
- arguments['mirror-region'] = {selected_region: list_mirrors()[selected_region]}
-
- arguments.setdefault('sys-language', 'en_US')
- arguments.setdefault('sys-encoding', 'utf-8')
-
- if arguments.get('gfx_driver', None) is not None:
- storage['gfx_driver_packages'] = AVAILABLE_GFX_DRIVERS.get(arguments.get('gfx_driver', None), None)
+ if mirror_config := arguments.get('mirror_config', None):
+ arguments['mirror_config'] = mirrors.MirrorConfiguration.parse_args(mirror_config)
if arguments.get('servers', None) is not None:
storage['_selected_servers'] = arguments.get('servers', None)
- if arguments.get('nic', None) is not None:
- handler = NetworkConfigurationHandler()
- handler.parse_arguments(arguments.get('nic'))
- arguments['nic'] = handler.configuration
+ if (net_config := arguments.get('network_config', None)) is not None:
+ config = NetworkConfiguration.parse_arg(net_config)
+ arguments['network_config'] = config
if arguments.get('!users', None) is not None or arguments.get('!superusers', None) is not None:
users = arguments.get('!users', None)
superusers = arguments.get('!superusers', None)
- arguments['!users'] = User.parse_arguments(users, superusers)
+ arguments['!users'] = models.User.parse_arguments(users, superusers)
+
+ if arguments.get('bootloader', None) is not None:
+ arguments['bootloader'] = models.Bootloader.from_arg(arguments['bootloader'])
- if arguments.get('disk_encryption', None) is not None and arguments.get('disk_layouts', None) is not None:
+ if arguments.get('uki') and not arguments['bootloader'].has_uki_support():
+ arguments['uki'] = False
+
+ if arguments.get('audio_config', None) is not None:
+ arguments['audio_config'] = models.AudioConfiguration.parse_arg(arguments['audio_config'])
+
+ if arguments.get('disk_encryption', None) is not None and disk_config is not None:
password = arguments.get('encryption_password', '')
- arguments['disk_encryption'] = DiskEncryption.parse_arg(
- arguments['disk_layouts'],
+ arguments['disk_encryption'] = disk.DiskEncryption.parse_arg(
+ arguments['disk_config'],
arguments['disk_encryption'],
password
)
@@ -251,62 +261,84 @@ def load_config():
def post_process_arguments(arguments):
storage['arguments'] = arguments
- if arguments.get('mount_point'):
- storage['MOUNT_POINT'] = arguments['mount_point']
+ if mountpoint := arguments.get('mount_point', None):
+ storage['MOUNT_POINT'] = Path(mountpoint)
if arguments.get('debug', False):
- log(f"Warning: --debug mode will write certain credentials to {storage['LOG_PATH']}/{storage['LOG_FILE']}!", fg="red", level=logging.WARNING)
+ warn(f"Warning: --debug mode will write certain credentials to {storage['LOG_PATH']}/{storage['LOG_FILE']}!")
if arguments.get('plugin', None):
- load_plugin(arguments['plugin'])
-
- if arguments.get('disk_layouts', None) is not None:
- layout_storage = {}
- if not json_stream_to_structure('--disk_layouts',arguments['disk_layouts'],layout_storage):
- exit(1)
- else:
- if arguments.get('harddrives') is None:
- arguments['harddrives'] = [disk for disk in layout_storage]
- # backward compatibility. Change partition.format for partition.wipe
- for disk in layout_storage:
- for i, partition in enumerate(layout_storage[disk].get('partitions',[])):
- if 'format' in partition:
- partition['wipe'] = partition['format']
- del partition['format']
- elif 'btrfs' in partition:
- partition['btrfs']['subvolumes'] = Subvolume.parse_arguments(partition['btrfs']['subvolumes'])
- arguments['disk_layouts'] = layout_storage
+ path = arguments['plugin']
+ load_plugin(path)
load_config()
define_arguments()
-arguments = get_arguments()
+arguments: Dict[str, Any] = get_arguments()
post_process_arguments(arguments)
+
# @archinstall.plugin decorator hook to programmatically add
-# plugins in runtime. Useful in profiles and other things.
+# plugins in runtime. Useful in profiles_bck and other things.
def plugin(f, *args, **kwargs):
plugins[f.__name__] = f
-def run_as_a_module():
+def _check_new_version():
+ info("Checking version...")
+
+ try:
+ Pacman.run("-Sy")
+ except Exception as e:
+ debug(f'Failed to perform version check: {e}')
+ info(f'Arch Linux mirrors are not reachable. Please check your internet connection')
+ exit(1)
+
+ upgrade = None
+
+ try:
+ upgrade = Pacman.run("-Qu archinstall").decode()
+ except Exception as e:
+ debug(f'Failed determine pacman version: {e}')
+
+ if upgrade:
+ text = f'New version available: {upgrade}'
+ info(text)
+ time.sleep(3)
+
+
+def main():
"""
- Since we're running this as a 'python -m archinstall' module OR
- a nuitka3 compiled version of the project.
- This function and the file __main__ acts as a entry point.
+ This can either be run as the compiled and installed application: python setup.py install
+ OR straight as a module: python -m archinstall
+ In any case we will be attempting to load the provided script to be run from the scripts/ folder
"""
+ if not arguments.get('skip_version_check', False):
+ _check_new_version()
- # Add another path for finding profiles, so that list_profiles() in Script() can find guided.py, unattended.py etc.
- storage['PROFILE_PATH'].append(os.path.abspath(f'{os.path.dirname(__file__)}/examples'))
- try:
- script = Script(arguments.get('script', None))
- except ProfileNotFound as err:
- print(f"Couldn't find file: {err}")
- sys.exit(1)
+ script = arguments.get('script', None)
+
+ if script is None:
+ print('No script to run provided')
- os.chdir(os.path.abspath(os.path.dirname(__file__)))
+ mod_name = f'archinstall.scripts.{script}'
+ # by loading the module we'll automatically run the script
+ importlib.import_module(mod_name)
+
+
+def run_as_a_module():
+ try:
+ main()
+ except Exception as e:
+ err = ''.join(traceback.format_exception(e))
+ error(err)
+
+ text = (
+ 'Archinstall experienced the above error. If you think this is a bug, please report it to\n'
+ 'https://github.com/archlinux/archinstall and include the log file "/var/log/archinstall/install.log".\n\n'
+ 'Hint: To extract the log from a live ISO \ncurl -F\'file=@/var/log/archinstall/install.log\' https://0x0.st\n'
+ )
- # Remove the example directory from the PROFILE_PATH, to avoid guided.py etc shows up in user input questions.
- storage['PROFILE_PATH'].pop()
- script.execute()
+ warn(text)
+ exit(1)