1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
|
import logging
import os
import socket
import ssl
import struct
from typing import Union, Dict, Any, List, Optional
from urllib.error import URLError
from urllib.parse import urlencode
from urllib.request import urlopen
from .exceptions import HardwareIncompatibilityError, SysCallError
from .general import SysCommand
from .output import log
from .pacman import run_pacman
from .storage import storage
def get_hw_addr(ifname :str) -> str:
import fcntl
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
info = fcntl.ioctl(s.fileno(), 0x8927, struct.pack('256s', bytes(ifname, 'utf-8')[:15]))
return ':'.join('%02x' % b for b in info[18:24])
def list_interfaces(skip_loopback :bool = True) -> Dict[str, str]:
interfaces = {}
for index, iface in socket.if_nameindex():
if skip_loopback and iface == "lo":
continue
mac = get_hw_addr(iface).replace(':', '-').lower()
interfaces[mac] = iface
return interfaces
def check_mirror_reachable() -> bool:
log("Testing connectivity to the Arch Linux mirrors ...", level=logging.INFO)
try:
run_pacman("-Sy")
return True
except SysCallError as err:
if os.geteuid() != 0:
log("check_mirror_reachable() uses 'pacman -Sy' which requires root.", level=logging.ERROR, fg="red")
log(f'exit_code: {err.exit_code}, Error: {err.message}', level=logging.DEBUG)
return False
def update_keyring() -> bool:
log("Updating archlinux-keyring ...", level=logging.INFO)
try:
run_pacman("-Sy --noconfirm archlinux-keyring")
return True
except SysCallError:
if os.geteuid() != 0:
log("update_keyring() uses 'pacman -Sy archlinux-keyring' which requires root.", level=logging.ERROR, fg="red")
return False
def enrich_iface_types(interfaces: Union[Dict[str, Any], List[str]]) -> Dict[str, str]:
result = {}
for iface in interfaces:
if os.path.isdir(f"/sys/class/net/{iface}/bridge/"):
result[iface] = 'BRIDGE'
elif os.path.isfile(f"/sys/class/net/{iface}/tun_flags"):
# ethtool -i {iface}
result[iface] = 'TUN/TAP'
elif os.path.isdir(f"/sys/class/net/{iface}/device"):
if os.path.isdir(f"/sys/class/net/{iface}/wireless/"):
result[iface] = 'WIRELESS'
else:
result[iface] = 'PHYSICAL'
else:
result[iface] = 'UNKNOWN'
return result
def wireless_scan(interface :str) -> None:
interfaces = enrich_iface_types(list(list_interfaces().values()))
if interfaces[interface] != 'WIRELESS':
raise HardwareIncompatibilityError(f"Interface {interface} is not a wireless interface: {interfaces}")
try:
SysCommand(f"iwctl station {interface} scan")
except SysCallError as error:
raise SystemError(f"Could not scan for wireless networks: {error}")
if '_WIFI' not in storage:
storage['_WIFI'] = {}
if interface not in storage['_WIFI']:
storage['_WIFI'][interface] = {}
storage['_WIFI'][interface]['scanning'] = True
# TODO: Full WiFi experience might get evolved in the future, pausing for now 2021-01-25
def get_wireless_networks(interface :str) -> None:
# TODO: Make this oneliner pritter to check if the interface is scanning or not.
# TODO: Rename this to list_wireless_networks() as it doesn't return anything
if '_WIFI' not in storage or interface not in storage['_WIFI'] or storage['_WIFI'][interface].get('scanning', False) is False:
import time
wireless_scan(interface)
time.sleep(5)
for line in SysCommand(f"iwctl station {interface} get-networks"):
print(line)
def fetch_data_from_url(url: str, params: Optional[Dict] = None) -> str:
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
if params is not None:
encoded = urlencode(params)
full_url = f'{url}?{encoded}'
else:
full_url = url
try:
response = urlopen(full_url, context=ssl_context)
data = response.read().decode('UTF-8')
return data
except URLError:
raise ValueError(f'Unable to fetch data from url: {url}')
|