index : archinstall32 | |
Archlinux32 installer | gitolite user |
summaryrefslogtreecommitdiff |
-rw-r--r-- | archinstall/lib/disk.py | 16 | ||||
-rw-r--r-- | archinstall/lib/general.py | 54 | ||||
-rw-r--r-- | archinstall/lib/hardware.py | 10 | ||||
-rw-r--r-- | archinstall/lib/mirrors.py | 73 | ||||
-rw-r--r-- | archinstall/lib/networking.py | 4 | ||||
-rw-r--r-- | archinstall/lib/user_interaction.py | 4 |
diff --git a/archinstall/lib/disk.py b/archinstall/lib/disk.py index d0e3f6a2..de39bafd 100644 --- a/archinstall/lib/disk.py +++ b/archinstall/lib/disk.py @@ -448,16 +448,16 @@ class Filesystem: if self.blockdevice.keep_partitions is False: log(f'Wiping {self.blockdevice} by using partition format {self.mode}', level=logging.DEBUG) if self.mode == GPT: - if self.raw_parted(f'{self.blockdevice.device} mklabel gpt').exit_code == 0: + if self.parted_mklabel(self.blockdevice.device, "gpt"): self.blockdevice.flush_cache() return self else: - raise DiskError('Problem setting the partition format to GPT:', f'/usr/bin/parted -s {self.blockdevice.device} mklabel gpt') + raise DiskError('Problem setting the disk label type to GPT:', f'/usr/bin/parted -s {self.blockdevice.device} mklabel gpt') elif self.mode == MBR: - if SysCommand(f'/usr/bin/parted -s {self.blockdevice.device} mklabel msdos').exit_code == 0: + if self.parted_mklabel(self.blockdevice.device, "msdos"): return self else: - raise DiskError('Problem setting the partition format to MBR:', f'/usr/bin/parted -s {self.blockdevice.device} mklabel msdos') + raise DiskError('Problem setting the disk label type to msdos:', f'/usr/bin/parted -s {self.blockdevice.device} mklabel msdos') else: raise DiskError(f'Unknown mode selected to format in: {self.mode}') @@ -552,6 +552,14 @@ class Filesystem: def set(self, partition: int, string: str): return self.parted(f'{self.blockdevice.device} set {partition + 1} {string}') == 0 + def parted_mklabel(self, device: str, disk_label: str): + # Try to unmount devices before attempting to run mklabel + try: + SysCommand(f'bash -c "umount {device}?"') + except: + pass + return self.raw_parted(f'{device} mklabel {disk_label}').exit_code == 0 + def device_state(name, *args, **kwargs): # Based out of: https://askubuntu.com/questions/528690/how-to-get-list-of-all-non-removable-disk-device-names-ssd-hdd-and-sata-ide-onl/528709#528709 diff --git a/archinstall/lib/general.py b/archinstall/lib/general.py index f72311c9..b9dc66ab 100644 --- a/archinstall/lib/general.py +++ b/archinstall/lib/general.py @@ -2,14 +2,41 @@ import hashlib import json import logging import os -import pty import shlex import subprocess import sys import time from datetime import datetime, date -from select import epoll, EPOLLIN, EPOLLHUP from typing import Union +try: + from select import epoll, EPOLLIN, EPOLLHUP +except: + import select + EPOLLIN = 0 + EPOLLHUP = 0 + class epoll(): + """ #!if windows + Create a epoll() implementation that simulates the epoll() behavior. + This so that the rest of the code doesn't need to worry weither we're using select() or epoll(). + """ + def __init__(self): + self.sockets = {} + self.monitoring = {} + + def unregister(self, fileno, *args, **kwargs): + try: + del(self.monitoring[fileno]) + except: + pass + + def register(self, fileno, *args, **kwargs): + self.monitoring[fileno] = True + + def poll(self, timeout=0.05, *args, **kwargs): + try: + return [[fileno, 1] for fileno in select.select(list(self.monitoring.keys()), [], [], timeout)[0]] + except OSError: + return [] from .exceptions import * from .output import log @@ -202,27 +229,6 @@ class SysCommandWorker: except UnicodeDecodeError: return False - output = output.strip('\r\n ') - if len(output) <= 0: - return False - - from .user_interaction import get_terminal_width - - # Move back to the beginning of the terminal - sys.stdout.flush() - sys.stdout.write("\033[%dG" % 0) - sys.stdout.flush() - - # Clear the line - sys.stdout.write(" " * get_terminal_width()) - sys.stdout.flush() - - # Move back to the beginning again - sys.stdout.flush() - sys.stdout.write("\033[%dG" % 0) - sys.stdout.flush() - - # And print the new output we're peaking on: sys.stdout.write(output) sys.stdout.flush() return True @@ -252,6 +258,8 @@ class SysCommandWorker: self.exit_code = 1 def execute(self) -> bool: + import pty + if (old_dir := os.getcwd()) != self.working_directory: os.chdir(self.working_directory) diff --git a/archinstall/lib/hardware.py b/archinstall/lib/hardware.py index 45e042db..a63155f5 100644 --- a/archinstall/lib/hardware.py +++ b/archinstall/lib/hardware.py @@ -48,10 +48,12 @@ AVAILABLE_GFX_DRIVERS = { "intel-media-driver", "vulkan-intel", ], - "Nvidia": { - "open-source": ["mesa", "xf86-video-nouveau", "libva-mesa-driver"], - "proprietary": ["nvidia"], - }, + "Nvidia (open-source)": [ + "mesa", + "xf86-video-nouveau", + "libva-mesa-driver" + ], + "Nvidia (proprietary)": ["nvidia"], "VMware / VirtualBox (open-source)": ["mesa", "xf86-video-vmware"], } diff --git a/archinstall/lib/mirrors.py b/archinstall/lib/mirrors.py index ccfc2808..95fb5ac6 100644 --- a/archinstall/lib/mirrors.py +++ b/archinstall/lib/mirrors.py @@ -1,11 +1,57 @@ import urllib.error import urllib.request +from typing import Union from .general import * from .output import log +def sort_mirrorlist(raw_data :bytes, sort_order=["https", "http"]) -> bytes: + """ + This function can sort /etc/pacman.d/mirrorlist according to the + mirror's URL prefix. By default places HTTPS before HTTP but it also + preserves the country/rank-order. + + This assumes /etc/pacman.d/mirrorlist looks like the following: + + ## Comment + Server = url + + or + + ## Comment + #Server = url + + But the Comments need to start with double-hashmarks to be distringuished + from server url definitions (commented or uncommented). + """ + comments_and_whitespaces = b"" + + categories = {key: [] for key in sort_order+["Unknown"]} + for line in raw_data.split(b"\n"): + if line[0:2] in (b'##', b''): + comments_and_whitespaces += line + b'\n' + elif line[:6].lower() == b'server' or line[:7].lower() == b'#server': + opening, url = line.split(b'=', 1) + opening, url = opening.strip(), url.strip() + if (category := url.split(b'://',1)[0].decode('UTF-8')) in categories: + categories[category].append(comments_and_whitespaces) + categories[category].append(opening+b' = '+url+b'\n') + else: + categories["Unknown"].append(comments_and_whitespaces) + categories["Unknown"].append(opening+b' = '+url+b'\n') + + comments_and_whitespaces = b"" -def filter_mirrors_by_region(regions, destination='/etc/pacman.d/mirrorlist', *args, **kwargs): + + new_raw_data = b'' + for category in sort_order+["Unknown"]: + for line in categories[category]: + new_raw_data += line + + return new_raw_data + + +def filter_mirrors_by_region(regions, destination='/etc/pacman.d/mirrorlist', sort_order=["https", "http"], *args, **kwargs) -> Union[bool, bytes]: """ This function will change the active mirrors on the live medium by filtering which regions are active based on `regions`. @@ -16,12 +62,19 @@ def filter_mirrors_by_region(regions, destination='/etc/pacman.d/mirrorlist', *a region_list = [] for region in regions.split(','): region_list.append(f'country={region}') - response = urllib.request.urlopen(urllib.request.Request(f"https://archlinux.org/mirrorlist/?{'&'.join(region_list)}&protocol=https&ip_version=4&ip_version=6&use_mirror_status=on'", headers={'User-Agent': 'ArchInstall'})) + response = urllib.request.urlopen(urllib.request.Request(f"https://archlinux.org/mirrorlist/?{'&'.join(region_list)}&protocol=https&protocol=http&ip_version=4&ip_version=6&use_mirror_status=on'", headers={'User-Agent': 'ArchInstall'})) new_list = response.read().replace(b"#Server", b"Server") - with open(destination, "wb") as mirrorlist: - mirrorlist.write(new_list) - return True + if sort_order: + new_list = sort_mirrorlist(new_list, sort_order=sort_order) + + if destination: + with open(destination, "wb") as mirrorlist: + mirrorlist.write(new_list) + + return True + else: + return new_list.decode('UTF-8') def add_custom_mirrors(mirrors: list, *args, **kwargs): @@ -78,8 +131,8 @@ def re_rank_mirrors(top=10, *positionals, **kwargs): return False -def list_mirrors(): - url = "https://archlinux.org/mirrorlist/?protocol=https&ip_version=4&ip_version=6&use_mirror_status=on" +def list_mirrors(sort_order=["https", "http"]): + url = "https://archlinux.org/mirrorlist/?protocol=https&protocol=http&ip_version=4&ip_version=6&use_mirror_status=on" regions = {} try: @@ -88,8 +141,12 @@ def list_mirrors(): log(f'Could not fetch an active mirror-list: {err}', level=logging.WARNING, fg="yellow") return regions + mirrorlist = response.read() + if sort_order: + mirrorlist = sort_mirrorlist(mirrorlist, sort_order=sort_order) + region = 'Unknown region' - for line in response.readlines(): + for line in mirrorlist.split(b'\n'): if len(line.strip()) == 0: continue diff --git a/archinstall/lib/networking.py b/archinstall/lib/networking.py index 0643c9cf..64bcb58e 100644 --- a/archinstall/lib/networking.py +++ b/archinstall/lib/networking.py @@ -1,4 +1,3 @@ -import fcntl import logging import os import socket @@ -12,6 +11,7 @@ from .storage import storage def get_hw_addr(ifname): + import fcntl s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) info = fcntl.ioctl(s.fileno(), 0x8927, struct.pack('256s', bytes(ifname, 'utf-8')[:15])) return ':'.join('%02x' % b for b in info[18:24]) @@ -31,7 +31,7 @@ def list_interfaces(skip_loopback=True): def check_mirror_reachable(): if (exit_code := SysCommand("pacman -Sy").exit_code) == 0: return True - elif exit_code == 256: + elif os.geteuid() != 0: log("check_mirror_reachable() uses 'pacman -Sy' which requires root.", level=logging.ERROR, fg="red") return False diff --git a/archinstall/lib/user_interaction.py b/archinstall/lib/user_interaction.py index b52267d9..07b675a4 100644 --- a/archinstall/lib/user_interaction.py +++ b/archinstall/lib/user_interaction.py @@ -7,9 +7,7 @@ import select # Used for char by char polling of sys.stdin import shutil import signal import sys -import termios import time -import tty from .exceptions import * from .general import SysCommand @@ -269,6 +267,8 @@ class MiniCurses: def get_keyboard_input(self, strip_rowbreaks=True, end='\n'): assert end in ['\r', '\n', None] + import termios + import tty poller = select.epoll() response = '' |