Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summary | refs | log | tree | commit | diff
path: root/test/pacman/pmserve.py
blob: 47e041d066ebfa137b77e11cd0c6f5eb8560cd18 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
#  Copyright (c) 2020-2021 Pacman Development Team <pacman-dev@archlinux.org>
#
#  This program is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 2 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with this program.  If not, see <http://www.gnu.org/licenses/>.

import http
import http.server
import sys
import re

class pmHTTPServer(http.server.ThreadingHTTPServer):
    """Threading HTTP server; adds nothing on top of ThreadingHTTPServer."""

class pmHTTPRequestHandler(http.server.BaseHTTPRequestHandler):
    """BaseHTTPRequestHandler subclass with helper methods and common setup.

    Provides respond()/respond_bytes()/respond_string() helpers that send a
    complete response (status line, headers, body) in one call, basic
    support for single-range ``Range: bytes=...`` requests, and logging to
    a configurable stream.
    """

    # Destination for log_message(): either a writable text stream or a
    # zero-argument callable returning one (resolved on first use).
    logfile = sys.stderr

    # Matches "bytes=<start>-" and "bytes=<start>-<end>" only; suffix ranges
    # ("bytes=-N") and multi-range requests are not recognized.
    _range_parser = re.compile(r'^bytes=(\d+)-(\d+)?$')

    def respond(self, response, headers=None, code=200):
        """Send a full response: status line, headers, then the body.

        response -- body as a bytes-like object, written to wfile as-is
        headers  -- optional mapping of header name -> value
        code     -- HTTP status code for the status line
        """
        if headers is None:
            headers = {}
        # Set here (not as a class attribute) so only the emitted status
        # line changes; request parsing keeps the base-class default.
        self.protocol_version = "HTTP/1.1"
        self.send_response(code)
        for header, value in headers.items():
            self.send_header(header, value)
        self.end_headers()
        self.wfile.write(response)

    def parse_range_bytes(self, text):
        """Parse a ``Range`` header value into ``(start, end)``.

        Returns a 2-tuple of ints; ``end`` is None for an open-ended range
        ("bytes=5-").  ``end`` is the inclusive last byte position as it
        appeared in the header.  Raises ValueError if the value is not a
        single "bytes=start-[end]" range.
        """
        m = self._range_parser.match(text)
        if m is None:
            raise ValueError("Unrecognized Range value")
        start, end = m.groups()
        return (int(start), None if end is None else int(end))

    def respond_bytes(self, response, headers=None, code=200):
        """Send a bytes body, honoring a ``Range`` request header.

        When code is 200 and the request carries a Range header, the
        matching slice is sent as 206 with a "bytes first-last/total"
        Content-Range.  A range that cannot be satisfied (start past the
        end of the body, or end before start) yields 416 with an empty body
        and "bytes */total".  Content-Type and Content-Length are filled in
        unless the caller supplied them.  The caller's headers mapping is
        never modified.

        Raises ValueError (from parse_range_bytes) on an unrecognized
        Range value.
        """
        headers = {} if headers is None else headers.copy()
        range_value = self.headers.get('Range')
        if code == 200 and range_value:
            (start, end) = self.parse_range_bytes(range_value)
            total = len(response)
            if start >= total or (end is not None and end < start):
                code = 416
                response = response[:0]
                headers.setdefault('Content-Range', 'bytes */%d' % total)
            else:
                # The last byte position in a Range header is inclusive,
                # and is clamped to the final byte of the body.
                last = total - 1 if end is None else min(end, total - 1)
                code = 206
                response = response[start:last + 1]
                headers.setdefault('Content-Range',
                        'bytes %d-%d/%d' % (start, last, total))
        headers.setdefault('Content-Type', "application/octet-stream")
        headers.setdefault('Content-Length', str(len(response)))
        self.respond(response, headers, code)

    def respond_string(self, response, headers=None, code=200):
        """Send a str body encoded as UTF-8 (text/plain unless overridden)."""
        headers = {} if headers is None else headers.copy()
        headers.setdefault('Content-Type', 'text/plain; charset=utf-8')
        self.respond_bytes(response.encode('UTF-8'), headers, code)

    def log_message(self, format, *args):
        """Write one access-log style line to ``logfile``."""
        # A callable logfile is invoked once; the stream it returns is
        # stored on this instance and reused for later messages.
        if callable(self.logfile):
            self.logfile = self.logfile()
        self.logfile.write("%s - - [%s] %s\n" %
                (self.address_string(),
                self.log_date_time_string(),
                format%args))

class pmStringHTTPRequestHandler(pmHTTPRequestHandler):
    """pmHTTPRequestHandler subclass to respond with simple string messages.

    GET requests are answered from the ``responses`` table, keyed by request
    path; the entry under the empty-string key, if any, is the fallback for
    paths without their own entry.  An entry is either a plain string (sent
    as the body) or a dict with optional 'body', 'headers' and 'code' keys.
    """

    # Request path -> response entry; see the class docstring.
    # NOTE(review): http.server.BaseHTTPRequestHandler documents an
    # attribute of the same name (status code -> messages) -- confirm the
    # shadowing is intended.
    responses = {}

    def do_GET(self):
        """Answer a GET from the ``responses`` table, or send a 404."""
        table = self.responses
        fallback = table.get('')
        entry = table.get(self.path, fallback)

        if entry is None:
            self.send_error(http.HTTPStatus.NOT_FOUND)
            return

        if not isinstance(entry, dict):
            self.respond_string(entry)
            return

        body = entry.get('body', '')
        extra_headers = entry.get('headers', {})
        status = entry.get('code', 200)
        self.respond_string(body, headers=extra_headers, code=status)