Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib/general.py
diff options
context:
space:
mode:
Diffstat (limited to 'archinstall/lib/general.py')
-rw-r--r--archinstall/lib/general.py67
1 files changed, 40 insertions, 27 deletions
diff --git a/archinstall/lib/general.py b/archinstall/lib/general.py
index 72f8677f..9f6de666 100644
--- a/archinstall/lib/general.py
+++ b/archinstall/lib/general.py
@@ -1,17 +1,25 @@
-import os, json, hashlib, shlex, sys
-import time, pty, logging
+import hashlib
+import json
+import logging
+import os
+import pty
+import shlex
+import sys
+import time
from datetime import datetime, date
-from subprocess import Popen, STDOUT, PIPE, check_output
from select import epoll, EPOLLIN, EPOLLHUP
+from typing import Union
+
from .exceptions import *
from .output import log
-from typing import Optional, Union
+
def gen_uid(entropy_length=256):
return hashlib.sha512(os.urandom(entropy_length)).hexdigest()
+
def multisplit(s, splitters):
- s = [s,]
+ s = [s, ]
for key in splitters:
ns = []
for obj in s:
@@ -19,34 +27,35 @@ def multisplit(s, splitters):
for index, part in enumerate(x):
if len(part):
ns.append(part)
- if index < len(x)-1:
+ if index < len(x) - 1:
ns.append(key)
s = ns
return s
+
def locate_binary(name):
for PATH in os.environ['PATH'].split(':'):
for root, folders, files in os.walk(PATH):
for file in files:
if file == name:
return os.path.join(root, file)
- break # Don't recurse
+ break # Don't recurse
+
class JSON_Encoder:
def _encode(obj):
if isinstance(obj, dict):
- ## We'll need to iterate not just the value that default() usually gets passed
- ## But also iterate manually over each key: value pair in order to trap the keys.
-
+ # We'll need to iterate not just the value that default() usually gets passed
+ # But also iterate manually over each key: value pair in order to trap the keys.
+
copy = {}
for key, val in list(obj.items()):
if isinstance(val, dict):
- val = json.loads(json.dumps(val, cls=JSON)) # This, is a EXTREMELY ugly hack..
- # But it's the only quick way I can think of to
- # trigger a encoding of sub-dictionaries.
+ # This, is a EXTREMELY ugly hack.. but it's the only quick way I can think of to trigger a encoding of sub-dictionaries.
+ val = json.loads(json.dumps(val, cls=JSON))
else:
val = JSON_Encoder._encode(val)
-
+
if type(key) == str and key[0] == '!':
copy[JSON_Encoder._encode(key)] = '******'
else:
@@ -66,6 +75,7 @@ class JSON_Encoder:
else:
return obj
+
class JSON(json.JSONEncoder, json.JSONDecoder):
def _encode(self, obj):
return JSON_Encoder._encode(obj)
@@ -73,7 +83,7 @@ class JSON(json.JSONEncoder, json.JSONDecoder):
def encode(self, obj):
return super(JSON, self).encode(self._encode(obj))
-class sys_command():#Thread):
+class sys_command:
"""
Stolen from archinstall_gui
"""
@@ -117,7 +127,7 @@ class sys_command():#Thread):
user_catalogue = os.path.expanduser('~')
- if (workdir := kwargs.get('workdir', None)):
+ if workdir := kwargs.get('workdir', None):
self.cwd = workdir
self.exec_dir = workdir
else:
@@ -128,8 +138,8 @@ class sys_command():#Thread):
# "which" doesn't work as it's a builtin to bash.
# It used to work, but for whatever reason it doesn't anymore. So back to square one..
- #self.log('Worker command is not executed with absolute path, trying to find: {}'.format(self.cmd[0]), origin='spawn', level=5)
- #self.log('This is the binary {} for {}'.format(o.decode('UTF-8'), self.cmd[0]), origin='spawn', level=5)
+ # self.log('Worker command is not executed with absolute path, trying to find: {}'.format(self.cmd[0]), origin='spawn', level=5)
+ # self.log('This is the binary {} for {}'.format(o.decode('UTF-8'), self.cmd[0]), origin='spawn', level=5)
self.cmd[0] = locate_binary(self.cmd[0])
if not os.path.isdir(self.exec_dir):
@@ -161,7 +171,7 @@ class sys_command():#Thread):
'exit_code': self.exit_code
}
- def peak(self, output : Union[str, bytes]) -> bool:
+ def peak(self, output: Union[str, bytes]) -> bool:
if type(output) == bytes:
try:
output = output.decode('UTF-8')
@@ -198,7 +208,7 @@ class sys_command():#Thread):
old_dir = os.getcwd()
os.chdir(self.exec_dir)
self.pid, child_fd = pty.fork()
- if not self.pid: # Child process
+ if not self.pid: # Child process
# Replace child process with our main process
if not self.kwargs['emulate']:
try:
@@ -244,7 +254,7 @@ class sys_command():#Thread):
original = trigger
trigger = bytes(original, 'UTF-8')
self.kwargs['events'][trigger] = self.kwargs['events'][original]
- del(self.kwargs['events'][original])
+ del (self.kwargs['events'][original])
if type(self.kwargs['events'][trigger]) != bytes:
self.kwargs['events'][trigger] = bytes(self.kwargs['events'][trigger], 'UTF-8')
@@ -257,19 +267,19 @@ class sys_command():#Thread):
last_trigger_pos = trigger_pos
os.write(child_fd, self.kwargs['events'][trigger])
- del(self.kwargs['events'][trigger])
+ del (self.kwargs['events'][trigger])
broke = True
break
if broke:
continue
- ## Adding a exit trigger:
+ # Adding a exit trigger:
if len(self.kwargs['events']) == 0:
if 'debug' in self.kwargs and self.kwargs['debug']:
self.log(f"Waiting for last command {self.cmd[0]} to finish.", level=logging.DEBUG)
- if bytes(f']$'.lower(), 'UTF-8') in self.trace_log[0-len(f']$')-5:].lower():
+ if bytes(f']$'.lower(), 'UTF-8') in self.trace_log[0 - len(f']$') - 5:].lower():
if 'debug' in self.kwargs and self.kwargs['debug']:
self.log(f"{self.cmd[0]} has finished.", level=logging.DEBUG)
alive = False
@@ -298,9 +308,11 @@ class sys_command():#Thread):
self.exit_code = 0
if self.exit_code != 0 and not self.kwargs['suppress_errors']:
- #self.log(self.trace_log.decode('UTF-8'), level=logging.DEBUG)
- #self.log(f"'{self.raw_cmd}' did not exit gracefully, exit code {self.exit_code}.", level=logging.ERROR)
- raise SysCallError(message=f"{self.trace_log.decode('UTF-8')}\n'{self.raw_cmd}' did not exit gracefully (trace log above), exit code: {self.exit_code}", exit_code=self.exit_code)
+ # self.log(self.trace_log.decode('UTF-8'), level=logging.DEBUG)
+ # self.log(f"'{self.raw_cmd}' did not exit gracefully, exit code {self.exit_code}.", level=logging.ERROR)
+ raise SysCallError(
+ message=f"{self.trace_log.decode('UTF-8')}\n'{self.raw_cmd}' did not exit gracefully (trace log above), exit code: {self.exit_code}",
+ exit_code=self.exit_code)
self.ended = time.time()
with open(f'{self.cwd}/trace.log', 'wb') as fh:
@@ -318,5 +330,6 @@ def prerequisite_check():
return True
+
def reboot():
o = b''.join(sys_command("/usr/bin/reboot"))