index : archinstall32 | |
Archlinux32 installer | gitolite user |
summaryrefslogtreecommitdiff |
-rw-r--r-- | archinstall/lib/networking.py | 88 |
diff --git a/archinstall/lib/networking.py b/archinstall/lib/networking.py index 96e8f3a1..fb26bd3d 100644 --- a/archinstall/lib/networking.py +++ b/archinstall/lib/networking.py @@ -1,21 +1,22 @@ -import logging import os import socket +import ssl import struct -from typing import Union, Dict, Any, List +from typing import Union, Dict, Any, List, Optional +from urllib.error import URLError +from urllib.parse import urlencode +from urllib.request import urlopen -from .exceptions import HardwareIncompatibilityError, SysCallError -from .general import SysCommand -from .output import log -from .pacman import run_pacman -from .storage import storage +from .exceptions import SysCallError +from .output import error, info +from .pacman import Pacman def get_hw_addr(ifname :str) -> str: import fcntl s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - info = fcntl.ioctl(s.fileno(), 0x8927, struct.pack('256s', bytes(ifname, 'utf-8')[:15])) - return ':'.join('%02x' % b for b in info[18:24]) + ret = fcntl.ioctl(s.fileno(), 0x8927, struct.pack('256s', bytes(ifname, 'utf-8')[:15])) + return ':'.join('%02x' % b for b in ret[18:24]) def list_interfaces(skip_loopback :bool = True) -> Dict[str, str]: @@ -31,26 +32,14 @@ def list_interfaces(skip_loopback :bool = True) -> Dict[str, str]: return interfaces -def check_mirror_reachable() -> bool: - log("Testing connectivity to the Arch Linux mirrors ...", level=logging.INFO) - try: - if run_pacman("-Sy").exit_code == 0: - return True - elif os.geteuid() != 0: - log("check_mirror_reachable() uses 'pacman -Sy' which requires root.", level=logging.ERROR, fg="red") - except SysCallError as err: - log(err, level=logging.DEBUG) - - return False - - def update_keyring() -> bool: - log("Updating archlinux-keyring ...", level=logging.INFO) - if run_pacman("-Sy --noconfirm archlinux-keyring").exit_code == 0: + info("Updating archlinux-keyring ...") + try: + Pacman.run("-Sy --noconfirm archlinux-keyring") return True - - elif os.geteuid() != 0: - log("update_keyring() uses 'pacman -Sy archlinux-keyring' which requires root.", level=logging.ERROR, fg="red") + except SysCallError: + if os.geteuid() != 0: + error("update_keyring() uses 'pacman -Sy archlinux-keyring' which requires root.") return False @@ -75,35 +64,20 @@ def enrich_iface_types(interfaces: Union[Dict[str, Any], List[str]]) -> Dict[str return result -def get_interface_from_mac(mac :str) -> str: - return list_interfaces().get(mac.lower(), None) - +def fetch_data_from_url(url: str, params: Optional[Dict] = None) -> str: + ssl_context = ssl.create_default_context() + ssl_context.check_hostname = False + ssl_context.verify_mode = ssl.CERT_NONE -def wireless_scan(interface :str) -> None: - interfaces = enrich_iface_types(list_interfaces().values()) - if interfaces[interface] != 'WIRELESS': - raise HardwareIncompatibilityError(f"Interface {interface} is not a wireless interface: {interfaces}") + if params is not None: + encoded = urlencode(params) + full_url = f'{url}?{encoded}' + else: + full_url = url - if not (output := SysCommand(f"iwctl station {interface} scan")).exit_code == 0: - raise SystemError(f"Could not scan for wireless networks: {output}") - - if '_WIFI' not in storage: - storage['_WIFI'] = {} - if interface not in storage['_WIFI']: - storage['_WIFI'][interface] = {} - - storage['_WIFI'][interface]['scanning'] = True - - -# TODO: Full WiFi experience might get evolved in the future, pausing for now 2021-01-25 -def get_wireless_networks(interface :str) -> None: - # TODO: Make this oneliner pritter to check if the interface is scanning or not. - # TODO: Rename this to list_wireless_networks() as it doesn't return anything - if '_WIFI' not in storage or interface not in storage['_WIFI'] or storage['_WIFI'][interface].get('scanning', False) is False: - import time - - wireless_scan(interface) - time.sleep(5) - - for line in SysCommand(f"iwctl station {interface} get-networks"): - print(line) + try: + response = urlopen(full_url, context=ssl_context) + data = response.read().decode('UTF-8') + return data + except URLError: + raise ValueError(f'Unable to fetch data from url: {url}') |