Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/test/pacman/pmserve.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/pacman/pmserve.py')
-rw-r--r--test/pacman/pmserve.py84
1 files changed, 84 insertions, 0 deletions
diff --git a/test/pacman/pmserve.py b/test/pacman/pmserve.py
new file mode 100644
index 00000000..0ca34093
--- /dev/null
+++ b/test/pacman/pmserve.py
@@ -0,0 +1,84 @@
+# Copyright (c) 2020 Pacman Development Team <pacman-dev@archlinux.org>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import http
+import http.server
+import sys
+import re
+
+class pmHTTPServer(http.server.ThreadingHTTPServer):
+ pass
+
+class pmHTTPRequestHandler(http.server.BaseHTTPRequestHandler):
+ """BaseHTTPRequestHandler subclass with helper methods and common setup"""
+
+ logfile = sys.stderr
+
+ def respond(self, response, headers={}, code=200):
+ self.protocol_version = "HTTP/1.1"
+ self.send_response(code)
+ for header, value in headers.items():
+ self.send_header(header, value)
+ self.end_headers()
+ self.wfile.write(response)
+
+ def parse_range_bytes(self, text):
+ parser = re.compile(r'^bytes=(\d+)-(\d+)?$')
+ if m := parser.match(text):
+ return map(lambda d: None if d is None else int(d), m.groups())
+ else:
+ raise ValueError("Unrecognized Range value")
+
+ def respond_bytes(self, response, headers={}, code=200):
+ headers = headers.copy()
+ if code == 200 and self.headers['Range']:
+ (start, end) = self.parse_range_bytes(self.headers['Range'])
+ code = 206
+ response = response[start:end]
+ headers.setdefault('Content-Range', 'bytes */%s' % (len(response)))
+ headers.setdefault('Content-Type', "application/octet-stream")
+ headers.setdefault('Content-Length', str(len(response)))
+ self.respond(response, headers, code)
+
+ def respond_string(self, response, headers={}, code=200):
+ headers = headers.copy()
+ headers.setdefault('Content-Type', 'text/plain; charset=utf-8')
+ self.respond_bytes(response.encode('UTF-8'), headers, code)
+
+ def log_message(self, format, *args):
+ if callable(self.logfile):
+ self.logfile = self.logfile()
+ self.logfile.write("%s - - [%s] %s\n" %
+ (self.address_string(),
+ self.log_date_time_string(),
+ format%args))
+
+class pmStringHTTPRequestHandler(pmHTTPRequestHandler):
+ """pmHTTPRequestHandler subclass to respond with simple string messages"""
+
+ responses = dict()
+
+ def do_GET(self):
+ response = self.responses.get(self.path, self.responses.get(''))
+ if response is not None:
+ if isinstance(response, dict):
+ self.respond_string(
+ response.get('body', ''),
+ headers=response.get('headers', {}),
+ code=response.get('code', 200))
+ else:
+ self.respond_string(response)
+ else:
+ self.send_error(http.HTTPStatus.NOT_FOUND);