Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--archinstall/lib/disk.py25
-rw-r--r--archinstall/lib/exceptions.py20
-rw-r--r--archinstall/lib/general.py21
-rw-r--r--archinstall/lib/hardware.py11
-rw-r--r--archinstall/lib/installer.py42
-rw-r--r--archinstall/lib/locale_helpers.py4
-rw-r--r--archinstall/lib/luks.py20
-rw-r--r--archinstall/lib/mirrors.py14
-rw-r--r--archinstall/lib/networking.py22
-rw-r--r--archinstall/lib/output.py34
-rw-r--r--archinstall/lib/packages.py17
-rw-r--r--archinstall/lib/profiles.py24
-rw-r--r--archinstall/lib/services.py4
-rw-r--r--archinstall/lib/storage.py2
-rw-r--r--archinstall/lib/systemd.py4
-rw-r--r--archinstall/lib/user_interaction.py110
-rw-r--r--examples/guided.py4
17 files changed, 230 insertions, 148 deletions
diff --git a/archinstall/lib/disk.py b/archinstall/lib/disk.py
index fd08ea63..0a0337ec 100644
--- a/archinstall/lib/disk.py
+++ b/archinstall/lib/disk.py
@@ -1,12 +1,11 @@
-from typing import Optional
-import glob, re, os, json, time, hashlib
-import pathlib, traceback, logging
+import glob
+import pathlib
+import re
from collections import OrderedDict
-from .exceptions import DiskError
+
from .general import *
-from .output import log
-from .storage import storage
from .hardware import hasUEFI
+from .output import log
ROOT_DIR_PATTERN = re.compile('^.*?/devices')
GPT = 0b00000001
@@ -172,7 +171,7 @@ class Partition():
self.mount(mountpoint)
mount_information = get_mount_info(self.path)
-
+
if self.mountpoint != mount_information.get('target', None) and mountpoint:
raise DiskError(f"{self} was given a mountpoint but the actual mountpoint differs: {mount_information.get('target', None)}")
@@ -250,14 +249,14 @@ class Partition():
def has_content(self):
if not get_filesystem_type(self.path):
return False
-
+
temporary_mountpoint = '/tmp/'+hashlib.md5(bytes(f"{time.time()}", 'UTF-8')+os.urandom(12)).hexdigest()
temporary_path = pathlib.Path(temporary_mountpoint)
temporary_path.mkdir(parents=True, exist_ok=True)
if (handle := sys_command(f'/usr/bin/mount {self.path} {temporary_mountpoint}')).exit_code != 0:
raise DiskError(f'Could not mount and check for content on {self.path} because: {b"".join(handle)}')
-
+
files = len(glob.glob(f"{temporary_mountpoint}/*"))
sys_command(f'/usr/bin/umount {temporary_mountpoint}')
@@ -385,7 +384,7 @@ class Partition():
sys_command(f'/usr/bin/mount {self.path} {target}')
except SysCallError as err:
raise err
-
+
self.mountpoint = target
return True
@@ -446,7 +445,7 @@ class Filesystem():
raise DiskError(f'Problem setting the partition format to GPT:', f'/usr/bin/parted -s {self.blockdevice.device} mklabel msdos')
else:
raise DiskError(f'Unknown mode selected to format in: {self.mode}')
-
+
# TODO: partition_table_type is hardcoded to GPT at the moment. This has to be changed.
elif self.mode == self.blockdevice.partition_table_type:
log(f'Kept partition format {self.mode} for {self.blockdevice}', level=logging.DEBUG)
@@ -513,7 +512,7 @@ class Filesystem():
def add_partition(self, type, start, end, format=None):
log(f'Adding partition to {self.blockdevice}', level=logging.INFO)
-
+
previous_partitions = self.blockdevice.partitions
if self.mode == MBR:
if len(self.blockdevice.partitions)>3:
@@ -632,4 +631,4 @@ def disk_layouts():
return json.loads(b''.join(handle).decode('UTF-8'))
except SysCallError as err:
log(f"Could not return disk layouts: {err}")
- return None \ No newline at end of file
+ return None
diff --git a/archinstall/lib/exceptions.py b/archinstall/lib/exceptions.py
index 49913980..6837f582 100644
--- a/archinstall/lib/exceptions.py
+++ b/archinstall/lib/exceptions.py
@@ -1,23 +1,41 @@
class RequirementError(BaseException):
pass
+
+
class DiskError(BaseException):
pass
+
+
class UnknownFilesystemFormat(BaseException):
pass
+
+
class ProfileError(BaseException):
pass
+
+
class SysCallError(BaseException):
def __init__(self, message, exit_code):
super(SysCallError, self).__init__(message)
self.message = message
self.exit_code = exit_code
+
+
class ProfileNotFound(BaseException):
pass
+
+
class HardwareIncompatibilityError(BaseException):
pass
+
+
class PermissionError(BaseException):
pass
+
+
class UserError(BaseException):
pass
+
+
class ServiceException(BaseException):
- pass \ No newline at end of file
+ pass
diff --git a/archinstall/lib/general.py b/archinstall/lib/general.py
index 72f8677f..2b27ac4c 100644
--- a/archinstall/lib/general.py
+++ b/archinstall/lib/general.py
@@ -1,11 +1,18 @@
-import os, json, hashlib, shlex, sys
-import time, pty, logging
+import hashlib
+import json
+import logging
+import os
+import pty
+import shlex
+import sys
+import time
from datetime import datetime, date
-from subprocess import Popen, STDOUT, PIPE, check_output
from select import epoll, EPOLLIN, EPOLLHUP
+from typing import Union
+
from .exceptions import *
from .output import log
-from typing import Optional, Union
+
def gen_uid(entropy_length=256):
return hashlib.sha512(os.urandom(entropy_length)).hexdigest()
@@ -37,16 +44,16 @@ class JSON_Encoder:
if isinstance(obj, dict):
## We'll need to iterate not just the value that default() usually gets passed
## But also iterate manually over each key: value pair in order to trap the keys.
-
+
copy = {}
for key, val in list(obj.items()):
if isinstance(val, dict):
val = json.loads(json.dumps(val, cls=JSON)) # This, is a EXTREMELY ugly hack..
- # But it's the only quick way I can think of to
+ # But it's the only quick way I can think of to
# trigger a encoding of sub-dictionaries.
else:
val = JSON_Encoder._encode(val)
-
+
if type(key) == str and key[0] == '!':
copy[JSON_Encoder._encode(key)] = '******'
else:
diff --git a/archinstall/lib/hardware.py b/archinstall/lib/hardware.py
index 009a3a6c..e4f87a0c 100644
--- a/archinstall/lib/hardware.py
+++ b/archinstall/lib/hardware.py
@@ -1,8 +1,11 @@
-import os, subprocess, json
-from .general import sys_command
-from .networking import list_interfaces, enrichIfaceTypes
+import json
+import os
+import subprocess
from typing import Optional
+from .general import sys_command
+from .networking import list_interfaces, enrich_iface_types
+
__packages__ = [
"mesa",
"xf86-video-amdgpu",
@@ -53,7 +56,7 @@ AVAILABLE_GFX_DRIVERS = {
}
def hasWifi()->bool:
- return 'WIRELESS' in enrichIfaceTypes(list_interfaces().values()).values()
+ return 'WIRELESS' in enrich_iface_types(list_interfaces().values()).values()
def hasAMDCPU()->bool:
if subprocess.check_output("lscpu | grep AMD", shell=True).strip().decode():
diff --git a/archinstall/lib/installer.py b/archinstall/lib/installer.py
index 68d058f0..ba92d519 100644
--- a/archinstall/lib/installer.py
+++ b/archinstall/lib/installer.py
@@ -1,15 +1,11 @@
-import os, stat, time, shutil, pathlib
-import subprocess, logging
-from .exceptions import *
from .disk import *
-from .general import *
-from .user_interaction import *
-from .profiles import Profile
+from .hardware import *
from .mirrors import *
-from .systemd import Networkd
from .output import log
+from .profiles import Profile
from .storage import storage
-from .hardware import *
+from .systemd import Networkd
+from .user_interaction import *
# Any package that the Installer() is responsible for (optional and the default ones)
__packages__ = ["base", "base-devel", "linux-firmware", "linux", "linux-lts", "linux-zen", "linux-hardened"]
@@ -47,7 +43,7 @@ class Installer():
'base' : False,
'bootloader' : False
}
-
+
self.base_packages = base_packages.split(' ') if type(base_packages) is str else base_packages
for kernel in kernels:
self.base_packages.append(kernel)
@@ -100,10 +96,10 @@ class Installer():
self.log('Some required steps were not successfully installed/configured before leaving the installer:', fg='red', level=logging.WARNING)
for step in missing_steps:
self.log(f' - {step}', fg='red', level=logging.WARNING)
-
+
self.log(f"Detailed error logs can be found at: {storage['LOG_PATH']}", level=logging.WARNING)
self.log(f"Submit this zip file as an issue to https://github.com/archlinux/archinstall/issues", level=logging.WARNING)
-
+
self.sync_log_to_install_medium()
return False
@@ -116,7 +112,7 @@ class Installer():
if not os.path.isdir(f"{self.target}/{os.path.dirname(absolute_logfile)}"):
os.makedirs(f"{self.target}/{os.path.dirname(absolute_logfile)}")
-
+
shutil.copy2(absolute_logfile, f"{self.target}/{absolute_logfile}")
return True
@@ -124,7 +120,7 @@ class Installer():
def mount(self, partition, mountpoint, create_mountpoint=True):
if create_mountpoint and not os.path.isdir(f'{self.target}{mountpoint}'):
os.makedirs(f'{self.target}{mountpoint}')
-
+
partition.mount(f'{self.target}{mountpoint}')
def post_install_check(self, *args, **kwargs):
@@ -147,7 +143,7 @@ class Installer():
def genfstab(self, flags='-pU'):
self.log(f"Updating {self.target}/etc/fstab", level=logging.INFO)
-
+
fstab = sys_command(f'/usr/bin/genfstab {flags} {self.target}').trace_log
with open(f"{self.target}/etc/fstab", 'ab') as fstab_fh:
fstab_fh.write(fstab)
@@ -204,7 +200,7 @@ class Installer():
def arch_chroot(self, cmd, *args, **kwargs):
if 'runas' in kwargs:
cmd = f"su - {kwargs['runas']} -c \"{cmd}\""
-
+
return self.run_command(cmd)
def drop_to_shell(self):
@@ -224,7 +220,7 @@ class Installer():
network["DNS"] = dns
conf = Networkd(Match={"Name": nic}, Network=network)
-
+
with open(f"{self.target}/etc/systemd/network/10-{nic}.network", "a") as netconf:
netconf.write(str(conf))
@@ -272,7 +268,7 @@ class Installer():
# Otherwise, we can go ahead and enable the services
else:
self.enable_service('systemd-networkd', 'systemd-resolved')
-
+
return True
@@ -281,7 +277,7 @@ class Installer():
return partition
elif partition.parent not in partition.path and Partition(partition.parent, None, autodetect_filesystem=True).filesystem == 'crypto_LUKS':
return Partition(partition.parent, None, autodetect_filesystem=True)
-
+
return False
def mkinitcpio(self, *flags):
@@ -298,7 +294,7 @@ class Installer():
## TODO: Perhaps this should be living in the function which dictates
## the partitioning. Leaving here for now.
-
+
for partition in self.partitions:
if partition.filesystem == 'btrfs':
@@ -322,7 +318,7 @@ class Installer():
if not(hasUEFI()):
self.base_packages.append('grub')
-
+
if not isVM():
vendor = cpuVendor()
if vendor == "AuthenticAMD":
@@ -331,7 +327,7 @@ class Installer():
self.base_packages.append("intel-ucode")
else:
self.log("Unknown cpu vendor not installing ucode")
-
+
self.pacstrap(self.base_packages)
self.helper_flags['base-strapped'] = True
@@ -395,7 +391,7 @@ class Installer():
f"default {self.init_time}",
f"timeout 5"
]
-
+
with open(f'{self.target}/boot/loader/loader.conf', 'w') as loader:
for line in loader_data:
if line[:8] == 'default ':
@@ -500,7 +496,7 @@ class Installer():
o = b''.join(sys_command(f"/usr/bin/arch-chroot {self.target} sh -c \"echo '{user}:{password}' | chpasswd\""))
pass
-
+
def user_set_shell(self, user, shell):
self.log(f'Setting shell for {user} to {shell}', level=logging.INFO)
diff --git a/archinstall/lib/locale_helpers.py b/archinstall/lib/locale_helpers.py
index 3c373bc6..daf67e5b 100644
--- a/archinstall/lib/locale_helpers.py
+++ b/archinstall/lib/locale_helpers.py
@@ -4,6 +4,7 @@ import os
from .exceptions import *
# from .general import sys_command
+
def list_keyboard_languages():
locale_dir = '/usr/share/kbd/keymaps/'
@@ -16,16 +17,19 @@ def list_keyboard_languages():
if os.path.splitext(file)[1] == '.gz':
yield file.strip('.gz').strip('.map')
+
def verify_keyboard_layout(layout):
for language in list_keyboard_languages():
if layout.lower() == language.lower():
return True
return False
+
def search_keyboard_layout(filter):
for language in list_keyboard_languages():
if filter.lower() in language.lower():
yield language
+
def set_keyboard_language(locale):
return subprocess.call(['loadkeys', locale]) == 0
diff --git a/archinstall/lib/luks.py b/archinstall/lib/luks.py
index 7f8485e6..e6e1c897 100644
--- a/archinstall/lib/luks.py
+++ b/archinstall/lib/luks.py
@@ -1,13 +1,9 @@
-import os
-import shlex
-import time
import pathlib
-import logging
-from .exceptions import *
-from .general import *
+
from .disk import Partition
+from .general import *
from .output import log
-from .storage import storage
+
class luks2():
def __init__(self, partition, mountpoint, password, key_file=None, auto_unmount=False, *args, **kwargs):
@@ -22,12 +18,12 @@ class luks2():
self.mapdev = None
def __enter__(self):
- #if self.partition.allow_formatting:
- # self.key_file = self.encrypt(self.partition, *self.args, **self.kwargs)
- #else:
+ # if self.partition.allow_formatting:
+ # self.key_file = self.encrypt(self.partition, *self.args, **self.kwargs)
+ # else:
if not self.key_file:
self.key_file = f"/tmp/{os.path.basename(self.partition.path)}.disk_pw" # TODO: Make disk-pw-file randomly unique?
-
+
if type(self.password) != bytes:
self.password = bytes(self.password, 'UTF-8')
@@ -112,7 +108,7 @@ class luks2():
if cmd_handle.exit_code != 0:
raise DiskError(f'Could not encrypt volume "{partition.path}": {cmd_output}')
-
+
return key_file
def unlock(self, partition, mountpoint, key_file):
diff --git a/archinstall/lib/mirrors.py b/archinstall/lib/mirrors.py
index ae6c6422..dd8fadc4 100644
--- a/archinstall/lib/mirrors.py
+++ b/archinstall/lib/mirrors.py
@@ -1,9 +1,8 @@
-import urllib.request, logging
+import urllib.request
-from .exceptions import *
from .general import *
from .output import log
-from .storage import storage
+
def filter_mirrors_by_region(regions, destination='/etc/pacman.d/mirrorlist', tmp_dir='/root', *args, **kwargs):
"""
@@ -19,9 +18,10 @@ def filter_mirrors_by_region(regions, destination='/etc/pacman.d/mirrorlist', tm
o = b''.join(sys_command((f"/usr/bin/wget 'https://archlinux.org/mirrorlist/?{'&'.join(region_list)}&protocol=https&ip_version=4&ip_version=6&use_mirror_status=on' -O {tmp_dir}/mirrorlist")))
o = b''.join(sys_command((f"/usr/bin/sed -i 's/#Server/Server/' {tmp_dir}/mirrorlist")))
o = b''.join(sys_command((f"/usr/bin/mv {tmp_dir}/mirrorlist {destination}")))
-
+
return True
+
def add_custom_mirrors(mirrors:list, *args, **kwargs):
"""
This will append custom mirror definitions in pacman.conf
@@ -37,6 +37,7 @@ def add_custom_mirrors(mirrors:list, *args, **kwargs):
return True
+
def insert_mirrors(mirrors, *args, **kwargs):
"""
This function will insert a given mirror-list at the top of `/etc/pacman.d/mirrorlist`.
@@ -58,6 +59,7 @@ def insert_mirrors(mirrors, *args, **kwargs):
return True
+
def use_mirrors(regions :dict, destination='/etc/pacman.d/mirrorlist'):
log(f'A new package mirror-list has been created: {destination}', level=logging.INFO)
for region, mirrors in regions.items():
@@ -67,11 +69,13 @@ def use_mirrors(regions :dict, destination='/etc/pacman.d/mirrorlist'):
mirrorlist.write(f'Server = {mirror}\n')
return True
+
def re_rank_mirrors(top=10, *positionals, **kwargs):
if sys_command((f'/usr/bin/rankmirrors -n {top} /etc/pacman.d/mirrorlist > /etc/pacman.d/mirrorlist')).exit_code == 0:
return True
return False
+
def list_mirrors():
url = f"https://archlinux.org/mirrorlist/?protocol=https&ip_version=4&ip_version=6&use_mirror_status=on"
regions = {}
@@ -97,4 +101,4 @@ def list_mirrors():
url = line.lstrip('#Server = ')
regions[region][url] = True
- return regions \ No newline at end of file
+ return regions
diff --git a/archinstall/lib/networking.py b/archinstall/lib/networking.py
index 2dc8be9b..768cc1cc 100644
--- a/archinstall/lib/networking.py
+++ b/archinstall/lib/networking.py
@@ -7,22 +7,25 @@ from .exceptions import *
from .general import sys_command
from .storage import storage
-def getHwAddr(ifname):
+
+def get_hw_addr(ifname):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
info = fcntl.ioctl(s.fileno(), 0x8927, struct.pack('256s', bytes(ifname, 'utf-8')[:15]))
return ':'.join('%02x' % b for b in info[18:24])
-
+
+
def list_interfaces(skip_loopback=True):
interfaces = OrderedDict()
for index, iface in socket.if_nameindex():
if skip_loopback and iface == "lo":
continue
- mac = getHwAddr(iface).replace(':', '-').lower()
+ mac = get_hw_addr(iface).replace(':', '-').lower()
interfaces[mac] = iface
return interfaces
-def enrichIfaceTypes(interfaces :dict):
+
+def enrich_iface_types(interfaces :dict):
result = {}
for iface in interfaces:
if os.path.isdir(f"/sys/class/net/{iface}/bridge/"):
@@ -39,11 +42,13 @@ def enrichIfaceTypes(interfaces :dict):
result[iface] = 'UNKNOWN'
return result
+
def get_interface_from_mac(mac):
return list_interfaces().get(mac.lower(), None)
-def wirelessScan(interface):
- interfaces = enrichIfaceTypes(list_interfaces().values())
+
+def wireless_scan(interface):
+ interfaces = enrich_iface_types(list_interfaces().values())
if interfaces[interface] != 'WIRELESS':
raise HardwareIncompatibilityError(f"Interface {interface} is not a wireless interface: {interfaces}")
@@ -56,12 +61,13 @@ def wirelessScan(interface):
storage['_WIFI'][interface]['scanning'] = True
+
# TODO: Full WiFi experience might get evolved in the future, pausing for now 2021-01-25
-def getWirelessNetworks(interface):
+def get_wireless_networks(interface):
# TODO: Make this oneliner pritter to check if the interface is scanning or not.
if not '_WIFI' in storage or interface not in storage['_WIFI'] or storage['_WIFI'][interface].get('scanning', False) is False:
import time
- wirelessScan(interface)
+ wireless_scan(interface)
time.sleep(5)
for line in sys_command(f"iwctl station {interface} get-networks"):
diff --git a/archinstall/lib/output.py b/archinstall/lib/output.py
index d6a197f1..0818aed0 100644
--- a/archinstall/lib/output.py
+++ b/archinstall/lib/output.py
@@ -5,16 +5,18 @@ import logging
from pathlib import Path
from .storage import storage
+
# TODO: use logging's built in levels instead.
# Although logging is threaded and I wish to avoid that.
# It's more Pythonistic or w/e you want to call it.
-class LOG_LEVELS:
+class LogLevels:
Critical = 0b001
Error = 0b010
Warning = 0b011
Info = 0b101
Debug = 0b111
+
class journald(dict):
@abc.abstractmethod
def log(message, level=logging.DEBUG):
@@ -27,19 +29,19 @@ class journald(dict):
# to logging levels (and warn about deprecated usage)
# There's some code re-usage here but that should be fine.
# TODO: Remove these in a few versions:
- if level == LOG_LEVELS.Critical:
+ if level == LogLevels.Critical:
log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
level = logging.CRITICAL
- elif level == LOG_LEVELS.Error:
+ elif level == LogLevels.Error:
log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
level = logging.ERROR
- elif level == LOG_LEVELS.Warning:
+ elif level == LogLevels.Warning:
log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
level = logging.WARNING
- elif level == LOG_LEVELS.Info:
+ elif level == LogLevels.Info:
log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
level = logging.INFO
- elif level == LOG_LEVELS.Debug:
+ elif level == LogLevels.Debug:
log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
level = logging.DEBUG
@@ -49,14 +51,16 @@ class journald(dict):
log_ch.setFormatter(log_fmt)
log_adapter.addHandler(log_ch)
log_adapter.setLevel(logging.DEBUG)
-
+
log_adapter.log(level, message)
+
# TODO: Replace log() for session based logging.
-class SessionLogging():
+class SessionLogging:
def __init__(self):
pass
+
# Found first reference here: https://stackoverflow.com/questions/7445658/how-to-detect-if-the-console-does-support-ansi-escape-codes-in-python
# And re-used this: https://github.com/django/django/blob/master/django/core/management/color.py#L12
def supports_color():
@@ -70,6 +74,7 @@ def supports_color():
is_a_tty = hasattr(sys.stdout, 'isatty') and sys.stdout.isatty()
return supported_platform and is_a_tty
+
# Heavily influenced by: https://github.com/django/django/blob/ae8338daf34fd746771e0678081999b656177bae/django/utils/termcolors.py#L13
# Color options here: https://askubuntu.com/questions/528928/how-to-do-underline-bold-italic-strikethrough-color-background-and-size-i
def stylize_output(text :str, *opts, **kwargs):
@@ -94,6 +99,7 @@ def stylize_output(text :str, *opts, **kwargs):
text = '%s\x1b[%sm' % (text or '', RESET)
return '%s%s' % (('\x1b[%sm' % ';'.join(code_list)), text or '')
+
def log(*args, **kwargs):
string = orig_string = ' '.join([str(x) for x in args])
@@ -132,19 +138,19 @@ def log(*args, **kwargs):
# to logging levels (and warn about deprecated usage)
# There's some code re-usage here but that should be fine.
# TODO: Remove these in a few versions:
- if kwargs['level'] == LOG_LEVELS.Critical:
+ if kwargs['level'] == LogLevels.Critical:
log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
kwargs['level'] = logging.CRITICAL
- elif kwargs['level'] == LOG_LEVELS.Error:
+ elif kwargs['level'] == LogLevels.Error:
log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
kwargs['level'] = logging.ERROR
- elif kwargs['level'] == LOG_LEVELS.Warning:
+ elif kwargs['level'] == LogLevels.Warning:
log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
kwargs['level'] = logging.WARNING
- elif kwargs['level'] == LOG_LEVELS.Info:
+ elif kwargs['level'] == LogLevels.Info:
log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
kwargs['level'] = logging.INFO
- elif kwargs['level'] == LOG_LEVELS.Debug:
+ elif kwargs['level'] == LogLevels.Debug:
log("Deprecated level detected in log message, please use new logging.<level> instead for the following log message:", fg="red", level=logging.ERROR, force=True)
kwargs['level'] = logging.DEBUG
@@ -156,7 +162,7 @@ def log(*args, **kwargs):
try:
journald.log(string, level=kwargs.get('level', logging.INFO))
except ModuleNotFoundError:
- pass # Ignore writing to journald
+ pass # Ignore writing to journald
# Finally, print the log unless we skipped it based on level.
# We use sys.stdout.write()+flush() instead of print() to try and
diff --git a/archinstall/lib/packages.py b/archinstall/lib/packages.py
index 4f6b6c61..87c60abb 100644
--- a/archinstall/lib/packages.py
+++ b/archinstall/lib/packages.py
@@ -1,10 +1,14 @@
-import urllib.request, urllib.parse
-import ssl, json
+import json
+import ssl
+import urllib.parse
+import urllib.request
+
from .exceptions import *
BASE_URL = 'https://archlinux.org/packages/search/json/?name={package}'
BASE_GROUP_URL = 'https://archlinux.org/groups/x86_64/{group}/'
+
def find_group(name):
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
@@ -16,11 +20,12 @@ def find_group(name):
return False
else:
raise err
-
+
# Just to be sure some code didn't slip through the exception
if response.code == 200:
return True
+
def find_package(name):
"""
Finds a specific package via the package database.
@@ -33,6 +38,7 @@ def find_package(name):
data = response.read().decode('UTF-8')
return json.loads(data)
+
def find_packages(*names):
"""
This function returns the search results for many packages.
@@ -44,6 +50,7 @@ def find_packages(*names):
result[package] = find_package(package)
return result
+
def validate_package_list(packages :list):
"""
Validates a list of given packages.
@@ -53,8 +60,8 @@ def validate_package_list(packages :list):
for package in packages:
if not find_package(package)['results'] and not find_group(package):
invalid_packages.append(package)
-
+
if invalid_packages:
raise RequirementError(f"Invalid package names: {invalid_packages}")
- return True \ No newline at end of file
+ return True
diff --git a/archinstall/lib/profiles.py b/archinstall/lib/profiles.py
index 42fd4c24..ebcd3aff 100644
--- a/archinstall/lib/profiles.py
+++ b/archinstall/lib/profiles.py
@@ -1,13 +1,17 @@
+import hashlib
+import importlib.util
+import json
+import re
+import ssl
+import sys
+import urllib.parse
+import urllib.request
from typing import Optional
-import os, urllib.request, urllib.parse, ssl, json, re
-import importlib.util, sys, glob, hashlib, logging
-from collections import OrderedDict
-from .general import multisplit, sys_command
-from .exceptions import *
+from .general import multisplit
from .networking import *
-from .output import log
from .storage import storage
+
def grab_url_data(path):
safe_path = path[:path.find(':')+1]+''.join([item if item in ('/', '?', '=', '&') else urllib.parse.quote(item) for item in multisplit(path[path.find(':')+1:], ('/', '?', '=', '&'))])
ssl_context = ssl.create_default_context()
@@ -16,6 +20,7 @@ def grab_url_data(path):
response = urllib.request.urlopen(safe_path, context=ssl_context)
return response.read()
+
def list_profiles(filter_irrelevant_macs=True, subpath='', filter_top_level_profiles=False):
# TODO: Grab from github page as well, not just local static files
if filter_irrelevant_macs:
@@ -55,7 +60,7 @@ def list_profiles(filter_irrelevant_macs=True, subpath='', filter_top_level_prof
except json.decoder.JSONDecodeError as err:
print(f'Error: Could not decode "{profiles_url}" result as JSON:', err)
return cache
-
+
for profile in profile_list:
if os.path.splitext(profile)[1] == '.py':
tailored = False
@@ -73,7 +78,8 @@ def list_profiles(filter_irrelevant_macs=True, subpath='', filter_top_level_prof
return cache
-class Script():
+
+class Script:
def __init__(self, profile, installer=None):
# profile: https://hvornum.se/something.py
# profile: desktop
@@ -154,6 +160,7 @@ class Script():
return sys.modules[self.namespace]
+
class Profile(Script):
def __init__(self, installer, path, args={}):
super(Profile, self).__init__(path, installer)
@@ -238,6 +245,7 @@ class Profile(Script):
return imported.__packages__
return None
+
class Application(Profile):
def __repr__(self, *args, **kwargs):
return f'Application({os.path.basename(self.profile)})'
diff --git a/archinstall/lib/services.py b/archinstall/lib/services.py
index bb6f64f2..46aa7846 100644
--- a/archinstall/lib/services.py
+++ b/archinstall/lib/services.py
@@ -1,8 +1,6 @@
-import os
-
-from .exceptions import *
from .general import *
+
def service_state(service_name: str):
if os.path.splitext(service_name)[1] != '.service':
service_name += '.service' # Just to be safe
diff --git a/archinstall/lib/storage.py b/archinstall/lib/storage.py
index 43d088bb..d985ca17 100644
--- a/archinstall/lib/storage.py
+++ b/archinstall/lib/storage.py
@@ -12,7 +12,7 @@ storage = {
'./profiles',
'~/.config/archinstall/profiles',
os.path.join(os.path.dirname(os.path.abspath(__file__)), 'profiles'),
- #os.path.abspath(f'{os.path.dirname(__file__)}/../examples')
+ # os.path.abspath(f'{os.path.dirname(__file__)}/../examples')
],
'UPSTREAM_URL' : 'https://raw.githubusercontent.com/archlinux/archinstall/master/profiles',
'PROFILE_DB' : None, # Used in cases when listing profiles is desired, not mandatory for direct profile grabing.
diff --git a/archinstall/lib/systemd.py b/archinstall/lib/systemd.py
index f2b7c9b3..5607250b 100644
--- a/archinstall/lib/systemd.py
+++ b/archinstall/lib/systemd.py
@@ -1,4 +1,4 @@
-class Ini():
+class Ini:
def __init__(self, *args, **kwargs):
"""
Limited INI handler for now.
@@ -25,11 +25,13 @@ class Ini():
return result
+
class Systemd(Ini):
"""
Placeholder class to do systemd specific setups.
"""
+
class Networkd(Systemd):
"""
Placeholder class to do systemd-network specific setups.
diff --git a/archinstall/lib/user_interaction.py b/archinstall/lib/user_interaction.py
index c76dc9a5..0aeba3b9 100644
--- a/archinstall/lib/user_interaction.py
+++ b/archinstall/lib/user_interaction.py
@@ -1,27 +1,40 @@
-import getpass, pathlib, os, shutil, re, time
-import sys, time, signal, ipaddress, logging
-import termios, tty, select # Used for char by char polling of sys.stdin
+import getpass
+import ipaddress
+import logging
+import pathlib
+import re
+import select # Used for char by char polling of sys.stdin
+import shutil
+import signal
+import sys
+import termios
+import time
+import tty
+
from .exceptions import *
-from .profiles import Profile
-from .locale_helpers import list_keyboard_languages, verify_keyboard_layout, search_keyboard_layout
-from .output import log
-from .storage import storage
-from .networking import list_interfaces
from .general import sys_command
from .hardware import AVAILABLE_GFX_DRIVERS, hasUEFI
+from .locale_helpers import list_keyboard_languages, verify_keyboard_layout, search_keyboard_layout
+from .networking import list_interfaces
+from .output import log
+from .profiles import Profile
-## TODO: Some inconsistencies between the selection processes.
-## Some return the keys from the options, some the values?
+
+# TODO: Some inconsistencies between the selection processes.
+# Some return the keys from the options, some the values?
def get_terminal_height():
return shutil.get_terminal_size().lines
+
def get_terminal_width():
return shutil.get_terminal_size().columns
+
def get_longest_option(options):
return max([len(x) for x in options])
+
def check_for_correct_username(username):
if re.match(r'^[a-z_][a-z0-9_-]*\$?$', username) and len(username) <= 32:
return True
@@ -32,6 +45,7 @@ def check_for_correct_username(username):
)
return False
+
def do_countdown():
SIG_TRIGGER = False
def kill_handler(sig, frame):
@@ -67,6 +81,7 @@ def do_countdown():
signal.signal(signal.SIGINT, original_sigint_handler)
return True
+
def get_password(prompt="Enter a password: "):
while (passwd := getpass.getpass(prompt)):
passwd_verification = getpass.getpass(prompt='And one more time for verification: ')
@@ -80,6 +95,7 @@ def get_password(prompt="Enter a password: "):
return passwd
return None
+
def print_large_list(options, padding=5, margin_bottom=0, separator=': '):
highest_index_number_length = len(str(len(options)))
longest_line = highest_index_number_length + len(separator) + get_longest_option(options) + padding
@@ -140,7 +156,7 @@ def generic_multi_select(options, text="Select one or more of the options above
section.input_pos = section._cursor_x
selected_option = section.get_keyboard_input(end=None)
# This string check is necessary to correct work with it
- # Without this, Python will raise AttributeError because of stripping `None`
+ # Without this, Python will raise AttributeError because of stripping `None`
# It also allows to remove empty spaces if the user accidentally entered them.
if isinstance(selected_option, str):
selected_option = selected_option.strip()
@@ -173,7 +189,7 @@ def generic_multi_select(options, text="Select one or more of the options above
return selected_options
-class MiniCurses():
+class MiniCurses:
def __init__(self, width, height):
self.width = width
self.height = height
@@ -200,10 +216,10 @@ class MiniCurses():
if x < 0: x = 0
if y < 0: y = 0
- #import time
- #sys.stdout.write(f"Clearing from: {x, y}")
- #sys.stdout.flush()
- #time.sleep(2)
+ # import time
+ # sys.stdout.write(f"Clearing from: {x, y}")
+ # sys.stdout.flush()
+ # time.sleep(2)
sys.stdout.flush()
sys.stdout.write('\033[%d;%df' % (y, x))
@@ -259,16 +275,16 @@ class MiniCurses():
poller.register(sys.stdin.fileno(), select.EPOLLIN)
- EOF = False
- while EOF is False:
+ eof = False
+ while eof is False:
for fileno, event in poller.poll(0.025):
char = sys.stdin.read(1)
- #sys.stdout.write(f"{[char]}")
- #sys.stdout.flush()
+ # sys.stdout.write(f"{[char]}")
+ # sys.stdout.flush()
- if (newline := (char in ('\n', '\r'))):
- EOF = True
+ if newline := (char in ('\n', '\r')):
+ eof = True
if not newline or strip_rowbreaks is False:
response += char
@@ -287,6 +303,7 @@ class MiniCurses():
if response:
return response
+
def ask_for_superuser_account(prompt='Username for required superuser with sudo privileges: ', forced=False):
while 1:
new_user = input(prompt).strip(' ')
@@ -304,6 +321,7 @@ def ask_for_superuser_account(prompt='Username for required superuser with sudo
password = get_password(prompt=f'Password for user {new_user}: ')
return {new_user: {"!password" : password}}
+
def ask_for_additional_users(prompt='Any additional users to install (leave blank for no users): '):
users = {}
superusers = {}
@@ -315,7 +333,7 @@ def ask_for_additional_users(prompt='Any additional users to install (leave blan
if not check_for_correct_username(new_user):
continue
password = get_password(prompt=f'Password for user {new_user}: ')
-
+
if input("Should this user be a superuser (sudoer) [y/N]: ").strip(' ').lower() in ('y', 'yes'):
superusers[new_user] = {"!password" : password}
else:
@@ -323,6 +341,7 @@ def ask_for_additional_users(prompt='Any additional users to install (leave blan
return users, superusers
+
def ask_for_a_timezone():
while True:
timezone = input('Enter a valid timezone (examples: Europe/Stockholm, US/Eastern) or press enter to use UTC: ').strip().strip('*.')
@@ -337,6 +356,7 @@ def ask_for_a_timezone():
fg='red'
)
+
def ask_for_bootloader() -> str:
bootloader = "systemd-bootctl"
if hasUEFI()==False:
@@ -347,6 +367,7 @@ def ask_for_bootloader() -> str:
bootloader="grub-install"
return bootloader
+
def ask_for_audio_selection():
audio = "pulseaudio" # Default for most desktop environments
pipewire_choice = input("Would you like to install pipewire instead of pulseaudio as the default audio server? [Y/n] ").lower()
@@ -355,6 +376,7 @@ def ask_for_audio_selection():
return audio
+
def ask_to_configure_network():
# Optionally configure one network interface.
#while 1:
@@ -422,6 +444,7 @@ def ask_to_configure_network():
return {}
+
def ask_for_disk_layout():
options = {
'keep-existing' : 'Keep existing partition layout and select which ones to use where',
@@ -433,6 +456,7 @@ def ask_for_disk_layout():
allow_empty_input=False, sort=True)
return next((key for key, val in options.items() if val == value), None)
+
def ask_for_main_filesystem_format():
options = {
'btrfs' : 'btrfs',
@@ -445,6 +469,7 @@ def ask_for_main_filesystem_format():
allow_empty_input=False)
return next((key for key, val in options.items() if val == value), None)
+
def generic_select(options, input_text="Select one of the above by index or absolute value: ", allow_empty_input=True, options_output=True, sort=False):
"""
A generic select function that does not output anything
@@ -477,7 +502,6 @@ def generic_select(options, input_text="Select one of the above by index or abso
# As we pass only list and dict (converted to list), we can skip converting to list
options = sorted(options)
-
# Added ability to disable the output of options items,
# if another function displays something different from this
if options_output:
@@ -510,6 +534,7 @@ def generic_select(options, input_text="Select one of the above by index or abso
return selected_option
+
def select_disk(dict_o_disks):
"""
Asks the user to select a harddrive from the `dict_o_disks` selection.
@@ -525,18 +550,18 @@ def select_disk(dict_o_disks):
if len(drives) >= 1:
for index, drive in enumerate(drives):
print(f"{index}: {drive} ({dict_o_disks[drive]['size'], dict_o_disks[drive].device, dict_o_disks[drive]['label']})")
-
+
log(f"You can skip selecting a drive and partitioning and use whatever drive-setup is mounted at /mnt (experimental)", fg="yellow")
- drive = generic_select(drives, 'Select one of the above disks (by name or number) or leave blank to use /mnt: ',
- options_output=False)
+ drive = generic_select(drives, 'Select one of the above disks (by name or number) or leave blank to use /mnt: ', options_output=False)
if not drive:
return drive
-
+
drive = dict_o_disks[drive]
return drive
raise DiskError('select_disk() requires a non-empty dictionary of disks to select from.')
+
def select_profile(options):
"""
Asks the user to select a profile from the `options` dictionary parameter.
@@ -565,6 +590,7 @@ def select_profile(options):
else:
raise RequirementError("Selecting profiles require a least one profile to be given as an option.")
+
def select_language(options, show_only_country_codes=True):
"""
Asks the user to select a language from the `options` dictionary parameter.
@@ -579,8 +605,8 @@ def select_language(options, show_only_country_codes=True):
:return: The language/dictionary key of the selected language
:rtype: str
"""
- DEFAULT_KEYBOARD_LANGUAGE = 'us'
-
+ default_keyboard_language = 'us'
+
if show_only_country_codes:
languages = sorted([language for language in list(options) if len(language) == 2])
else:
@@ -596,7 +622,7 @@ def select_language(options, show_only_country_codes=True):
while True:
selected_language = input('Select one of the above keyboard languages (by name or full name): ')
if not selected_language:
- return DEFAULT_KEYBOARD_LANGUAGE
+ return default_keyboard_language
elif selected_language.lower() in ('?', 'help'):
while True:
filter_string = input("Search for layout containing (example: \"sv-\") or enter 'exit' to exit from search: ")
@@ -624,6 +650,7 @@ def select_language(options, show_only_country_codes=True):
raise RequirementError("Selecting languages require a least one language to be given as an option.")
+
def select_mirror_regions(mirrors, show_top_mirrors=True):
"""
Asks the user to select a mirror or region from the `mirrors` dictionary parameter.
@@ -665,6 +692,7 @@ def select_mirror_regions(mirrors, show_top_mirrors=True):
raise RequirementError("Selecting mirror region require a least one region to be given as an option.")
+
def select_driver(options=AVAILABLE_GFX_DRIVERS):
"""
Some what convoluted function, which's job is simple.
@@ -673,10 +701,10 @@ def select_driver(options=AVAILABLE_GFX_DRIVERS):
(The template xorg is for beginner users, not advanced, and should
there for appeal to the general public first and edge cases later)
"""
-
+
drivers = sorted(list(options))
default_option = options["All open-source (default)"]
-
+
if drivers:
lspci = sys_command(f'/usr/bin/lspci')
for line in lspci.trace_log.split(b'\r\n'):
@@ -696,8 +724,7 @@ def select_driver(options=AVAILABLE_GFX_DRIVERS):
if type(selected_driver) == dict:
driver_options = sorted(list(selected_driver))
- driver_package_group = generic_select(driver_options, f'Which driver-type do you want for {initial_option}: ',
- allow_empty_input=False)
+ driver_package_group = generic_select(driver_options, f'Which driver-type do you want for {initial_option}: ', allow_empty_input=False)
driver_package_group = selected_driver[driver_package_group]
return driver_package_group
@@ -706,6 +733,7 @@ def select_driver(options=AVAILABLE_GFX_DRIVERS):
raise RequirementError("Selecting drivers require a least one profile to be given as an option.")
+
def select_kernel(options):
"""
Asks the user to select a kernel for system.
@@ -716,12 +744,12 @@ def select_kernel(options):
:return: The string as a selected kernel
:rtype: string
"""
-
- DEFAULT_KERNEL = "linux"
-
+
+ default_kernel = "linux"
+
kernels = sorted(list(options))
-
+
if kernels:
- return generic_multi_select(kernels, f"Choose which kernels to use (leave blank for default: {DEFAULT_KERNEL}): ", default=DEFAULT_KERNEL, sort=False)
-
+ return generic_multi_select(kernels, f"Choose which kernels to use (leave blank for default: {default_kernel}): ", default=default_kernel, sort=False)
+
raise RequirementError("Selecting kernels require a least one kernel to be given as an option.")
diff --git a/examples/guided.py b/examples/guided.py
index 95ccca29..a4fb5e3b 100644
--- a/examples/guided.py
+++ b/examples/guided.py
@@ -10,7 +10,7 @@ if archinstall.arguments.get('help'):
exit(0)
# For support reasons, we'll log the disk layout pre installation to match against post-installation layout
-archinstall.log(f"Disk states before installing: {archinstall.disk_layouts()}", level=archinstall.LOG_LEVELS.Debug)
+archinstall.log(f"Disk states before installing: {archinstall.disk_layouts()}", level=archinstall.LogLevels.Debug)
def ask_user_questions():
@@ -387,7 +387,7 @@ def perform_installation(mountpoint):
pass
# For support reasons, we'll log the disk layout post installation (crash or no crash)
- archinstall.log(f"Disk states after installing: {archinstall.disk_layouts()}", level=archinstall.LOG_LEVELS.Debug)
+ archinstall.log(f"Disk states after installing: {archinstall.disk_layouts()}", level=archinstall.LogLevels.Debug)
ask_user_questions()
perform_installation_steps()