chore: automatic commit 2025-04-30 12:48

This commit is contained in:
2025-04-30 12:48:06 +02:00
parent f69356473b
commit e4ab1e1bb5
5284 changed files with 868438 additions and 0 deletions

View File

@@ -0,0 +1,96 @@
from wtforms.fields import HiddenField
from wtforms.validators import ValidationError
__all__ = ("CSRFTokenField", "CSRF")
class CSRFTokenField(HiddenField):
"""
A subclass of HiddenField designed for sending the CSRF token that is used
for most CSRF protection schemes.
Notably different from a normal field, this field always renders the
current token regardless of the submitted value, and also will not be
populated over to object data via populate_obj
"""
current_token = None
def __init__(self, *args, **kw):
self.csrf_impl = kw.pop("csrf_impl")
super().__init__(*args, **kw)
def _value(self):
"""
We want to always return the current token on render, regardless of
whether a good or bad token was passed.
"""
return self.current_token
def populate_obj(self, *args):
"""
Don't populate objects with the CSRF token
"""
pass
def pre_validate(self, form):
"""
Handle validation of this token field.
"""
self.csrf_impl.validate_csrf_token(form, self)
def process(self, *args, **kwargs):
super().process(*args, **kwargs)
self.current_token = self.csrf_impl.generate_csrf_token(self)
class CSRF:
field_class = CSRFTokenField
def setup_form(self, form):
"""
Receive the form we're attached to and set up fields.
The default implementation creates a single field of
type :attr:`field_class` with name taken from the
``csrf_field_name`` of the class meta.
:param form:
The form instance we're attaching to.
:return:
A sequence of `(field_name, unbound_field)` 2-tuples which
are unbound fields to be added to the form.
"""
meta = form.meta
field_name = meta.csrf_field_name
unbound_field = self.field_class(label="CSRF Token", csrf_impl=self)
return [(field_name, unbound_field)]
def generate_csrf_token(self, csrf_token_field):
"""
Implementations must override this to provide a method with which one
can get a CSRF token for this form.
A CSRF token is usually a string that is generated deterministically
based on some sort of user data, though it can be anything which you
can validate on a subsequent request.
:param csrf_token_field:
The field which is being used for CSRF.
:return:
A generated CSRF string.
"""
raise NotImplementedError()
def validate_csrf_token(self, form, field):
"""
Override this method to provide custom CSRF validation logic.
The default CSRF validation logic simply checks if the recently
generated token equals the one we received as formdata.
:param form: The form which has this CSRF token.
:param field: The CSRF token field.
"""
if field.current_token != field.data:
raise ValidationError(field.gettext("Invalid CSRF Token."))

View File

@@ -0,0 +1,93 @@
"""
A provided CSRF implementation which puts CSRF data in a session.
This can be used fairly comfortably with many `request.session` type
objects, including the Werkzeug/Flask session store, Django sessions, and
potentially other similar objects which use a dict-like API for storing
session keys.
The basic concept is a randomly generated value is stored in the user's
session, and an hmac-sha1 of it (along with an optional expiration time,
for extra security) is used as the value of the csrf_token. If this token
validates with the hmac of the random value + expiration time, and the
expiration time is not passed, the CSRF validation will pass.
"""
import hmac
import os
from datetime import datetime
from datetime import timedelta
from hashlib import sha1
from ..validators import ValidationError
from .core import CSRF
__all__ = ("SessionCSRF",)
class SessionCSRF(CSRF):
TIME_FORMAT = "%Y%m%d%H%M%S"
def setup_form(self, form):
self.form_meta = form.meta
return super().setup_form(form)
def generate_csrf_token(self, csrf_token_field):
meta = self.form_meta
if meta.csrf_secret is None:
raise Exception(
"must set `csrf_secret` on class Meta for SessionCSRF to work"
)
if meta.csrf_context is None:
raise TypeError("Must provide a session-like object as csrf context")
session = self.session
if "csrf" not in session:
session["csrf"] = sha1(os.urandom(64)).hexdigest()
if self.time_limit:
expires = (self.now() + self.time_limit).strftime(self.TIME_FORMAT)
csrf_build = "{}{}".format(session["csrf"], expires)
else:
expires = ""
csrf_build = session["csrf"]
hmac_csrf = hmac.new(
meta.csrf_secret, csrf_build.encode("utf8"), digestmod=sha1
)
return f"{expires}##{hmac_csrf.hexdigest()}"
def validate_csrf_token(self, form, field):
meta = self.form_meta
if not field.data or "##" not in field.data:
raise ValidationError(field.gettext("CSRF token missing."))
expires, hmac_csrf = field.data.split("##", 1)
check_val = (self.session["csrf"] + expires).encode("utf8")
hmac_compare = hmac.new(meta.csrf_secret, check_val, digestmod=sha1)
if hmac_compare.hexdigest() != hmac_csrf:
raise ValidationError(field.gettext("CSRF failed."))
if self.time_limit:
now_formatted = self.now().strftime(self.TIME_FORMAT)
if now_formatted > expires:
raise ValidationError(field.gettext("CSRF token expired."))
def now(self):
"""
Get the current time. Used for test mocking/overriding mainly.
"""
return datetime.now()
@property
def time_limit(self):
return getattr(self.form_meta, "csrf_time_limit", timedelta(minutes=30))
@property
def session(self):
return getattr(
self.form_meta.csrf_context, "session", self.form_meta.csrf_context
)