Python binding

The Python binding exposes Knocker’s shared SQLite extension contract with Python idioms for endpoints, handlers, operators, and retention.

import knocker

webhooks = knocker.open(
    "knocker.db",
    queue_name="knocker.events",
    visibility_timeout_s=60,
    max_attempts=3,
    max_readers=8,
)

For hot paths, prefer one long-lived knocker.open(...) per process and reuse it for ingress plus local workers. Opening many independent Knocker handles against the same SQLite file is supported, but the handles contend with one another, so treat that setup as a degraded mode rather than the normal performance posture.

Simpler endpoint-local shape:

stripe = webhooks.endpoint(
    "stripe",
    path="/webhooks/stripe",
    provider="stripe",
    secrets=["whsec_123"],
    provider_options={"tolerance_s": 300},
)

@stripe.handle("checkout.session.completed")
def handle_checkout(event, tx):
    ...

Equivalent explicit shape:

webhooks.add_endpoint(
    name="stripe",
    path="/webhooks/stripe",
    provider="stripe",
    secrets=["whsec_123"],
    provider_options={"tolerance_s": 300},
)

@webhooks.handle(endpoint="stripe", event_type="checkout.session.completed")
def handle_checkout(event, tx):
    ...

provider accepts either a curated string name ("stripe", "github", "shopify", "slack", "postmark", "resend", "paddle", "lemon-squeezy", "standard-webhooks", "clerk", "twilio", "sendgrid", "linear", "meta", "discord", "zendesk", "intercom", "hubspot", "token-header", "bearer-token", "basic-auth") or a knocker.Provider instance for app-local / community providers. Curated string names are reserved for built-ins; non-curated providers should pass an instance. Unknown string names raise ValueError at add_endpoint(...) time. Providers that require secrets reject missing/None/empty secrets=... at registration time. provider_options is schema-checked; unknown keys raise ValueError.

App-local and community providers should pass a Provider instance directly to add_endpoint(...):

import knocker

class AcmeProvider(knocker.Provider):
    name = "acme"
    version = "1.0.0"
    requires_secrets = True

    def verify(self, request, *, secrets, options):
        return knocker.ProviderResult.accept(
            provider_delivery_id=request.header("x-acme-delivery"),
            event_type=request.header("x-acme-event"),
        )

webhooks.add_endpoint(
    name="acme",
    path="/webhooks/acme",
    provider=AcmeProvider(),
    secrets=["acme-secret"],
)

versions = webhooks.provider_versions()
# {"stripe": "1.0.0", "github": "1.0.0", "shopify": "1.0.0", ...}

provider_versions() returns a fresh dict[str, str] snapshot of the curated built-ins. Instance-path providers are scoped to one endpoint and do not appear here; there is no global string-lookup registry for non-curated providers.

Because string names are reserved for curated built-ins, an instance whose .name collides with a curated name is rejected at add_endpoint(...) time. Provider.verify(...) returns a ProviderResult combining the verification outcome and the extracted metadata; unexpected exceptions raised inside verify(...) become rejected results with a descriptive signature_error rather than crashing the caller.
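The AcmeProvider example above accepts every request without checking a signature. As a rough sketch of the check a verify(...) method might perform, the helper below assumes a hypothetical scheme in which the sender puts the hex-encoded HMAC-SHA256 of the raw body in a header. The function name, the scheme, and the error strings are illustrative assumptions, not part of Knocker.

```python
from __future__ import annotations

import hashlib
import hmac


def check_hmac_sha256(body: bytes, signature_hex: str | None, secrets: list[str]) -> str | None:
    """Return None when the signature matches any secret, else an error string.

    Hypothetical scheme: the header carries hex(HMAC-SHA256(secret, raw_body)).
    Real providers differ, so check their documentation before reusing this.
    """
    if not signature_hex:
        return "missing signature"
    received = signature_hex.strip().lower().encode()
    # Trying every secret lets old and new secrets overlap during rotation.
    for secret in secrets:
        expected = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest().encode()
        # compare_digest avoids leaking the mismatch position through timing.
        if hmac.compare_digest(expected, received):
            return None
    return "signature mismatch"
```

Inside verify(...), the returned error could be passed to ProviderResult.reject(error), with ProviderResult.accept(...) used when it is None. The header that carries the signature depends on the provider.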

For curated provider authoring, see Contributing a curated provider.

webhooks.queue_name # "knocker.events"

queue_name is a read-only string property exposing the configured Honker queue name. The underlying queue object is internal.

Handlers receive (event, tx). Business writes through tx commit atomically with Knocker’s event transition and queue ack. Handlers are synchronous; keep them short and DB-local.

result = stripe.receive(
    body=b"...",
    headers={"stripe-signature": "..."},
)

Lower-level contract entry:

result = webhooks.ingest(
    endpoint="stripe",
    body=b"...",
    headers={},
    query={},
    method="POST",
    event_type="checkout.session.completed",
    provider_event_id="evt_1",
    provider_delivery_id="dlv_1",
    dedupe_key="evt_1",
    signature_valid=True,
    signature_error=None,
)

ingest(...) is trusted low-level ingress. It bypasses configured verification, so most applications should call receive(...).

Knocker does not expose framework adapters. Host apps read their own request body and headers, then call receive(...) from a route.
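Since the host app owns the route, one possible shape is a small WSGI wrapper that reads the raw body and headers and hands them to receive(...). This is a minimal sketch, not an official adapter: make_webhook_app is a hypothetical name, and it assumes that status_code on the result is the HTTP status the route should return.

```python
import http.client


def make_webhook_app(endpoint):
    """Wrap an endpoint-like object (anything with receive(body=, headers=)) as a WSGI app."""

    def app(environ, start_response):
        # Read the raw bytes untouched; signature checks depend on the exact body.
        length = int(environ.get("CONTENT_LENGTH") or 0)
        body = environ["wsgi.input"].read(length)

        # WSGI exposes request headers as HTTP_* keys; rebuild lower-case names.
        headers = {
            key[5:].replace("_", "-").lower(): value
            for key, value in environ.items()
            if key.startswith("HTTP_")
        }
        if "CONTENT_TYPE" in environ:
            headers["content-type"] = environ["CONTENT_TYPE"]

        result = endpoint.receive(body=body, headers=headers)

        # Assumption: result.status_code is the status to send back to the sender.
        reason = http.client.responses.get(result.status_code, "Unknown")
        start_response(f"{result.status_code} {reason}", [("Content-Length", "0")])
        return [b""]

    return app
```

With the endpoint object from earlier, make_webhook_app(stripe) could be mounted at /webhooks/stripe by any WSGI server or router.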

event = webhooks.get_event(42)
events = webhooks.list_events(limit=50)
delivery = webhooks.get_delivery(99)
deliveries = webhooks.list_deliveries(limit=50)
webhooks.ignore(42)
webhooks.replay(42)
webhooks.requeue(42)
webhooks.replay_delivery(99)

Accepted statuses:

  • ignore(...): received, failed, dead; no-op for already ignored
  • replay(...): handled, failed, dead, ignored
  • requeue(...): failed, dead, ignored
  • replay_delivery(...): linked delivery whose event is handled, failed, dead, or ignored

replay_delivery(...) uses the specified Delivery body for the handler call and does not rewrite the canonical Event payload.
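The accepted statuses listed above can be mirrored as a lookup table, for example to grey out buttons in an admin tool before calling an operator. This is only a sketch built from that list; ACCEPTED_STATUSES and is_accepted are hypothetical names, and Knocker's own checks remain authoritative.

```python
# Event statuses each operator accepts, copied from the list above.
ACCEPTED_STATUSES = {
    "ignore": {"received", "failed", "dead", "ignored"},  # "ignored" is a no-op
    "replay": {"handled", "failed", "dead", "ignored"},
    "requeue": {"failed", "dead", "ignored"},
    # For replay_delivery, the status is that of the delivery's linked event.
    "replay_delivery": {"handled", "failed", "dead", "ignored"},
}


def is_accepted(operation: str, event_status: str) -> bool:
    """Return True when the operator accepts an event in the given status."""
    return event_status in ACCEPTED_STATUSES[operation]
```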

await webhooks.run_worker(stop_event=stop, on_error=report_worker_error)
states = webhooks.worker_states()

Worker state is local and non-durable. Host apps own restart/supervision policy. The production worker claims jobs in small batches internally and drains already-claimed buffered jobs before honoring a stop signal.
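Because restart policy is left to the host app, one option is a small asyncio supervisor that restarts the worker after a crash with capped exponential backoff. The sketch below is an assumption-laden illustration: supervise is a hypothetical helper, it assumes the stop signal is an asyncio.Event, and it assumes the worker coroutine returns cleanly once a stop is requested and raises on fatal errors.

```python
import asyncio


async def supervise(start_worker, stop_event, *, base_delay_s=1.0, max_delay_s=30.0):
    """Restart start_worker() after crashes until it exits cleanly or a stop is requested.

    Returns the number of crashes observed.
    """
    crashes = 0
    delay = base_delay_s
    while not stop_event.is_set():
        try:
            await start_worker()
            break  # clean return: the worker decided to stop
        except Exception:
            # asyncio.CancelledError is a BaseException, so cancellation still propagates.
            crashes += 1
            try:
                # Sleep for the backoff, but wake immediately if a stop is requested.
                await asyncio.wait_for(stop_event.wait(), timeout=delay)
            except asyncio.TimeoutError:
                pass
            delay = min(delay * 2, max_delay_s)
    return crashes
```

A call might look like await supervise(lambda: webhooks.run_worker(stop_event=stop, on_error=report_worker_error), stop).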

summary = webhooks.prune_events(statuses=["handled", "ignored"], older_than=1700000000, limit=100)
orphans = webhooks.prune_orphan_deliveries(older_than=1700000000, limit=100)
audits = webhooks.list_prune_audits(kind="prune_events", since=1700000000, limit=50)
policy = knocker.RetentionPolicy(
    interval_s=3600,
    event_older_than_s=7 * 24 * 60 * 60,
    orphan_deliveries_older_than_s=24 * 60 * 60,
)
await webhooks.run_retention(policy, stop_event=stop)

run_retention(...) registers recurring work through Honker Scheduler, and each retention job calls the shared core retention-pass primitive, so automated runs write the same durable prune audit rows as manual calls. Multiple processes may run retention workers against the same SQLite file without duplicate prune runs; one process/instance should still be treated as the source of truth for retention configuration.
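The manual prune calls above take an absolute older_than value, while RetentionPolicy takes ages in seconds. Assuming older_than is a Unix timestamp in seconds (the example value 1700000000 suggests this, but the page does not say so), a hypothetical helper can turn an age into a cutoff:

```python
from __future__ import annotations

import time


def cutoff_epoch_s(max_age_s: int, *, now: float | None = None) -> int:
    """Return the Unix timestamp (in seconds) that lies max_age_s before now."""
    current = time.time() if now is None else now
    return int(current) - max_age_s
```

For example, webhooks.prune_events(statuses=["handled", "ignored"], older_than=cutoff_epoch_s(7 * 24 * 60 * 60), limit=100) would target events older than seven days under that assumption.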

IngestResult

  • delivery_id: int
  • event_id: int | None
  • duplicate: bool
  • status_code: int

Event

  • id
  • endpoint
  • event_type
  • provider_event_id
  • provider_delivery_id
  • dedupe_key
  • status
  • attempt_count
  • headers
  • query
  • body
  • received_at
  • handled_at
  • last_error

Delivery

  • id
  • event_id
  • endpoint
  • event_type
  • provider_event_id
  • provider_delivery_id
  • dedupe_key
  • method
  • headers
  • query
  • body
  • received_at
  • signature_valid
  • signature_error

PruneAudit

  • id
  • kind: 'prune_events' or 'prune_orphan_deliveries'
  • queue_name
  • executed_at
  • events_pruned
  • deliveries_pruned
  • attempts_pruned
  • live_jobs_pruned
  • summary_json

RetentionPolicy

  • interval_s
  • event_statuses
  • event_older_than_s
  • event_limit
  • orphan_deliveries_older_than_s
  • orphan_deliveries_limit

PruneEventsResult

  • events_pruned
  • attempts_pruned
  • deliveries_pruned
  • live_jobs_pruned

PruneDeliveriesResult

  • deliveries_pruned

WorkerState

  • worker_id
  • running
  • current_event_id
  • last_error

ProviderRequest

  • method: str
  • headers: dict
  • query: dict
  • body: bytes
  • header(name, default=None) — case-insensitive lookup
  • json() — cached parse; raises ValueError on non-JSON bodies
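For unit-testing a provider's verify(...) without a running Knocker instance, a stand-in with the same fields and the two behaviors listed above can help. FakeProviderRequest is a hypothetical test double written from that list; it is not Knocker's class and may differ from it in details.

```python
from __future__ import annotations

import json
from dataclasses import dataclass, field
from typing import Any

_UNSET = object()  # sentinel so that a parsed JSON null is cached too


@dataclass
class FakeProviderRequest:
    """Test double mirroring the ProviderRequest fields listed above."""

    method: str = "POST"
    headers: dict = field(default_factory=dict)
    query: dict = field(default_factory=dict)
    body: bytes = b""
    _parsed: Any = field(default=_UNSET, init=False, repr=False)

    def header(self, name: str, default: str | None = None) -> str | None:
        """Case-insensitive header lookup."""
        wanted = name.lower()
        for key, value in self.headers.items():
            if key.lower() == wanted:
                return value
        return default

    def json(self) -> Any:
        """Parse the body once and cache the result."""
        if self._parsed is _UNSET:
            # json.JSONDecodeError and UnicodeDecodeError are ValueError subclasses.
            self._parsed = json.loads(self.body)
        return self._parsed
```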

ProviderResult

  • valid: bool
  • signature_error: str | None
  • provider_delivery_id: str | None
  • provider_event_id: str | None
  • event_type: str | None
  • ProviderResult.accept(...) and ProviderResult.reject(error, ...) constructors

Provider

  • name: str
  • version: str
  • option_keys: frozenset[str]
  • requires_secrets: bool
  • verify(request, *, secrets, options) -> ProviderResult
  • validate_options(options) — override to validate option values