Module psdi_data_conversion.gui.authentication

auth.py

This module contains the OpenID Connect and JSON Web Token handling.

Functions

def get_authenticated_user()
Expand source code
def get_authenticated_user():
    """Return the stored access token for the session named by the request's cookie.

    The ``auth_token`` cookie is read, its (unverified) JWT header is parsed to
    obtain the ``kid``, and that ``kid`` is looked up in ``d_all_user_keys``. If
    an entry is found and has not timed out, its ``last_used`` timestamp is
    refreshed and its ``access_token`` value is returned.

    Returns
    -------
    dict[str, dict | str] | None
        The ``access_token`` value stored for the session, or None if there is
        no cookie, the cookie is not a parseable JWT, the ``kid`` is unknown,
        or the session has exceeded ``session_timeout_seconds``.
    """

    auth_token_string = request.cookies.get('auth_token')

    if auth_token_string is None:
        return None

    auth_token = unquote(auth_token_string)

    # The cookie is client-controlled, so a malformed token must be treated as
    # "not logged in" rather than being allowed to raise out of this function
    try:
        d_unverified_header: dict[str, dict | str] = jwt.get_unverified_header(auth_token)
    except jwt.InvalidTokenError as e:
        print(f"Failed to verify session token: {e}", file=sys.stderr)
        return None

    # A header without a "kid" cannot match any stored session
    kid: str | None = d_unverified_header.get('kid')
    d_user_key: dict[str, dict | str] | None = d_all_user_keys.get(kid)

    if d_user_key is None:
        return None

    now = datetime.now(timezone.utc)
    elapsed = now - d_user_key["last_used"]
    timeout = get_env().session_timeout_seconds

    # Use total_seconds() rather than .seconds: the latter is only the seconds
    # component of the timedelta and wraps around once a full day has elapsed,
    # which would let a long-idle session appear fresh. A timeout of zero or
    # less disables expiry.
    if timeout > 0 and elapsed.total_seconds() > timeout:
        d_all_user_keys.pop(kid, None)
        return None

    # NOTE(review): only the unverified header of the token is inspected here; the
    # token's signature is not checked against d_user_key["public_key"]. Confirm
    # whether verification is performed elsewhere or should be added here.
    d_user_key["last_used"] = now

    return d_user_key["access_token"]
def get_keycloak_public_key()
Expand source code
@ttl_cache(ttl=60)
def get_keycloak_public_key():
    """Fetch Keycloak's JSON Web Key Set and return its keys indexed by key ID.

    These keys are what tokens issued by Keycloak are verified against. The
    result is cached through ``ttl_cache`` so that the key set is re-fetched
    periodically instead of on every call.

    Returns
    -------
    dict
        Maps the ``kid`` of each key in the key set to the key object produced
        by ``jwt.algorithms.RSAAlgorithm.from_jwk``.

    Raises
    ------
    requests.RequestException
        If the request times out, fails, or returns an HTTP error status.
    """

    env = get_env()

    jwks_url = f"{env.keycloak_url}/realms/{env.keycloak_realm}/protocol/openid-connect/certs"

    # Without a timeout an unresponsive server would block the calling request
    # indefinitely
    response = requests.get(jwks_url, timeout=10)

    # Fail with a clear HTTP error rather than an opaque KeyError on 'keys' below
    response.raise_for_status()

    d_jwks: dict[str, dict | str] = response.json()

    return {
        d_key['kid']: jwt.algorithms.RSAAlgorithm.from_jwk(json.dumps(d_key))
        for d_key in d_jwks['keys']
    }
def get_login_url()
Expand source code
def get_login_url():
    """Build the Keycloak authorization URL that the browser is sent to for login.

    Returns
    -------
    str
        The realm's ``openid-connect/auth`` endpoint with the client ID, redirect
        URL, response type and scope URL-encoded into the query string.
    """

    settings = get_env()

    # Query parameters, in the order they appear in the resulting URL
    encoded_query = urlencode([
        ('client_id', settings.keycloak_client_id),
        ('redirect_uri', settings.keycloak_redirect_url),
        ('response_type', 'code'),
        ('scope', 'openid'),
    ])

    auth_endpoint = f"{settings.keycloak_url}/realms/{settings.keycloak_realm}/protocol/openid-connect/auth"

    return f"{auth_endpoint}?{encoded_query}"
def get_logout_url()
Expand source code
def get_logout_url():
    """Return the site-relative URL of the logout endpoint."""

    logout_path = "/logout"
    return logout_path
