Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAnton Hvornum <anton.feeds+github@gmail.com>2019-11-27 20:39:45 +0000
committerGitHub <noreply@github.com>2019-11-27 20:39:45 +0000
commit434daeae6840c123cc8c8c3f6feb6f0e8ea50b8a (patch)
tree7cc0ec0ceb88b0c9fe86a51ff922f075458b9d3e
parent96c63c9bc0be8a1f5fc003935af19923a4bc1e19 (diff)
parentbc154eacbe983231282d83e7ee5ae40c6128f026 (diff)
Merge pull request #30 from Torxed/cleanup
Cleanup & Features
-rw-r--r--.gitignore2
-rw-r--r--archinstall.py1157
-rw-r--r--test_archinstall.py14
3 files changed, 757 insertions, 416 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 00000000..45939095
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+**/**__pycache__
+SAFETY_LOCK
diff --git a/archinstall.py b/archinstall.py
index 10266545..38856973 100644
--- a/archinstall.py
+++ b/archinstall.py
@@ -2,19 +2,88 @@
import traceback
import os, re, struct, sys, json, pty, shlex
import urllib.request, urllib.parse, ssl, signal
+import time
from glob import glob
from select import epoll, EPOLLIN, EPOLLHUP
from socket import socket, inet_ntoa, AF_INET, AF_INET6, AF_PACKET
from collections import OrderedDict as oDict
-from subprocess import Popen, STDOUT, PIPE
-from time import sleep, time
+from subprocess import Popen, STDOUT, PIPE, check_output
from random import choice
from string import ascii_uppercase, ascii_lowercase, digits
+from hashlib import sha512
+from threading import Thread, enumerate as tenum
+
+if not os.path.isdir('/sys/firmware/efi'):
+ print('[E] This script only supports UEFI-booted machines.')
+ exit(1)
+
+if os.path.isfile('./SAFETY_LOCK'):
+ SAFETY_LOCK = True
+else:
+ SAFETY_LOCK = False
-## == Profiles Path can be set via --profiles-path=/path
-## This just sets the default path if the parameter is omitted.
profiles_path = 'https://raw.githubusercontent.com/Torxed/archinstall/master/deployments'
+rootdir_pattern = re.compile('^.*?/devices')
+harddrives = oDict()
+commandlog = []
+worker_history = oDict()
+instructions = oDict()
+args = {}
+positionals = []
+for arg in sys.argv[1:]:
+ if '--' == arg[:2]:
+ if '=' in arg:
+ key, val = [x.strip() for x in arg[2:].split('=')]
+ else:
+ key, val = arg[2:], True
+ args[key] = val
+ else:
+ positionals.append(arg)
+
+import logging
+from systemd.journal import JournalHandler
+
+# Custom adapter to pre-pend the 'origin' key.
+# TODO: Should probably use filters: https://docs.python.org/3/howto/logging-cookbook.html#using-filters-to-impart-contextual-information
+class CustomAdapter(logging.LoggerAdapter):
+ def process(self, msg, kwargs):
+ return '[{}] {}'.format(self.extra['origin'], msg), kwargs
+
+logger = logging.getLogger() # __name__
+journald_handler = JournalHandler()
+journald_handler.setFormatter(logging.Formatter('[{levelname}] {message}', style='{'))
+logger.addHandler(journald_handler)
+logger.setLevel(logging.DEBUG)
+
+class LOG_LEVELS:
+ CRITICAL = 1
+ ERROR = 2
+ WARNING = 3
+ INFO = 4
+ DEBUG = 5
+
+LOG_LEVEL = 4
+
+def log(*msg, origin='UNKNOWN', level=5, **kwargs):
+ if level <= LOG_LEVEL:
+ msg = [item.decode('UTF-8', errors='backslashreplace') if type(item) == bytes else item for item in msg]
+ msg = [str(item) if type(item) != str else item for item in msg]
+ log_adapter = CustomAdapter(logger, {'origin': origin})
+ if level <= 1:
+ log_adapter.critical(' '.join(msg))
+ elif level <= 2:
+ log_adapter.error(' '.join(msg))
+ elif level <= 3:
+ log_adapter.warning(' '.join(msg))
+ elif level <= 4:
+ log_adapter.info(' '.join(msg))
+ else:
+ log_adapter.debug(' '.join(msg))
+
+
+## == Profiles Path can be set via --profiles-path=/path
+## This just sets the default path if the parameter is omitted.
try:
import psutil
except:
@@ -27,7 +96,8 @@ except:
class disk():
def __init__(self, size, free, percent):
- self.size = size
+ self.total = size
+ self.used = 0
self.free = free
self.percent = percent
@@ -36,7 +106,7 @@ except:
self.interface = interface
self.bytes_recv = int(bytes_recv)
self.bytes_sent = int(bytes_sent)
- def __repr__(self, *args, **kwargs):
+ def __repr__(self, *positionals, **kwargs):
return f'iostat@{self.interface}[bytes_sent: {self.bytes_sent}, bytes_recv: {self.bytes_recv}]'
class psutil():
@@ -73,6 +143,7 @@ except:
data[interface] = iostat(interface, *line.strip().decode('UTF-8').split(' ',1))
return data
+
## FIXME: dependency checks (fdisk, lsblk etc)
def sig_handler(signal, frame):
print('\nAborting further installation steps!')
@@ -82,23 +153,10 @@ def sig_handler(signal, frame):
exit(0)
signal.signal(signal.SIGINT, sig_handler)
+def gen_uid(entropy_length=256):
+ return sha512(os.urandom(entropy_length)).hexdigest()
-rootdir_pattern = re.compile('^.*?/devices')
-harddrives = oDict()
-
-args = {}
-positionals = []
-for arg in sys.argv[1:]:
- if '--' == arg[:2]:
- if '=' in arg:
- key, val = [x.strip() for x in arg[2:].split('=')]
- else:
- key, val = arg[2:], True
- args[key] = val
- else:
- positionals.append(arg)
-
-def get_default_gateway_linux():
+def get_default_gateway_linux(*positionals, **kwargs):
"""Read the default gateway directly from /proc."""
with open("/proc/net/route") as fh:
for line in fh:
@@ -131,68 +189,149 @@ def pid_exists(pid):
else:
return True
-class sys_command():
- def __init__(self, cmd, opts={}):
+def simple_command(cmd, opts=None, *positionals, **kwargs):
+ if not opts: opts = {}
+ if 'debug' in opts:
+ print('[!] {}'.format(cmd))
+ handle = Popen(cmd, shell='True', stdout=PIPE, stderr=STDOUT, stdin=PIPE)
+ output = b''
+ while handle.poll() is None:
+ data = handle.stdout.read()
+ if len(data):
+ if 'debug' in opts:
+ print(data.decode('UTF-8'), end='')
+ # print(data.decode('UTF-8'), end='')
+ output += data
+ data = handle.stdout.read()
+ if 'debug' in opts:
+ print(data.decode('UTF-8'), end='')
+ output += data
+ handle.stdin.close()
+ handle.stdout.close()
+ return output
+
+class sys_command():#Thread):
+ def __init__(self, cmd, callback=None, start_callback=None, *positionals, **kwargs):
+ if not 'worker_id' in kwargs: kwargs['worker_id'] = gen_uid()
+ if not 'emulate' in kwargs: kwargs['emulate'] = SAFETY_LOCK
+ #Thread.__init__(self)
+ if kwargs['emulate']:
+ print(f"Starting command '{cmd}' in emulation mode.")
self.cmd = shlex.split(cmd)
- self.opts = opts
- self.pid = -1
-
- def __enter__(self, *args, **kwargs):
- ## Prep for context management (still block calls)
- return self.exec()
-
- def __leave__(self, *args, **kwargs):
- if 'debug' in self.opts and self.opts['debug']:
- print('[N] Leaving subsystem routine.')
- os.waitpid(self.pid, 0)
- if 'debug' in self.opts and self.opts['debug']:
- print('[N] (Bye bye!)')
-
- def exec(self):
+ self.args = args
+ self.kwargs = kwargs
+ self.callback = callback
+ self.pid = None
+ self.exit_code = None
+ self.started = time.time()
+ self.ended = None
+ self.worker_id = kwargs['worker_id']
+ self.trace_log = b''
+ self.status = 'starting'
+
+ user_catalogue = os.path.expanduser('~')
+ self.cwd = f"{user_catalogue}/archinstall/cache/workers/{kwargs['worker_id']}/"
+ self.exec_dir = f'{self.cwd}/{os.path.basename(self.cmd[0])}_workingdir'
+
if not self.cmd[0][0] == '/':
- print('[N] Command is not executed with absolute path, trying to find: {}'.format(self.cmd[0]))
- o = b''.join(sys_command('/usr/bin/which {}'.format(self.cmd[0])).exec())
- self.cmd[0] = o.decode('UTF-8')
- print('[N] This is what I\'m going with: {}'.format(self.cmd[0]))
- # PID = 0 for child, and the PID of the child for the parent
+ log('Worker command is not executed with absolute path, trying to find: {}'.format(self.cmd[0]), origin='spawn', level=5)
+ o = check_output(['/usr/bin/which', self.cmd[0]])
+ log('This is the binary {} for {}'.format(o.decode('UTF-8'), self.cmd[0]), origin='spawn', level=5)
+ self.cmd[0] = o.decode('UTF-8').strip()
+
+ if not os.path.isdir(self.exec_dir):
+ os.makedirs(self.exec_dir)
+
+ if self.kwargs['emulate']:
+ commandlog.append(cmd + ' #emulated')
+ elif 'hide_from_log' in self.kwargs and self.kwargs['hide_from_log']:
+ pass
+ else:
+ commandlog.append(cmd)
+ if start_callback: start_callback(self, *positionals, **kwargs)
+ #self.start()
+ self.run()
+
+ def __iter__(self, *positionals, **kwargs):
+ for line in self.trace_log.split(b'\n'):
+ yield line
+
+ def __repr__(self, *positionals, **kwargs):
+ return f"{self.cmd, self.trace_log}"
+
+ def decode(self, fmt='UTF-8'):
+ return self.trace_log.decode(fmt)
+
+ def dump(self):
+ return {
+ 'status' : self.status,
+ 'worker_id' : self.worker_id,
+ 'worker_result' : self.trace_log.decode('UTF-8'),
+ 'started' : self.started,
+ 'ended' : self.ended,
+ 'started_pprint' : '{}-{}-{} {}:{}:{}'.format(*time.localtime(self.started)),
+ 'ended_pprint' : '{}-{}-{} {}:{}:{}'.format(*time.localtime(self.ended)) if self.ended else None,
+ 'exit_code' : self.exit_code
+ }
+
+ def run(self):
+ #main = None
+ #for t in tenum():
+ # if t.name == 'MainThread':
+ # main = t
+ # break
+
+ #if not main:
+ # print('Main thread not existing')
+ # return
+
+ self.status = 'running'
+ old_dir = os.getcwd()
+ os.chdir(self.exec_dir)
self.pid, child_fd = pty.fork()
-
if not self.pid: # Child process
# Replace child process with our main process
- os.execv(self.cmd[0], self.cmd)
+ if not self.kwargs['emulate']:
+ try:
+ os.execv(self.cmd[0], self.cmd)
+ except FileNotFoundError:
+ self.status = 'done'
+ log(f"{self.cmd[0]} does not exist.", origin='spawn', level=2)
+ self.exit_code = 1
+ return False
+
+ os.chdir(old_dir)
poller = epoll()
poller.register(child_fd, EPOLLIN | EPOLLHUP)
alive = True
- trace_log = b''
last_trigger_pos = 0
- while alive:
+ while alive and not self.kwargs['emulate']:
for fileno, event in poller.poll(0.1):
try:
output = os.read(child_fd, 8192).strip()
- trace_log += output
+ self.trace_log += output
except OSError:
alive = False
break
- if 'debug' in self.opts and self.opts['debug']:
- if len(output):
- print(output)
+ if 'debug' in self.kwargs and self.kwargs['debug'] and len(output):
+ log(self.cmd[0],'gave:', output.decode('UTF-8'), origin='spawn', level=4)
lower = output.lower()
broke = False
- if 'triggers' in self.opts:
- for trigger in list(self.opts['triggers']):
- if trigger.lower() in trace_log[last_trigger_pos:].lower():
- trigger_pos_in_log = trace_log[last_trigger_pos:].lower().find(trigger.lower()) + len(trigger)
+ if 'events' in self.kwargs:
+ for trigger in list(self.kwargs['events']):
+ if trigger.lower() in self.trace_log[last_trigger_pos:].lower():
+ trigger_pos = self.trace_log[last_trigger_pos:].lower().find(trigger.lower())
- if 'debug' in self.opts and self.opts['debug']:
- print('[N] Writing to subsystem: {}'.format(self.opts['triggers'][trigger].decode('UTF-8')))
+ if 'debug' in self.kwargs and self.kwargs['debug']:
+ log(f"Writing to subprocess {self.cmd[0]}: {self.kwargs['events'][trigger].decode('UTF-8')}", origin='spawn', level=5)
- last_trigger_pos = trigger_pos_in_log #len(trace_log)
- os.write(child_fd, self.opts['triggers'][trigger])
- del(self.opts['triggers'][trigger])
+ last_trigger_pos = trigger_pos
+ os.write(child_fd, self.kwargs['events'][trigger])
+ del(self.kwargs['events'][trigger])
broke = True
break
@@ -200,95 +339,78 @@ class sys_command():
continue
## Adding a exit trigger:
- if len(self.opts['triggers']) == 0:
- if 'debug' in self.opts and self.opts['debug']:
- print('[N] Waiting for last command to finish...')
- if bytes(f'[root@{args["hostname"]} ~]#'.lower(), 'UTF-8') in trace_log[0-len(f'[root@{args["hostname"]} ~]#')-5:].lower():
- if 'debug' in self.opts and self.opts['debug']:
- print('[N] Last command finished, exiting subsystem.')
+ if len(self.kwargs['events']) == 0:
+ if 'debug' in self.kwargs and self.kwargs['debug']:
+ log(f"Waiting for last command {self.cmd[0]} to finish.", origin='spawn', level=4)
+
+ if bytes(f']$'.lower(), 'UTF-8') in self.trace_log[0-len(f']$')-5:].lower():
+ if 'debug' in self.kwargs and self.kwargs['debug']:
+ log(f"{self.cmd[0]} has finished.", origin='spawn', level=4)
alive = False
break
- yield output
- # Gracefully wait for the last output to be given to us from the above command.
- # Or break on OSError (process has died)
- last = time()
- while time()-last < 5:
- for fileno, event in poller.poll(0.1):
+ self.status = 'done'
+
+ if 'debug' in self.kwargs and self.kwargs['debug']:
+ log(f"{self.cmd[0]} waiting for exit code.", origin='spawn', level=5)
+
+ if not self.kwargs['emulate']:
+ try:
+ self.exit_code = os.waitpid(self.pid, 0)[1]
+ except ChildProcessError:
try:
- output = os.read(child_fd, 8192).strip()
- trace_log += output
- except OSError:
- last = time() - 60
- break
+ self.exit_code = os.waitpid(child_fd, 0)[1]
+ except ChildProcessError:
+ self.exit_code = 1
+ else:
+ self.exit_code = 0
- if 'debug' in self.opts and self.opts['debug']:
- if len(output):
- print(output)
+ if self.exit_code != 0:
+ log(f"{self.cmd[0]} did not exit gracefully, exit code {self.exit_code}.", origin='spawn', level=3)
+ log(self.trace_log.decode('UTF-8'), origin='spawn', level=3)
+ else:
+ log(f"{self.cmd[0]} exit nicely.", origin='spawn', level=5)
- last = time()
+ self.ended = time.time()
+ with open(f'{self.cwd}/trace.log', 'wb') as fh:
+ fh.write(self.trace_log)
- if 'debug' in self.opts and self.opts['debug']:
- print('[N] Exited subsystem, instructing it to shutdown.')
- # Since we're in a subsystem, we gotta bail out!
- # Bail bail bail!
- os.write(child_fd, b'shutdown now\n')
+ worker_history[self.worker_id] = self.dump()
- # We need to flush the output of shutdown now, otherwise the
- # Popen() handle will hang and we'll never exit out of os.waitpid() later on.
- alive = True
- while alive:
- for fileno, event in poller.poll(0.1):
- try:
- output = os.read(child_fd, 8192).strip()
- trace_log += output
- except OSError:
- alive = False
- break
+ if 'dependency' in self.kwargs:
+ pass # TODO: Not yet supported (steal it from archinstall_gui)
+ """
+ dependency = self.kwargs['dependency']
+ if type(dependency) == str:
+ # Dependency is a progress-string. Wait for it to show up.
+ while main and main.isAlive() and dependency not in progress or progress[dependency] is None:
+ time.sleep(0.25)
+ dependency = progress[dependency]
- if 'debug' in self.opts and self.opts['debug']:
- if len(output):
- print(output)
+ if type(dependency) == str:
+ log(f"{self.func} waited for progress {dependency} which never showed up. Aborting.", level=2, origin='worker', function='run')
+ self.ended = time.time()
+ self.status = 'aborted'
+ return None
- if b'Container temporary has been shutdown.' in trace_log[0-len('Container temporary has been shutdown.')-5:]:
- alive = False
- break
+ while main and main.isAlive() and dependency.ended is None:
+ time.sleep(0.25)
- if 'debug' in self.opts and self.opts['debug']:
- print('[N] Waiting for exit code.')
- exit_code = os.waitpid(self.pid, 0)[1]
+ print(' *** Dependency released for:', self.func)
- if exit_code != 0:
- print(trace_log.decode('UTF-8'))
- print('[E] Command "{}" on line ~150 exited with status code:'.format(self.cmd[0]), exit_code)
- print('[?] Command executed: {}'.format(self.cmd))
- exit(1)
+ if dependency.data is None or not main or not main.isAlive():
+ log('Dependency:', dependency.func, 'did not exit clearly. There for,', self.func, 'can not execute.', level=2, origin='worker', function='run')
+ self.ended = time.time()
+ self.status = 'aborted'
+ return None
+ """
- if 'debug' in self.opts and self.opts['debug']:
- print('[N] Subsystem routine complete.')
+ if self.callback:
+ pass # TODO: Not yet supported (steal it from archinstall_gui)
-def simple_command(cmd, opts=None, *args, **kwargs):
- if not opts: opts = {}
- if 'debug' in opts:
- print('[!] {}'.format(cmd))
- handle = Popen(cmd, shell='True', stdout=PIPE, stderr=STDOUT, stdin=PIPE, **kwargs)
- output = b''
- while handle.poll() is None:
- data = handle.stdout.read()
- if len(data):
- if 'debug' in opts:
- print(data.decode('UTF-8'), end='')
- # print(data.decode('UTF-8'), end='')
- output += data
- data = handle.stdout.read()
- if 'debug' in opts:
- print(data.decode('UTF-8'), end='')
- output += data
- handle.stdin.close()
- handle.stdout.close()
- return output
+ #self.callback(self, *self.args, **self.kwargs)
-def get_drive_from_uuid(uuid):
+def get_drive_from_uuid(drive):
if len(harddrives) <= 0: raise ValueError("No hard drives to iterate in order to find: {}".format(uuid))
for drive in harddrives:
@@ -300,12 +422,12 @@ def get_drive_from_uuid(uuid):
return None
-def get_drive_from_part_uuid(partuuid):
+def get_drive_from_part_uuid(partuuid, *positionals, **kwargs):
if len(harddrives) <= 0: raise ValueError("No hard drives to iterate in order to find: {}".format(uuid))
for drive in harddrives:
- for partition in grab_partitions(f'/dev/{drive}'):
- o = simple_command(f'blkid -s PARTUUID -o value /dev/{drive}')
+ for partition in get_partitions(f'/dev/{drive}', *positionals, **kwargs):
+ o = simple_command(f'blkid -s PARTUUID -o value /dev/{drive}{partition}')
if len(o) and o == partuuid:
return drive
@@ -352,7 +474,7 @@ def update_git(branch='master'):
os.execv('/usr/bin/python3', ['archinstall.py'] + sys.argv + ['--rebooted',])
extit(1)
-def device_state(name):
+def device_state(name, *positionals, **kwargs):
# Based out of: https://askubuntu.com/questions/528690/how-to-get-list-of-all-non-removable-disk-device-names-ssd-hdd-and-sata-ide-onl/528709#528709
if os.path.isfile('/sys/block/{}/device/block/{}/removable'.format(name, name)):
with open('/sys/block/{}/device/block/{}/removable'.format(name, name)) as f:
@@ -369,11 +491,11 @@ def device_state(name):
return
return True
-def grab_partitions(dev):
+def get_partitions(dev, *positionals, **kwargs):
drive_name = os.path.basename(dev)
parts = oDict()
- #o = b''.join(sys_command('/usr/bin/lsblk -o name -J -b {dev}'.format(dev=dev)).exec())
- o = b''.join(sys_command('/usr/bin/lsblk -J {dev}'.format(dev=dev)).exec())
+ #o = b''.join(sys_command('/usr/bin/lsblk -o name -J -b {dev}'.format(dev=dev)))
+ o = b''.join(sys_command(f'/usr/bin/lsblk -J {dev}', hide_from_log=True))
if b'not a block device' in o:
## TODO: Replace o = sys_command() with code, o = sys_command()
## and make sys_command() return the exit-code, way safer than checking output strings :P
@@ -395,11 +517,133 @@ def grab_partitions(dev):
return parts
-def update_drive_list():
+def get_disk_model(drive):
+ with open(f'/sys/block/{os.path.basename(drive)}/device/model', 'rb') as fh:
+ return fh.read().decode('UTF-8').strip()
+
+def get_disk_size(drive):
+ dev_short_name = os.path.basename(drive)
+ with open(f'/sys/block/{dev_short_name}/device/block/{dev_short_name}/size', 'rb') as fh:
+ return ''.join(human_readable_size(fh.read().decode('UTF-8').strip()))
+
+def disk_info(drive, *positionals, **kwargs):
+ lkwargs = {**kwargs}
+ lkwargs['emulate'] = False # This is a emulate-safe function. Does not alter filesystem.
+
+ info = json.loads(b''.join(sys_command(f'lsblk -J -o "NAME,SIZE,FSTYPE,LABEL" {drive}', *positionals, **lkwargs, hide_from_log=True)).decode('UTF_8'))['blockdevices'][0]
+ fileformats = []
+ labels = []
+ if 'children' in info: ## Might not be partitioned yet
+ for child in info['children']:
+ if child['fstype'] != None:
+ fileformats.append(child['fstype'])
+ if child['label'] != None:
+ labels.append(child['label'])
+ else:
+ fileformats = ['*Empty Drive*']
+ labels = ['(no partitions)']
+ info['fileformats'] = fileformats
+ info['labels'] = labels
+ info['model'] = get_disk_model(drive)
+
+ return info
+
+def cleanup_args():
+ for key in args:
+ if args[key] == '<STDIN>': args[key] = input(f'Enter a value for {key}: ')
+ elif args[key] == '<RND_STR>': args[key] = random_string(32)
+ elif args[key] == '<YUBIKEY>':
+ args[key] = gen_yubikey_password()
+ if not args[key]:
+ print('[E] Failed to setup a yubikey password, is it plugged in?')
+ exit(1)
+
+def merge_in_includes(instructions, *positionals, **kwargs):
+ if 'args' in instructions:
+ ## == Recursively fetch instructions if "include" is found under {args: ...}
+ while 'include' in instructions['args']:
+ includes = instructions['args']['include']
+ print('[!] Importing net-deploy target: {}'.format(includes))
+ del(instructions['args']['include'])
+ if type(includes) in (dict, list):
+ for include in includes:
+ instructions = merge_dicts(instructions, get_instructions(include, *positionals, **kwargs), before=True)
+ else:
+ instructions = merge_dicts(instructions, get_instructions(includes), *positionals, **kwargs, before=True)
+
+ ## Update arguments if we found any
+ for key, val in instructions['args'].items():
+ args[key] = val
+
+ if 'args' in instructions:
+ ## TODO: Reuseable code, there's to many get_instructions, merge_dictgs and args updating going on.
+ ## Update arguments if we found any
+ for key, val in instructions['args'].items():
+ args[key] = val
+
+ return instructions
+
+
+def update_drive_list(*positionals, **kwargs):
+ # https://github.com/karelzak/util-linux/blob/f920f73d83f8fd52e7a14ec0385f61fab448b491/disk-utils/fdisk-list.c#L52
for path in glob('/sys/block/*/device'):
name = re.sub('.*/(.*?)/device', '\g<1>', path)
- if device_state(name):
- harddrives['/dev/{}'.format(name)] = psutil.disk_usage('/dev/{}'.format(name))
+ if device_state(name, *positionals, **kwargs):
+ harddrives[f'/dev/{name}'] = disk_info(f'/dev/{name}', *positionals, **kwargs)
+
+def human_readable_size(bits, sizes=[{8 : 'b'}, {1024 : 'kb'}, {1024 : 'mb'}, {1024 : 'gb'}, {1024 : 'tb'}, {1024 : 'zb?'}]):
+ # Not needed if using lsblk.
+ end_human = None
+ for pair in sizes:
+ size, human = list(pair.items())[0]
+
+ if bits / size > 1:
+ bits = bits/size
+ end_human = human
+ else:
+ break
+ return bits, end_human
+
+def human_disk_info(drive):
+ return {
+ 'size' : harddrives[drive]['size'],
+ 'fileformat' : harddrives[drive]['fileformats'],
+ 'labels' : harddrives[drive]['labels']
+ }
+
+def close_disks():
+ o = simple_command('/usr/bin/umount -R /mnt/boot')
+ o = simple_command('/usr/bin/umount -R /mnt')
+ o = simple_command('/usr/bin/cryptsetup close /dev/mapper/luksdev')
+
+def format_disk(drive='drive', start='start', end='size', emulate=False, *positionals, **kwargs):
+ drive = args[drive]
+ start = args[start]
+ end = args[end]
+ if not drive:
+ raise ValueError('Need to supply a drive path, for instance: /dev/sdx')
+ # dd if=/dev/random of=args['drive'] bs=4096 status=progress
+ # https://github.com/dcantrell/pyparted would be nice, but isn't officially in the repo's #SadPanda
+ if sys_command(f'/usr/bin/parted -s {drive} mklabel gpt', emulate=emulate, *positionals, **kwargs).exit_code != 0:
+ return None
+ if sys_command(f'/usr/bin/parted -s {drive} mklabel gpt', emulate=emulate, *positionals, **kwargs).exit_code != 0:
+ return None
+ if sys_command(f'/usr/bin/parted -s {drive} mkpart primary FAT32 1MiB {start}', emulate=emulate, *positionals, **kwargs).exit_code != 0:
+ return None
+ if sys_command(f'/usr/bin/parted -s {drive} name 1 "EFI"', emulate=emulate, *positionals, **kwargs).exit_code != 0:
+ return None
+ if sys_command(f'/usr/bin/parted -s {drive} set 1 esp on', emulate=emulate, *positionals, **kwargs).exit_code != 0:
+ return None
+ if sys_command(f'/usr/bin/parted -s {drive} set 1 boot on', emulate=emulate, *positionals, **kwargs).exit_code != 0:
+ return None
+ if sys_command(f'/usr/bin/parted -s {drive} mkpart primary {start} {end}', emulate=emulate, *positionals, **kwargs).exit_code != 0:
+ return None
+
+ # TODO: grab partitions after each parted/partition step instead of guessing which partiton is which later on.
+ # Create one, grab partitions - dub that to "boot" or something. do the next partition, grab that and dub it "system".. or something..
+ # This "assumption" has bit me in the ass so many times now I've stoped counting.. Jerker is right.. Don't do it like this :P
+
+ return True
def multisplit(s, splitters):
s = [s,]
@@ -429,8 +673,7 @@ def get_application_instructions(target):
instructions = grab_url_data('{}/applications/{}.json'.format(args['profiles-path'], target)).decode('UTF-8')
print('[N] Found application instructions for: {}'.format(target))
except urllib.error.HTTPError:
- print('[N] No instructions found for: {}'.format(target))
- print('[N] Trying local instructions under ./deployments/applications')
+ print('[N] Could not find remote instructions. yrying local instructions under ./deployments/applications')
local_path = './deployments/applications' if os.path.isfile('./archinstall.py') else './archinstall/deployments/applications' # Dangerous assumption
if os.path.isfile(f'{local_path}/{target}.json'):
with open(f'{local_path}/{target}.json', 'r') as fh:
@@ -438,6 +681,7 @@ def get_application_instructions(target):
print('[N] Found local application instructions for: {}'.format(target))
else:
+ print('[N] No instructions found for: {}'.format(target))
return instructions
try:
@@ -449,14 +693,13 @@ def get_application_instructions(target):
return instructions
-def get_instructions(target):
- instructions = {}
+def get_instructions(target, *positionals, **kwargs):
+ instructions = oDict()
try:
instructions = grab_url_data('{}/{}.json'.format(args['profiles-path'], target)).decode('UTF-8')
print('[N] Found net-deploy instructions called: {}'.format(target))
except urllib.error.HTTPError:
- print('[N] No instructions found called: {}'.format(target))
- print('[N] Trying local instructions under ./deployments')
+ print('[N] Could not find remote instructions. Trying local instructions under ./deployments')
local_path = './deployments' if os.path.isfile('./archinstall.py') else './archinstall/deployments' # Dangerous assumption
if os.path.isfile(f'{local_path}/{target}.json'):
with open(f'{local_path}/{target}.json', 'r') as fh:
@@ -464,6 +707,7 @@ def get_instructions(target):
print('[N] Found local instructions called: {}'.format(target))
else:
+ print('[N] No instructions found called: {}'.format(target))
return instructions
try:
@@ -495,14 +739,16 @@ def merge_dicts(d1, d2, before=True, overwrite=False):
def random_string(l):
return ''.join(choice(ascii_uppercase + ascii_lowercase + digits) for i in range(l))
-if __name__ == '__main__':
- update_git() # Breaks and restarts the script if an update was found.
- update_drive_list()
- if not os.path.isdir('/sys/firmware/efi'):
- print('[E] This script only supports UEFI-booted machines.')
- exit(1)
+def phone_home(url):
+ payload = json.dumps({"hostname": args['hostname'],
+ "done" : time.time(),
+ "profile": args['profile'],
+ "drive": args['drive'],
+ "base_status": base_return_code}).encode('utf8')
+ request = urllib.request.Request(url, data=payload, headers={'content-type': 'application/json'})
+ response = urllib.request.urlopen(request)
- ## Setup some defaults (in case no command-line parameters or netdeploy-params were given)
+def setup_args_defaults(args, interactive=True):
if not 'size' in args: args['size'] = '100%'
if not 'start' in args: args['start'] = '513MiB'
if not 'pwfile' in args: args['pwfile'] = '/tmp/diskpw'
@@ -515,22 +761,27 @@ if __name__ == '__main__':
if not 'profile' in args: args['profile'] = None
if not 'profiles-path' in args: args['profiles-path'] = profiles_path
if not 'rerun' in args: args['rerun'] = None
+ if not 'aur-keep' in args: args['aur-keep'] = False
if not 'aur-support' in args: args['aur-support'] = True # Support adds yay (https://github.com/Jguer/yay) in installation steps.
if not 'ignore-rerun' in args: args['ignore-rerun'] = False
if not 'localtime' in args: args['localtime'] = 'Europe/Stockholm' if args['country'] == 'SE' else 'GMT+0' # TODO: Arbitrary for now
+ if not 'phone-home' in args: args['phone-home'] = False
if not 'drive' in args:
- drives = sorted(list(harddrives.keys()))
- if len(drives) > 1 and 'force' not in args and ('default' in args and 'first-drive' not in args):
- for index, drive in enumerate(drives):
- print(f'{index}: {drive} ({harddrives[drive]})')
- drive = input('Select one of the above disks (by number): ')
- if not drive.isdigit():
- raise KeyError("Multiple disks found, --drive=/dev/X not specified (or --force/--first-drive)")
- drives = [drives[int(drive)]] # Make sure only the selected drive is in the list of options
- args['drive'] = drives[0] # First drive found
+ if interactive and len(harddrives):
+ drives = sorted(list(harddrives.keys()))
+ if len(drives) > 1 and 'force' not in args and ('default' in args and 'first-drive' not in args):
+ for index, drive in enumerate(drives):
+ print(f"{index}: {drive} ({harddrives[drive]['size'], harddrives[drive]['fstype'], harddrives[drive]['label']})")
+ drive = input('Select one of the above disks (by number): ')
+ if not drive.isdigit():
+ raise KeyError("Multiple disks found, --drive=/dev/X not specified (or --force/--first-drive)")
+ drives = [drives[int(drive)]] # Make sure only the selected drive is in the list of options
+ args['drive'] = drives[0] # First drive found
+ else:
+ args['drive'] = None
rerun = args['ignore-rerun']
- if args['drive'][0] != '/':
+ if args['drive'] and args['drive'][0] != '/':
## Remap the selected UUID to the device to be formatted.
drive = get_drive_from_uuid(args['drive'])
if not drive:
@@ -542,17 +793,17 @@ if __name__ == '__main__':
exit(1)
args['drive'] = drive
+ return args
- ## == If we got networking,
- # Try fetching instructions for this box and execute them.
- instructions = {}
- if get_default_gateway_linux():
+def load_automatic_instructions(*positionals, **kwargs):
+ instructions = oDict()
+ if get_default_gateway_linux(*positionals, **kwargs):
locmac = get_local_MACs()
if not len(locmac):
print('[N] No network interfaces - No net deploy.')
else:
for mac in locmac:
- instructions = get_instructions(mac)
+ instructions = get_instructions(mac, *positionals, **kwargs)
if 'args' in instructions:
## == Recursively fetch instructions if "include" is found under {args: ...}
@@ -562,9 +813,9 @@ if __name__ == '__main__':
del(instructions['args']['include'])
if type(includes) in (dict, list):
for include in includes:
- instructions = merge_dicts(instructions, get_instructions(include), before=True)
+ instructions = merge_dicts(instructions, get_instructions(include, *positionals, **kwargs), before=True)
else:
- instructions = merge_dicts(instructions, get_instructions(includes), before=True)
+ instructions = merge_dicts(instructions, get_instructions(includes, *positionals, **kwargs), before=True)
## Update arguments if we found any
for key, val in instructions['args'].items():
@@ -572,267 +823,173 @@ if __name__ == '__main__':
else:
print('[N] No gateway - No net deploy')
- if args['profile'] and not args['default']:
- instructions = get_instructions(args['profile'])
- if len(instructions) <= 0:
- print('[E] No instructions by the name of {} was found.'.format(args['profile']))
- print(' Installation won\'t continue until a valid profile is given.')
- print(' (this is because --profile was given and a --default is not given)')
- exit(1)
- else:
- first = True
- while not args['default'] and not args['profile'] and len(instructions) <= 0:
- profile = input('What template do you want to install: ')
- instructions = get_instructions(profile)
- if first and len(instructions) <= 0:
- print('[E] No instructions by the name of {} was found.'.format(profile))
- print(' Installation won\'t continue until a valid profile is given.')
- print(' (this is because --default is not instructed and no --profile given)')
- first = False
-
-
- if 'args' in instructions:
- ## == Recursively fetch instructions if "include" is found under {args: ...}
- while 'include' in instructions['args']:
- includes = instructions['args']['include']
- print('[!] Importing net-deploy target: {}'.format(includes))
- del(instructions['args']['include'])
- if type(includes) in (dict, list):
- for include in includes:
- instructions = merge_dicts(instructions, get_instructions(include), before=True)
- else:
- instructions = merge_dicts(instructions, get_instructions(includes), before=True)
-
- ## Update arguments if we found any
- for key, val in instructions['args'].items():
- args[key] = val
-
- if 'args' in instructions:
- ## TODO: Reuseable code, there's to many get_instructions, merge_dictgs and args updating going on.
- ## Update arguments if we found any
- for key, val in instructions['args'].items():
- args[key] = val
-
- for key in args:
- if args[key] == '<STDIN>': args[key] = input(f'Enter a value for {key}: ')
- elif args[key] == '<RND_STR>': args[key] = random_string(32)
- elif args[key] == '<YUBIKEY>':
- args[key] = gen_yubikey_password()
- if not args[key]:
- print('[E] Failed to setup a yubikey password, is it plugged in?')
- exit(1)
-
-# if args['password'] == '<STDIN>': args['password'] = input('Enter a disk (and root) password: ')
-# elif args['password'] == '<YUBIKEY>':
-# args['password'] = gen_yubikey_password()
-# if not args['password']:
-
- print(json.dumps(args, indent=4))
- if args['default'] and not 'force' in args:
- if(input('Are these settings OK? (No return beyond this point) N/y: ').lower() != 'y'):
- exit(1)
+ return instructions
+def cache_diskpw_on_disk():
if not os.path.isfile(args['pwfile']):
#PIN = '0000'
with open(args['pwfile'], 'w') as pw:
pw.write(args['password'])
- #else:
- # ## TODO: Convert to `rb` instead.
- # # We shouldn't discriminate \xfu from being a passwd phrase.
- # with open(args['pwfile'], 'r') as pw:
- # PIN = pw.read().strip()
-
+def refresh_partition_list(drive, *positionals, **kwargs):
+ drive = args[drive]
+ if not 'partitions' in args:
+ args['partitions'] = oDict()
+ for index, part_name in enumerate(sorted(get_partitions(drive, *positionals, **kwargs).keys())):
+ args['partitions'][str(index+1)] = part_name
+ return True
- print()
- print('[!] Disk PASSWORD is: {}'.format(args['password']))
- print()
+def mkfs_fat32(drive, partition, *positionals, **kwargs):
+ drive = args[drive]
+ partition = args['partitions'][partition]
+ o = b''.join(sys_command(f'/usr/bin/mkfs.vfat -F32 {drive}{partition}'))
+ if (b'mkfs.fat' not in o and b'mkfs.vfat' not in o) or b'command not found' in o:
+ return None
+ return True
- if not args['rerun'] or args['ignore-rerun']:
- for i in range(5, 0, -1):
- print(f'Formatting {args["drive"]} in {i}...')
- sleep(1)
+def is_luksdev_mounted(*positionals, **kwargs):
+ o = b''.join(sys_command('/usr/bin/file /dev/mapper/luksdev', hide_from_log=True)) # /dev/dm-0
+ if b'cannot open' in o:
+ return False
+ return True
- o = simple_command('/usr/bin/umount -R /mnt')
- o = simple_command('/usr/bin/cryptsetup close /dev/mapper/luksdev')
- print('[N] Setting up {drive}.'.format(**args))
- # dd if=/dev/random of=args['drive'] bs=4096 status=progress
- # https://github.com/dcantrell/pyparted would be nice, but isn't officially in the repo's #SadPanda
- o = b''.join(sys_command('/usr/bin/parted -s {drive} mklabel gpt'.format(**args)).exec())
- o = b''.join(sys_command('/usr/bin/parted -s {drive} mkpart primary FAT32 1MiB {start}'.format(**args)).exec())
- o = b''.join(sys_command('/usr/bin/parted -s {drive} name 1 "EFI"'.format(**args)).exec())
- o = b''.join(sys_command('/usr/bin/parted -s {drive} set 1 esp on'.format(**args)).exec())
- o = b''.join(sys_command('/usr/bin/parted -s {drive} set 1 boot on'.format(**args)).exec())
- o = b''.join(sys_command('/usr/bin/parted -s {drive} mkpart primary {start} {size}'.format(**args)).exec())
- # TODO: grab paritions after each parted/partition step instead of guessing which partiton is which later on.
- # Create one, grab partitions - dub that to "boot" or something. do the next partition, grab that and dub it "system".. or something..
- # This "assumption" has bit me in the ass so many times now I've stoped counting.. Jerker is right.. Don't do it like this :P
-
- args['paritions'] = grab_partitions(args['drive'])
- print(f'Partitions: (Boot: {list(args["paritions"].keys())[0]})')
+def mount_luktsdev(drive, partition, keyfile, *positionals, **kwargs):
+ drive = args[drive]
+ partition = args['partitions'][partition]
+ keyfile = args[keyfile]
+ if not is_luksdev_mounted():
+ o = b''.join(sys_command(f'/usr/bin/cryptsetup open {drive}{partition} luksdev --key-file {keyfile} --type luks2'.format(**args)))
+ return is_luksdev_mounted()
+
+def encrypt_partition(drive, partition, keyfile='/tmp/diskpw', *positionals, **kwargs):
+ drive = args[drive]
+ partition = args['partitions'][partition]
+ keyfile = args[keyfile]
+ o = b''.join(sys_command(f'/usr/bin/cryptsetup -q -v --type luks2 --pbkdf argon2i --hash sha512 --key-size 512 --iter-time 10000 --key-file {keyfile} --use-urandom luksFormat {drive}{partition}'))
+ if not b'Command successful.' in o:
+ return False
+ return True
- if len(args['paritions']) <= 0:
- print('[E] No paritions were created on {drive}'.format(**args), o)
- exit(1)
- for index, part_name in enumerate(sorted(args['paritions'].keys())):
- args['partition_{}'.format(index+1)] = part_name
- print(f'Partition info: {part_name}')
- print(json.dumps(args['paritions'][part_name], indent=4))
+def mkfs_btrfs(drive='/dev/mapper/luksdev', *positionals, **kwargs):
+ o = b''.join(sys_command('/usr/bin/mkfs.btrfs -f /dev/mapper/luksdev'))
+ if not b'UUID' in o:
+ return False
+ return True
- if not args['rerun'] or args['ignore-rerun']:
- o = b''.join(sys_command('/usr/bin/mkfs.vfat -F32 {drive}{partition_1}'.format(**args)).exec())
- if (b'mkfs.fat' not in o and b'mkfs.vfat' not in o) or b'command not found' in o:
- print('[E] Could not setup {drive}{partition_1}'.format(**args), o)
- exit(1)
+def mount_luksdev(where='/dev/mapper/luksdev', to='/mnt', *positionals, **kwargs):
+ check_mounted = simple_command('/usr/bin/mount | /usr/bin/grep /mnt', *positionals, **kwargs).decode('UTF-8').strip()# /dev/dm-0
+ if len(check_mounted):
+ return False
- # "--cipher sha512" breaks the shit.
- # TODO: --use-random instead of --use-urandom
- print('[N] Adding encryption to {drive}{partition_2}.'.format(**args))
- o = b''.join(sys_command('/usr/bin/cryptsetup -q -v --type luks2 --pbkdf argon2i --hash sha512 --key-size 512 --iter-time 10000 --key-file {pwfile} --use-urandom luksFormat {drive}{partition_2}'.format(**args)).exec())
- if not b'Command successful.' in o:
- print('[E] Failed to setup disk encryption.', o)
- exit(1)
+ o = b''.join(sys_command('/usr/bin/mount /dev/mapper/luksdev /mnt', *positionals, **kwargs))
+ return True
- o = b''.join(sys_command('/usr/bin/file /dev/mapper/luksdev').exec()) # /dev/dm-0
- if b'cannot open' in o:
- o = b''.join(sys_command('/usr/bin/cryptsetup open {drive}{partition_2} luksdev --key-file {pwfile} --type luks2'.format(**args)).exec())
- o = b''.join(sys_command('/usr/bin/file /dev/mapper/luksdev').exec()) # /dev/dm-0
- if b'cannot open' in o:
- print('[E] Could not open encrypted device.', o)
- exit(1)
+def mount_boot(drive, partition, mountpoint='/mnt/boot', *positionals, **kwargs):
+ os.makedirs('/mnt/boot', exist_ok=True)
+ #o = b''.join(sys_command('/usr/bin/mount | /usr/bin/grep /mnt/boot', *positionals, **kwargs)) # /dev/dm-0
- if not args['rerun'] or args['ignore-rerun']:
- print('[N] Creating btrfs filesystem inside {drive}{partition_2}'.format(**args))
- o = b''.join(sys_command('/usr/bin/mkfs.btrfs -f /dev/mapper/luksdev').exec())
- if not b'UUID' in o:
- print('[E] Could not setup btrfs filesystem.', o)
- exit(1)
+ check_mounted = simple_command('/usr/bin/mount | /usr/bin/grep /mnt/boot', *positionals, **kwargs).decode('UTF-8').strip()
+ if len(check_mounted):
+ return False
- o = simple_command('/usr/bin/mount | /usr/bin/grep /mnt') # /dev/dm-0
- if len(o) <= 0:
- o = b''.join(sys_command('/usr/bin/mount /dev/mapper/luksdev /mnt').exec())
+ o = b''.join(sys_command(f'/usr/bin/mount {drive}{partition} {mountpoint}', *positionals, **kwargs))
+ return True
- os.makedirs('/mnt/boot', exist_ok=True)
- o = simple_command('/usr/bin/mount | /usr/bin/grep /mnt/boot') # /dev/dm-0
- if len(o) <= 0:
- o = b''.join(sys_command('/usr/bin/mount {drive}{partition_1} /mnt/boot'.format(**args)).exec())
+def mount_mountpoints(drive, bootpartition, mountpoint='/mnt/boot', *positionals, **kwargs):
+ drive = args[drive]
+ bootpartition = args['partitions'][bootpartition]
+ mount_luksdev(*positionals, **kwargs)
+ mount_boot(drive, bootpartition, mountpoint='/mnt/boot', *positionals, **kwargs)
+ return True
- if 'mirrors' in args and args['mirrors'] and 'country' in args and get_default_gateway_linux():
- print('[N] Reordering mirrors.')
- o = simple_command("/usr/bin/wget 'https://www.archlinux.org/mirrorlist/?country={country}&protocol=https&ip_version=4&ip_version=6&use_mirror_status=on' -O /root/mirrorlist".format(**args))
- o = simple_command("/usr/bin/sed -i 's/#Server/Server/' /root/mirrorlist")
- o = simple_command('/usr/bin/rankmirrors -n 6 /root/mirrorlist > /etc/pacman.d/mirrorlist')
+def re_rank_mirrors(top=10, *positionals, **kwargs):
+ if sys_command(('/usr/bin/rankmirrors -n {top} /etc/pacman.d/mirrorlist > /etc/pacman.d/mirrorlist')).exit_code == 0:
+ return True
+ return False
+
+def filter_mirrors_by_country_list(countries, top=None, *positionals, **kwargs):
+ ## TODO: replace wget with urllib.request (no point in calling syscommand)
+ country_list = []
+ for country in countries.split(','):
+ country_list.append(f'country={country}')
+ o = b''.join(sys_command((f"/usr/bin/wget 'https://www.archlinux.org/mirrorlist/?{'&'.join(country_list)}&protocol=https&ip_version=4&ip_version=6&use_mirror_status=on' -O /root/mirrorlist")))
+ o = b''.join(sys_command(("/usr/bin/sed -i 's/#Server/Server/' /root/mirrorlist")))
+ o = b''.join(sys_command(("/usr/bin/mv /root/mirrorlist /etc/pacman.d/")))
+
+ if top:
+ re_rank_mirrors(top, *positionals, **kwargs) or not os.path.isfile('/etc/pacman.d/mirrorlist')
- pre_conf = {}
- if 'pre' in instructions:
- pre_conf = instructions['pre']
- elif 'prerequisits' in instructions:
- pre_conf = instructions['prerequisits']
+ return True
- if 'git-branch' in pre_conf:
- update_git(pre_conf['git-branch'])
- del(pre_conf['git-branch'])
+def strap_in_base(*positionals, **kwargs):
+ if args['aur-support']:
+ args['packages'] += ' git'
+ if sys_command('/usr/bin/pacman -Syy', *positionals, **kwargs).exit_code == 0:
+ x = sys_command('/usr/bin/pacstrap /mnt base base-devel linux linux-firmware btrfs-progs efibootmgr nano wpa_supplicant dialog {packages}'.format(**args), *positionals, **kwargs)
+ if x.exit_code == 0:
+ return True
+ return False
+
+def configure_base_system(*positionals, **kwargs):
+ ## TODO: Replace a lot of these syscalls with just python native operations.
+ o = b''.join(sys_command('/usr/bin/genfstab -pU /mnt >> /mnt/etc/fstab'))
+ with open('/mnt/etc/fstab', 'a') as fstab:
+ fstab.write('\ntmpfs /tmp tmpfs defaults,noatime,mode=1777 0 0\n') # Redundant \n at the start? who knoes?
+
+ o = b''.join(sys_command('/usr/bin/arch-chroot /mnt rm -f /etc/localtime'))
+ o = b''.join(sys_command('/usr/bin/arch-chroot /mnt ln -s /usr/share/zoneinfo/{localtime} /etc/localtime'.format(**args)))
+ o = b''.join(sys_command('/usr/bin/arch-chroot /mnt hwclock --hctosys --localtime'))
+ #o = sys_command('arch-chroot /mnt echo "{hostname}" > /etc/hostname'.format(**args))
+ #o = sys_command("arch-chroot /mnt sed -i 's/#\(en_US\.UTF-8\)/\1/' /etc/locale.gen")
+ o = b''.join(sys_command("/usr/bin/arch-chroot /mnt sh -c \"echo '{hostname}' > /etc/hostname\"".format(**args)))
+ o = b''.join(sys_command("/usr/bin/arch-chroot /mnt sh -c \"echo 'en_US.UTF-8 UTF-8' > /etc/locale.gen\""))
+ o = b''.join(sys_command("/usr/bin/arch-chroot /mnt sh -c \"echo 'LANG=en_US.UTF-8' > /etc/locale.conf\""))
+ o = b''.join(sys_command('/usr/bin/arch-chroot /mnt locale-gen'))
+ o = b''.join(sys_command('/usr/bin/arch-chroot /mnt chmod 700 /root'))
+
+ with open('/mnt/etc/mkinitcpio.conf', 'w') as mkinit:
+ ## TODO: Don't replace it, in case some update in the future actually adds something.
+ mkinit.write('MODULES=(btrfs)\n')
+ mkinit.write('BINARIES=(/usr/bin/btrfs)\n')
+ mkinit.write('FILES=()\n')
+ mkinit.write('HOOKS=(base udev autodetect modconf block encrypt filesystems keyboard fsck)\n')
+ o = b''.join(sys_command('/usr/bin/arch-chroot /mnt mkinitcpio -p linux'))
- ## Prerequisit steps needs to NOT be executed in arch-chroot.
- ## Mainly because there's no root structure to chroot into.
- ## But partly because some configurations need to be done against the live CD.
- ## (For instance, modifying mirrors are done on LiveCD and replicated intwards)
- for title in pre_conf:
- print('[N] Network prerequisit step: {}'.format(title))
- if args['rerun'] and args['rerun'] != title and not rerun:
- continue
- else:
- rerun = True
+ return True
- for command in pre_conf[title]:
- raw_command = command
- opts = pre_conf[title][raw_command] if type(pre_conf[title][raw_command]) in (dict, oDict) else {}
- if len(opts):
- if 'pass-args' in opts or 'format' in opts:
- command = command.format(**args)
- ## FIXME: Instead of deleting the two options
- ## in order to mute command output further down,
- ## check for a 'debug' flag per command and delete these two
- if 'pass-args' in opts:
- del(opts['pass-args'])
- elif 'format' in opts:
- del(opts['format'])
- elif 'debug' in opts and opts['debug']:
- print('[N] Complete command-string: '.format(command))
- else:
- print('[-] Options: {}'.format(opts))
+def setup_bootloader(*positionals, **kwargs):
+ o = b''.join(sys_command('/usr/bin/arch-chroot /mnt bootctl --no-variables --path=/boot install'))
- #print('[N] Command: {} ({})'.format(raw_command, opts))
- o = b''.join(sys_command('{c}'.format(c=command), opts).exec())
- if type(conf[title][raw_command]) == bytes and len(conf[title][raw_command]) and not conf[title][raw_command] in b''.join(o):
- print('[W] Prerequisit step failed: {}'.format(b''.join(o).decode('UTF-8')))
- #print(o)
+ with open('/mnt/boot/loader/loader.conf', 'w') as loader:
+ loader.write('default arch\n')
+ loader.write('timeout 5\n')
- if not args['rerun'] or rerun:
- print('[N] Straping in packages.')
- if args['aur-support']:
- args['packages'] += ' git'
- o = b''.join(sys_command('/usr/bin/pacman -Syy').exec())
- o = b''.join(sys_command('/usr/bin/pacstrap /mnt base base-devel linux linux-firmware btrfs-progs efibootmgr nano wpa_supplicant dialog {packages}'.format(**args)).exec())
+ ## For some reason, blkid and /dev/disk/by-uuid are not getting along well.
+ ## And blkid is wrong in terms of LUKS.
+ #UUID = sys_command('blkid -s PARTUUID -o value {drive}{partition_2}'.format(**args)).decode('UTF-8').strip()
+ UUID = simple_command(f"ls -l /dev/disk/by-uuid/ | grep {os.path.basename(args['drive'])}{args['partitions']['2']} | awk '{{print $9}}'").decode('UTF-8').strip()
+ with open('/mnt/boot/loader/entries/arch.conf', 'w') as entry:
+ entry.write('title Arch Linux\n')
+ entry.write('linux /vmlinuz-linux\n')
+ entry.write('initrd /initramfs-linux.img\n')
+ entry.write('options cryptdevice=UUID={UUID}:luksdev root=/dev/mapper/luksdev rw intel_pstate=no_hwp\n'.format(UUID=UUID))
- if not os.path.isdir('/mnt/etc'): # TODO: This might not be the most long term stable thing to rely on...
- print('[E] Failed to strap in packages', o)
- exit(1)
+ return True
- if not args['rerun'] or rerun:
- o = b''.join(sys_command('/usr/bin/genfstab -pU /mnt >> /mnt/etc/fstab').exec())
- with open('/mnt/etc/fstab', 'a') as fstab:
- fstab.write('\ntmpfs /tmp tmpfs defaults,noatime,mode=1777 0 0\n') # Redundant \n at the start? who knoes?
-
- o = b''.join(sys_command('/usr/bin/arch-chroot /mnt rm -f /etc/localtime').exec())
- o = b''.join(sys_command('/usr/bin/arch-chroot /mnt ln -s /usr/share/zoneinfo/{localtime} /etc/localtime'.format(**args)).exec())
- o = b''.join(sys_command('/usr/bin/arch-chroot /mnt hwclock --hctosys --localtime').exec())
- #o = sys_command('arch-chroot /mnt echo "{hostname}" > /etc/hostname'.format(**args)).exec()
- #o = sys_command("arch-chroot /mnt sed -i 's/#\(en_US\.UTF-8\)/\1/' /etc/locale.gen").exec()
- o = b''.join(sys_command("/usr/bin/arch-chroot /mnt sh -c \"echo '{hostname}' > /etc/hostname\"".format(**args)).exec())
- o = b''.join(sys_command("/usr/bin/arch-chroot /mnt sh -c \"echo 'en_US.UTF-8 UTF-8' > /etc/locale.gen\"").exec())
- o = b''.join(sys_command("/usr/bin/arch-chroot /mnt sh -c \"echo 'LANG=en_US.UTF-8' > /etc/locale.conf\"").exec())
- o = b''.join(sys_command('/usr/bin/arch-chroot /mnt locale-gen').exec())
- o = b''.join(sys_command('/usr/bin/arch-chroot /mnt chmod 700 /root').exec())
-
- with open('/mnt/etc/mkinitcpio.conf', 'w') as mkinit:
- ## TODO: Don't replace it, in case some update in the future actually adds something.
- mkinit.write('MODULES=(btrfs)\n')
- mkinit.write('BINARIES=(/usr/bin/btrfs)\n')
- mkinit.write('FILES=()\n')
- mkinit.write('HOOKS=(base udev autodetect modconf block encrypt filesystems keyboard fsck)\n')
- o = b''.join(sys_command('/usr/bin/arch-chroot /mnt mkinitcpio -p linux').exec())
- ## WORKAROUND: https://github.com/systemd/systemd/issues/13603#issuecomment-552246188
- o = b''.join(sys_command('/usr/bin/arch-chroot /mnt bootctl --no-variables --path=/boot install').exec())
-
- with open('/mnt/boot/loader/loader.conf', 'w') as loader:
- loader.write('default arch\n')
- loader.write('timeout 5\n')
-
- ## For some reason, blkid and /dev/disk/by-uuid are not getting along well.
- ## And blkid is wrong in terms of LUKS.
- #UUID = sys_command('blkid -s PARTUUID -o value {drive}{partition_2}'.format(**args)).decode('UTF-8').exec().strip()
- UUID = simple_command("ls -l /dev/disk/by-uuid/ | grep {basename}{partition_2} | awk '{{print $9}}'".format(basename=os.path.basename(args['drive']), **args)).decode('UTF-8').strip()
- with open('/mnt/boot/loader/entries/arch.conf', 'w') as entry:
- entry.write('title Arch Linux\n')
- entry.write('linux /vmlinuz-linux\n')
- entry.write('initrd /initramfs-linux.img\n')
- entry.write('options cryptdevice=UUID={UUID}:luksdev root=/dev/mapper/luksdev rw intel_pstate=no_hwp\n'.format(UUID=UUID))
+def add_AUR_support(*positionals, **kwargs):
+ o = b''.join(sys_command('/usr/bin/arch-chroot /mnt sh -c "useradd -m -G wheel aibuilder"'))
+ o = b''.join(sys_command("/usr/bin/sed -i 's/# %wheel ALL=(ALL) NO/%wheel ALL=(ALL) NO/' /mnt/etc/sudoers"))
+
+ o = b''.join(sys_command('/usr/bin/arch-chroot /mnt sh -c "su - aibuilder -c \\"(cd /home/aibuilder; git clone https://aur.archlinux.org/yay.git)\\""'))
+ o = b''.join(sys_command('/usr/bin/arch-chroot /mnt sh -c "chown -R aibuilder.aibuilder /home/aibuilder/yay"'))
+ o = b''.join(sys_command('/usr/bin/arch-chroot /mnt sh -c "su - aibuilder -c \\"(cd /home/aibuilder/yay; makepkg -si --noconfirm)\\" >/dev/null"'))
+ ## Do not remove aibuilder just yet, can be used later for aur packages.
+ #o = b''.join(sys_command('/usr/bin/sed -i \'s/%wheel ALL=(ALL) NO/# %wheel ALL=(ALL) NO/\' /mnt/etc/sudoers'))
+ #o = b''.join(sys_command('/usr/bin/arch-chroot /mnt sh -c "userdel aibuilder"'))
+ #o = b''.join(sys_command('/usr/bin/arch-chroot /mnt sh -c "rm -rf /home/aibuilder"'))
+ return True
- if args['aur-support']:
- print('[N] AUR support demanded, building "yay" before running POST steps.')
- o = b''.join(sys_command('/usr/bin/arch-chroot /mnt sh -c "useradd -m -G wheel aibuilder"').exec())
- o = b''.join(sys_command("/usr/bin/sed -i 's/# %wheel ALL=(ALL) NO/%wheel ALL=(ALL) NO/' /mnt/etc/sudoers").exec())
-
- o = b''.join(sys_command('/usr/bin/arch-chroot /mnt sh -c "su - aibuilder -c \\"(cd /home/aibuilder; git clone https://aur.archlinux.org/yay.git)\\""').exec())
- o = b''.join(sys_command('/usr/bin/arch-chroot /mnt sh -c "chown -R aibuilder.aibuilder /home/aibuilder/yay"').exec())
- o = b''.join(sys_command('/usr/bin/arch-chroot /mnt sh -c "su - aibuilder -c \\"(cd /home/aibuilder/yay; makepkg -si --noconfirm)\\" >/dev/null"').exec())
- ## Do not remove aibuilder just yet, can be used later for aur packages.
- #o = b''.join(sys_command('/usr/bin/sed -i \'s/%wheel ALL=(ALL) NO/# %wheel ALL=(ALL) NO/\' /mnt/etc/sudoers').exec())
- #o = b''.join(sys_command('/usr/bin/arch-chroot /mnt sh -c "userdel aibuilder"').exec())
- #o = b''.join(sys_command('/usr/bin/arch-chroot /mnt sh -c "rm -rf /home/aibuilder"').exec())
- print('[N] AUR support added. use "yay -Syy --noconfirm <package>" to deploy in POST.')
-
+def run_post_install_steps(*positionals, **kwargs):
conf = {}
if 'post' in instructions:
conf = instructions['post']
@@ -921,20 +1078,188 @@ if __name__ == '__main__':
bytes(f'login:', 'UTF-8') : b'root\n',
#b'Password:' : bytes(args['password']+'\n', 'UTF-8'),
bytes(f'[root@{args["hostname"]} ~]#', 'UTF-8') : bytes(command+'\n', 'UTF-8'),
- }, **opts}).exec())
+ }, **opts}))
## Not needed anymore: And cleanup after out selves.. Don't want to leave any residue..
# os.remove('/mnt/etc/systemd/system/console-getty.service.d/override.conf')
else:
- o = b''.join(sys_command('/usr/bin/systemd-nspawn -D /mnt --machine temporary {c}'.format(c=command), opts=opts).exec())
+ o = b''.join(sys_command('/usr/bin/systemd-nspawn -D /mnt --machine temporary {c}'.format(c=command), opts=opts))
if type(conf[title][raw_command]) == bytes and len(conf[title][raw_command]) and not conf[title][raw_command] in o:
print('[W] Post install command failed: {}'.format(o.decode('UTF-8')))
#print(o)
- if args['aur-support']:
- o = b''.join(sys_command('/usr/bin/sed -i \'s/%wheel ALL=(ALL) NO/# %wheel ALL=(ALL) NO/\' /mnt/etc/sudoers').exec())
- o = b''.join(sys_command('/usr/bin/arch-chroot /mnt sh -c "userdel aibuilder"').exec())
- o = b''.join(sys_command('/usr/bin/arch-chroot /mnt sh -c "rm -rf /home/aibuilder"').exec())
+if __name__ == '__main__':
+ update_git() # Breaks and restarts the script if an update was found.
+ update_drive_list()
+
+ ## Setup some defaults
+ # (in case no command-line parameters or netdeploy-params were given)
+ args = setup_args_defaults(args)
+
+ ## == If we got networking,
+ # Try fetching instructions for this box and execute them.
+ instructions = load_automatic_instructions()
+
+ if args['profile'] and not args['default']:
+ instructions = get_instructions(args['profile'])
+ if len(instructions) <= 0:
+ print('[E] No instructions by the name of {} was found.'.format(args['profile']))
+ print(' Installation won\'t continue until a valid profile is given.')
+ print(' (this is because --profile was given and a --default is not given)')
+ exit(1)
+ else:
+ first = True
+ while not args['default'] and not args['profile'] and len(instructions) <= 0:
+ profile = input('What template do you want to install: ')
+ instructions = get_instructions(profile)
+ if first and len(instructions) <= 0:
+ print('[E] No instructions by the name of {} was found.'.format(profile))
+ print(' Installation won\'t continue until a valid profile is given.')
+ print(' (this is because --default is not instructed and no --profile given)')
+ first = False
+
+ # TODO: Might not need to return anything here, passed by reference?
+ instructions = merge_in_includes(instructions)
+ cleanup_args()
+
+ print(json.dumps(args, indent=4))
+ if args['default'] and not 'force' in args:
+ if(input('Are these settings OK? (No return beyond this point) N/y: ').lower() != 'y'):
+ exit(1)
+
+ cache_diskpw_on_disk()
+ #else:
+ # ## TODO: Convert to `rb` instead.
+ # # We shouldn't discriminate \xfu from being a passwd phrase.
+ # with open(args['pwfile'], 'r') as pw:
+ # PIN = pw.read().strip()
+
+ print()
+ print('[!] Disk PASSWORD is: {}'.format(args['password']))
+ print()
+
+ if not args['rerun'] or args['ignore-rerun']:
+ for i in range(5, 0, -1):
+ print(f'Formatting {args["drive"]} in {i}...')
+ time.sleep(1)
+
+ close_disks()
+ print(f'[N] Setting up {args["drive"]}.')
+ format_disk('drive', start='start', end='size')
+
+ refresh_partition_list('drive')
+ print(f'Partitions: (Boot: {list(args["partitions"].keys())[0]})')
+
+ if len(args['partitions']) <= 0:
+ print(f'[E] No partitions were created on {args["drive"]}', o)
+ exit(1)
+
+ if not args['rerun'] or args['ignore-rerun']:
+ if not mkfs_fat32('drive', '1'):
+ print(f'[E] Could not setup {args["drive"]}{args["partitions"]["1"]}')
+ exit(1)
+
+ # "--cipher sha512" breaks the shit.
+ # TODO: --use-random instead of --use-urandom
+ print(f'[N] Adding encryption to {args["drive"]}{args["partitions"]["2"]}.')
+ if not encrypt_partition('drive', '2', 'pwfile'):
+ print('[E] Failed to setup disk encryption.', o)
+ exit(1)
+
+ if not mount_luktsdev('drive', '2', 'pwfile'):
+ print('[E] Could not open encrypted device.', o)
+ exit(1)
+
+ if not args['rerun'] or args['ignore-rerun']:
+ print(f'[N] Creating btrfs filesystem inside {args["drive"]}{args["partitions"]["2"]}')
+ if not mkfs_btrfs():
+ print('[E] Could not setup btrfs filesystem.', o)
+ exit(1)
+
+ mount_mountpoints('drive', '1')
+
+ if 'mirrors' in args and args['mirrors'] and 'country' in args and get_default_gateway_linux():
+ print('[N] Reordering mirrors.')
+ filter_mirrors_by_country_list([args['country']])
+
+ pre_conf = {}
+ if 'pre' in instructions:
+ pre_conf = instructions['pre']
+ elif 'prerequisits' in instructions:
+ pre_conf = instructions['prerequisits']
+
+ if 'git-branch' in pre_conf:
+ update_git(pre_conf['git-branch'])
+ del(pre_conf['git-branch'])
+
+ ## Prerequisit steps needs to NOT be executed in arch-chroot.
+ ## Mainly because there's no root structure to chroot into.
+ ## But partly because some configurations need to be done against the live CD.
+ ## (For instance, modifying mirrors are done on LiveCD and replicated intwards)
+ for title in pre_conf:
+ print('[N] Network prerequisit step: {}'.format(title))
+ if args['rerun'] and args['rerun'] != title and not rerun:
+ continue
+ else:
+ rerun = True
+
+ for command in pre_conf[title]:
+ raw_command = command
+ opts = pre_conf[title][raw_command] if type(pre_conf[title][raw_command]) in (dict, oDict) else {}
+ if len(opts):
+ if 'pass-args' in opts or 'format' in opts:
+ command = command.format(**args)
+ ## FIXME: Instead of deleting the two options
+ ## in order to mute command output further down,
+ ## check for a 'debug' flag per command and delete these two
+ if 'pass-args' in opts:
+ del(opts['pass-args'])
+ elif 'format' in opts:
+ del(opts['format'])
+ elif 'debug' in opts and opts['debug']:
+ print('[N] Complete command-string: '.format(command))
+ else:
+ print('[-] Options: {}'.format(opts))
+
+ #print('[N] Command: {} ({})'.format(raw_command, opts))
+ o = b''.join(sys_command('{c}'.format(c=command), opts))
+ if type(conf[title][raw_command]) == bytes and len(conf[title][raw_command]) and not conf[title][raw_command] in b''.join(o):
+ print('[W] Prerequisit step failed: {}'.format(b''.join(o).decode('UTF-8')))
+ #print(o)
+
+ if not args['rerun'] or rerun:
+ print('[N] Straping in packages.')
+ base_return_code = strap_in_base() # TODO: check return here? we return based off pacstrap exit code.. Never tired it tho.
+ else:
+ base_return_code = None
+
+ if not os.path.isdir('/mnt/etc'): # TODO: This might not be the most long term stable thing to rely on...
+ print('[E] Failed to strap in packages', o)
+ exit(1)
+
+ if not args['rerun'] or rerun:
+ print('[N] Configuring base system.')
+ configure_base_system()
+ ## WORKAROUND: https://github.com/systemd/systemd/issues/13603#issuecomment-552246188
+ print('[N] Setting up bootloader.')
+ setup_bootloader()
+
+ if args['aur-support']:
+ print('[N] AUR support demanded, building "yay" before running POST steps.')
+ add_AUR_support()
+ print('[N] AUR support added. use "yay -Syy --noconfirm <package>" to deploy in POST.')
+
+ print('[N] Running post installation steps.')
+ run_post_install_steps()
+ time.sleep(2)
+
+ if args['aur-support'] and not args['aur-keep']:
+ o = b''.join(sys_command('/usr/bin/sed -i \'s/%wheel ALL=(ALL) NO/# %wheel ALL=(ALL) NO/\' /mnt/etc/sudoers'))
+ o = b''.join(sys_command('/usr/bin/arch-chroot /mnt sh -c "userdel aibuilder"'))
+ o = b''.join(sys_command('/usr/bin/arch-chroot /mnt sh -c "rm -rf /home/aibuilder"'))
+
+ if args['phone-home']:
+ phone_home(args['phone-home'])
## == Passwords
# o = sys_command('arch-chroot /mnt usermod --password {} root'.format(args['password']))
diff --git a/test_archinstall.py b/test_archinstall.py
new file mode 100644
index 00000000..30bc76e7
--- /dev/null
+++ b/test_archinstall.py
@@ -0,0 +1,14 @@
+import json
+import archinstall
+
+archinstall.update_drive_list(emulate=False)
+archinstall.setup_args_defaults(archinstall.args, interactive=False)
+#for drive in archinstall.harddrives:
+# print(drive, archinstall.human_disk_info(drive))
+
+instructions = archinstall.load_automatic_instructions(emulate=False)
+profile_instructions = archinstall.get_instructions('workstation', emulate=False)
+profile_instructions = archinstall.merge_in_includes(profile_instructions, emulate=False)
+archinstall.args['password'] = 'test'
+
+print(json.dumps(archinstall.args, indent=4)) \ No newline at end of file