Aller au contenu

Serveurs en Python⚓︎

Celui par défaut⚓︎

🐣 2022-08

La base

$ python -m http.server 8000
Serving HTTP on :: port 8000 (http://[::]:8000/) ...

Pour sortir/arrêter : Ctrl+C

Et avec Python 2 ?

Bon déjà pas de bol… mais ça reste possible :
$ python -m SimpleHTTPServer 8000

🐣 Le plus petit⚓︎

🐣 2022-08

server.py
import http.server
import socketserver
from http import HTTPStatus


class Handler(http.server.SimpleHTTPRequestHandler):
    def do_GET(self):
        self.send_response(HTTPStatus.OK)
        self.end_headers()
        self.wfile.write(b"Hello world")


httpd = socketserver.TCPServer(("", 8001), Handler)
httpd.serve_forever()

Dans un shell :

$ python server.py

Et dans l’autre :

$ curl -D - localhost:8001
HTTP/1.0 200 OK
Server: SimpleHTTP/0.6 Python/3.9.13
Date: Sat, 13 Aug 2022 14:50:37 GMT

Hello world

Vous devriez voir passer la requête dans votre précédent shell :

$ python server.py
127.0.0.1 - - [13/Aug/2022 10:50:37] "GET / HTTP/1.1" 200 -

🐤 Avec moins de dépendances⚓︎

🐣 2022-08

Une autre approche avec un article très didactique sur le sujet qui utilise encore moins de dépendances (mais ça demande d’en faire un peu plus aussi…) :

server-socket-only.py
import socket


def handle_request(request):
    """Handles the HTTP request."""

    headers = request.split("\n")
    filename = headers[0].split()[1]
    if filename == "/":
        filename = "/index.html"

    try:
        fin = open("htdocs" + filename)
        content = fin.read()
        fin.close()

        response = "HTTP/1.0 200 OK\n\n" + content
    except FileNotFoundError:
        response = "HTTP/1.0 404 NOT FOUND\n\nFile Not Found"

    return response


# Define socket host and port
SERVER_HOST = "0.0.0.0"
SERVER_PORT = 8080

# Create socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind((SERVER_HOST, SERVER_PORT))
server_socket.listen(1)
print("Listening on port %s ..." % SERVER_PORT)

while True:
    # Wait for client connections
    client_connection, client_address = server_socket.accept()

    # Get the client request
    request = client_connection.recv(1024).decode()
    print(request)

    # Return an HTTP response
    response = handle_request(request)
    client_connection.sendall(response.encode())

    # Close connection
    client_connection.close()

# Close socket
server_socket.close()

Il est un peu plus complet aussi car il s’occupe un peu des routes pour afficher des fichiers HTML.

🐔 Plus complet⚓︎

🐣 2022-08

Le plus petit framework en Python (100 lignes !).

framework.py
import json
import re
import traceback
from dataclasses import dataclass, field
from socket import AF_INET, SHUT_WR, SO_REUSEADDR, SOCK_STREAM, SOL_SOCKET, socket
from typing import Any, Callable, Dict, List, Tuple
from urllib.parse import parse_qs, urlparse


@dataclass
class Request:
    method: str
    headers: Dict[str, str]
    path: str
    query_params: List[Dict[str, List[str]]] = field(default_factory=list)
    body: Any = None


@dataclass
class Response:
    body: Any
    status_code: int = 200
    content_type: str = "text/html"


class JSONResponse:
    def __new__(cls, *args, **kwargs):
        return Response(content_type="application/json", *args, **kwargs)


class ApiException(Response, BaseException):
    pass


class MostMinimalWebFramework:
    route_table: List[Tuple[re.Pattern, Callable]] = []

    def route(self, path: str) -> Callable:
        def decorator(func: Callable):
            def __inner():
                return func()

            self.route_table.append((re.compile(path + "$"), func))
            return __inner

        return decorator

    def get_route_function(self, searched_path: str) -> Callable:
        return next(r for r in self.route_table if r[0].match(searched_path))[1]

    def request_parser(self, request_str: str) -> Request:
        request_lines = request_str.split("\r\n")
        method, url, _ = request_lines[0].split(" ")  # first line has method and url

        headers = {}
        for i, line in enumerate(request_lines[1:], 1):

            if line == "":  # under empty line, whole data is body
                try:
                    body = json.loads("".join(request_lines[i + 1 :]))
                except json.JSONDecodeError:
                    body = "".join(request_lines[i + 1 :])
                break

            j = line.find(":")  # left part of : will key, right part will be value
            headers[line[:j].upper()] = line[j + 2 :]

        url = urlparse(url)
        return Request(method, headers, url.path, parse_qs(url.query), body)

    def build_response(self, r: Response) -> str:
        body = r.body if isinstance(r.body, str) else json.dumps(r.body)
        return (
            f"HTTP/1.1 {r.status_code}\r\nContent-Type: {r.content_type}; charset=utf-8"
            f"\r\nContent-Length: {len(body)}\r\nConnection: close\r\n\r\n{body}"
        )

    def run(self, address: str, port: int):
        serversocket = socket(AF_INET, SOCK_STREAM)
        serversocket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
        try:
            serversocket.bind((address, port))
            serversocket.listen(5)
            while True:
                clientsocket, _ = serversocket.accept()
                request = clientsocket.recv(4096).decode()
                try:
                    parsed_req = self.request_parser(request)
                    response = self.get_route_function(parsed_req.path)(parsed_req)
                except ApiException as e:
                    response = e
                except Exception:
                    print(traceback.format_exc())
                    response = Response({"msg": "500 - server error"}, 500)
                print(response.status_code, parsed_req.method, parsed_req.path)
                clientsocket.sendall(self.build_response(response).encode())
                clientsocket.shutdown(SHUT_WR)
        finally:
            serversocket.close()


if __name__ == "__main__":

    app = MostMinimalWebFramework()

    @app.route("/")
    def f(request):
        return Response("Hello World")

    @app.route("/json-response/")
    def json_response(request):
        return JSONResponse({"msg": "Hello World"})

    @app.route("/method-handling/")
    def method_handling(request):
        if request.method == "GET":
            return Response("Your method is GET")

        elif request.method == "POST":
            return Response("Your method is POST")

    @app.route("/status-code/")
    def status_code(request):
        return Response("Different status code", status_code=202)

    @app.route("/raise-exception/")
    def exception_raising(request):
        raise ApiException({"msg": "custom_exception"}, status_code=400)

    @app.route("/body-handle/")
    def body_handling(request):
        try:
            name = request.body["name"]
        except (KeyError, TypeError):
            raise ApiException({"msg": "name field required"}, status_code=400)

        return JSONResponse({"request__name": name})

    @app.route("/query-param-handling/")
    def query_param_handling(request):
        try:
            q_parameter = request.query_params["q"][0]
        except (KeyError, TypeError):
            raise ApiException({"msg": "q query paramter required"}, status_code=400)

        return JSONResponse({"your_q_parameter": q_parameter})

    @app.route("/header-handling/")
    def header_handling(request):
        try:
            token = request.headers["X-TOKEN"]
        except (KeyError, TypeError):
            raise ApiException({"msg": "Un authorized"}, status_code=403)

        return Response(f"your token {token}")

    @app.route("/user/[^/]*/posts")
    def varialbe_path(request):
        user_id = request.path[len("/user/") : -len("/posts")]
        return Response(f"posts for {user_id}", status_code=201)

    @app.route("/.*")
    def func_404(request):
        return Response("404", status_code=404)

    app.run("0.0.0.0", 8080)

Avec des exemples d’usages ensuite dans le fichier pour les routes, etc.

Pour aller plus loin/différemment⚓︎


Dernière mise à jour: 2022-11-06