def init_authentication(app: flask.app.Flask)
Expand source code
def init_authentication(app: Flask):
    """Register the authentication routes (both GET) on the provided Flask app
    """

    # Each URL rule paired with the view function that serves it
    get_routes = (
        ('/oidc_callback', oidc_callback),
        ('/logout', logout),
    )

    for rule, view in get_routes:
        app.add_url_rule(rule, view_func=view, methods=['GET'])

Register the OIDC callback and logout routes (both GET) on the provided Flask app

def logout()
Expand source code
def logout():
    """Drop every stored session belonging to the current user, then go to the home page.

    If the request carries no authenticated session, nothing is removed. Otherwise
    every entry of ``d_all_user_keys`` whose access token has the same ``sid`` as
    the current user's is deleted. In both cases the response is a redirect to "/".
    """

    current_user: dict[str, dict | str] | None = get_authenticated_user()

    if current_user is not None:

        session_id = current_user["sid"]

        # Walk a snapshot of the entries so deleting from the dict is safe mid-loop
        for key_id, entry in list(d_all_user_keys.items()):
            if entry["access_token"]["sid"] == session_id:
                del d_all_user_keys[key_id]

    return redirect("/")
def oidc_callback()
Expand source code
def oidc_callback():
    """Handle the redirect back from Keycloak at the end of the login flow.

    The ``code`` query parameter is exchanged with Keycloak for an access token,
    which is verified and decoded using Keycloak's public key. The decoded token
    is then stored in ``d_all_user_keys`` under the ``kid`` of the public key the
    client supplied in its ``public_key`` cookie, together with that key and the
    current time.

    Returns
    -------
    A redirect to "/" on success.

    The request is aborted with HTTP 400 if no code is provided, Keycloak does not
    grant an access token, the access token fails verification, or the
    ``public_key`` cookie is missing or not a valid JSON Web Key with a string
    ``kid``.
    """

    # Get the _code_ parameter to use for Keycloak communication
    code = request.args.get('code')

    if not code:
        abort(400)

    env = get_env()

    # Make request to Keycloak for a id / access token
    keycloak_data = {
        "grant_type": "authorization_code",
        "code": code,
        "client_id": env.keycloak_client_id,
        "client_secret": env.get_keycloak_secret(),
        "redirect_uri": env.keycloak_redirect_url,
        "scope": "openid",
    }

    keycloak_url = f"{env.keycloak_url}/realms/{env.keycloak_realm}/protocol/openid-connect/token"

    # Without a timeout an unresponsive server would block this request indefinitely
    d_token_data: dict[str, dict | str] = requests.post(keycloak_url, data=keycloak_data, timeout=10).json()
    access_token: str = d_token_data.get("access_token")

    if not access_token:
        print(f"ERROR: Access token not granted: {repr(d_token_data)}", file=sys.stderr)
        abort(400)

    try:
        # Verify and decode the access token
        d_header: dict[str, Any] = jwt.get_unverified_header(access_token)
        public_key = get_keycloak_public_key()[d_header['kid']]

        decoded_access_token: dict[str, Any] = jwt.decode(
            access_token,
            key=public_key,
            audience="account",
            algorithms=['RS256']
        )

    except jwt.InvalidTokenError as e:
        print(f"ERROR: Failed to verify access token: {e}", file=sys.stderr)
        abort(400)

    # The public key cookie is supplied by the client, so anything wrong with it
    # is a bad request rather than a server error
    user_public_key_cookie = request.cookies.get('public_key')

    if user_public_key_cookie is None:
        print("ERROR: No public key cookie provided", file=sys.stderr)
        abort(400)

    try:
        user_public_key_string = unquote(user_public_key_cookie)

        d_user_public_key: dict[str, dict | str] = json.loads(user_public_key_string)

        kid: str = d_user_public_key["kid"]

        # Sessions are looked up by the string "kid" from a token header, so any
        # other type could never be matched (and might not be usable as a dict key)
        if not isinstance(kid, str):
            raise TypeError("'kid' must be a string")

        user_public_key = jwt.PyJWK.from_json(user_public_key_string)

    except (KeyError, TypeError, ValueError, jwt.PyJWTError) as e:
        print(f"ERROR: Invalid public key cookie: {e}", file=sys.stderr)
        abort(400)

    d_all_user_keys[kid] = {
        "last_used": datetime.now(timezone.utc),
        "access_token": decoded_access_token,
        "public_key": user_public_key
    }

    return redirect("/")