Python implementation of Bluesky PDS and AT Protocol, including data repository, Merkle search tree, and XRPC methods.
You can build your own PDS on top of arroba with just a few lines of Python and run it in any WSGI server. You can build a more involved PDS with custom logic and behavior. Or you can build a different ATProto service, eg an AppView, relay (née BGS), or something entirely new!
Install from PyPI with `pip install arroba`.
Arroba is the Spanish word for the @ character ("at sign").
License: This project is placed in the public domain. You may also use it under the CC0 License.
Here's minimal example code for a multi-repo PDS on top of arroba and Flask:
```python
from flask import Flask
from google.cloud import ndb
from lexrpc.flask_server import init_flask

from arroba import server
from arroba.datastore_storage import DatastoreStorage
from arroba.firehose import send_events

# for Google Cloud Datastore
ndb_client = ndb.Client()
server.storage = DatastoreStorage(ndb_client=ndb_client)
server.repo.callback = lambda _: send_events()  # to subscribeRepos

app = Flask('my-pds')
init_flask(server.server, app)

def ndb_context_middleware(wsgi_app):
    def wrapper(environ, start_response):
        with ndb_client.context():
            return wsgi_app(environ, start_response)
    return wrapper

app.wsgi_app = ndb_context_middleware(app.wsgi_app)
```
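The `app` above is a standard WSGI application, so any WSGI server can run it. As a quick illustration with the standard library's `wsgiref` — using a trivial stand-in app here, since the real Flask example needs Google Cloud credentials:

```python
from wsgiref.simple_server import make_server

def app(environ, start_response):
    # trivial stand-in for the PDS app above
    start_response('200 OK', [('Content-Type', 'text/plain')])
    return [b'OK']

# port 0 means "pick any free port"; use a fixed port in practice
server = make_server('127.0.0.1', 0, app)
print('serving on port', server.server_address[1])
# server.serve_forever()  # uncomment to start serving
```

In production you'd more likely run it under a server like gunicorn instead of `wsgiref`.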
See app.py for a more comprehensive example, including a CORS handler for `OPTIONS` preflight requests and a catch-all `app.bsky.*` XRPC handler that proxies requests to the AppView.
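CORS handling can also be done at the WSGI layer, in the same style as `ndb_context_middleware` above. Here's a hypothetical sketch — the middleware name and header values are illustrative, not arroba's or app.py's actual code:

```python
def cors_middleware(wsgi_app):
    """Answer OPTIONS preflights directly and add CORS headers to responses.

    Hypothetical sketch; app.py's real handler may differ.
    """
    CORS = [
        ('Access-Control-Allow-Origin', '*'),
        ('Access-Control-Allow-Methods', 'GET, POST, OPTIONS'),
        ('Access-Control-Allow-Headers', 'Authorization, Content-Type'),
    ]

    def wrapper(environ, start_response):
        if environ.get('REQUEST_METHOD') == 'OPTIONS':
            # preflight: short-circuit before the app sees the request
            start_response('204 No Content', list(CORS))
            return [b'']

        def start_with_cors(status, headers, exc_info=None):
            return start_response(status, list(headers) + CORS, exc_info)

        return wsgi_app(environ, start_with_cors)

    return wrapper
```

You'd wrap the app the same way as the ndb middleware: `app.wsgi_app = cors_middleware(app.wsgi_app)`.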
Arroba consists of these parts:
* `Storage` abstract base class
* `DatastoreStorage` (uses Google Cloud Datastore)
* `did`: create and resolve `did:plc`s, `did:web`s, and domain handles
* `diff`: find the deterministic minimal difference between two MSTs
* `util`: miscellaneous utilities for TIDs, AT URIs, signing and verifying signatures, generating JWTs, encoding/decoding, and more

Configure arroba with these environment variables:
* `APPVIEW_HOST`, default `api.bsky-sandbox.dev`
* `RELAY_HOST`, default `bgs.bsky-sandbox.dev`
* `PLC_HOST`, default `plc.bsky-sandbox.dev`
* `PDS_HOST`, where you're running your PDS

Optional, only used in `com.atproto.repo`, `.server`, and `.sync` XRPC handlers:
* `REPO_TOKEN`, static token to use as both `accessJwt` and `refreshJwt`. Defaults to the contents of the `repo_token` file. Not required to be an actual JWT. If not set, XRPC methods that require auth will return HTTP 501 Not Implemented.
* `ROLLBACK_WINDOW`, number of events to serve in the `subscribeRepos` rollback window, as an integer. Defaults to 50k.
* `PRELOAD_WINDOW`, number of events to preload into the `subscribeRepos` rollback window at startup, as an integer. Defaults to 4k.
* `SUBSCRIBE_REPOS_BATCH_DELAY`, minimum time to wait between datastore queries in `com.atproto.sync.subscribeRepos`, in seconds, as a float. Defaults to 0 if unset.
* `BLOB_MAX_BYTES`, maximum allowed size of blobs, in bytes. Defaults to 100MB.
* `BLOB_REFETCH_DAYS`, how often, in days, to refetch remote URL-based blobs in the datastore to check that they're still serving. May be an integer or float. Defaults to 7. These re-fetches happen on demand, during `com.atproto.sync.getBlob` requests.
* `BLOB_REFETCH_TYPES`, comma-separated list of MIME types (without subtypes, ie the part after `/`) to refetch blobs for. Defaults to `image`.

Breaking changes:
* `Repo.create_from_commit` has been removed; all repos should now be created with `Repo.create`.
* `Repo.apply_writes`, `format_commit`, `apply_commit`, and `writes_to_commit_ops` have been removed. Use the new `Storage.commit` method instead.

Non-breaking changes:
`AtpRemoteBlob`:

* `repos` property to track which repos have which blobs.

`did`:

* `rollback_plc` function.

`xrpc_repo`:

* `describe_repo`: add `app.bsky.graph.listblock`.

`xrpc_sync`:

* `get_blob`: periodically check remote blobs with HTTP GET requests to see if they're still serving.
* `get_record`: include MST covering proof blocks for record.
* Implement `listBlobs`.
* `subscribeRepos`/firehose: handle uncaught exceptions and continue serving (snarfed/bridgy-fed#2150).
* `xrpc_sync.subscribe_repos` now includes covering proof blocks and new `prev` and `prevData` fields.

`MST`:

* `cids_for_path`, `add_covering_proofs` methods.

`Repo`:

* `apply_writes`: skip no-op update operations where the new record value is the same as the existing stored record. (No-op updates are evidently illegal in ATProto.)
* `#sync` event when a new repo is created.

`Storage`:

* `read_events_by_seq`: always include the MST root block in every commit event.

`DatastoreStorage`:

* `AtpRemoteBlob.get_or_create`: truncate URLs to 1500 characters.
* `AtpRemoteBlob.generate_private_key` method.
* `write_blocks` to batch get and put.

`did`:

* `resolve_handle`: support `did:web`s in the HTTPS `/.well-known/atproto-did` method.

`xrpc_sync`:

* `subscribeRepos`: unify event stream generation across all subscribers. This significantly improves scalability and reduces CPU and I/O to near constant, with minimal additional overhead per subscriber (#52).

Breaking changes:
`repo`:

* `apply_commit`, `apply_writes`: raise an exception if the repo is inactive.

`storage`:

* `create_repo`: remove `signing_key` and `rotation_key` kwargs, read them from the input repo instead.
* `load_repo`: don't raise an exception if the repo is tombstoned.

`datastore_storage`:

* `AtpBlock.decoded` is no longer stored in the datastore; it's now just an in-memory `@property`.

`util`:

* Rename `TombstonedRepo` to `InactiveRepo`.

Non-breaking changes:
`datastore_storage`:

* `DatastoreStorage`:
  * `ndb_context_kwargs` constructor kwarg.
  * `apply_commit`: handle deactivated repos.
  * `create_repo`: propagate `Repo.status` into `AtpRepo`.
* `AtpRemoteBlob`:
  * `get_or_create`: drop datastore transaction.
  * `width` and `height` properties, populated for images and videos, to be used in image/video embed `aspectRatio` (snarfed/bridgy-fed#1571).
  * Raise `ValidationError` on videos over 3 minutes.

`did`:

* `get_signing_key`, `get_handle` functions.
* `create_plc`: remove trailing slash from `services.atproto_pds.endpoint`.

`storage`:

* `Storage`: add new `write_blocks` method, implement in `MemoryStorage` and `DatastoreStorage`.

`xrpc_repo`:

* `describe_server`: include all `app.bsky` collections and others like `chat.bsky.actor.declaration`; fetch and include the DID doc.
* Implement `com.atproto.repo.importRepo`.

`xrpc_sync`:

* `get_blob`:
  * `Cache-Control` to cache for 1h.
* `list_repos`:
  * `rev`, not integer sequence number.
  * `null` if the account is active.

Breaking changes:
`storage`:

* `Storage.write` now returns `Block` instead of `CID`.

Non-breaking changes:
`did`:

* `update_plc` method.
* `create_plc`: add new `also_known_as` kwarg.
* `resolve_handle`: drop `Content-Type: text/plain` requirement for the HTTPS method.

`mst`:

* `start` kwarg to `load_all`.

`repo`:

* Emit `subscribeRepos` events when creating new repos.

`storage`:

* `deactivate_repo`, `activate_repo`, and `write_event` methods.
* `repo` kwarg to `read_blocks_by_seq` and `read_events_by_seq` to limit returned results to a single repo.

`datastore_storage`:

* `max_size` and `accept_types` kwargs to `AtpRemoteBlob.get_or_create`, for the blob's `maxSize` and `accept` parameters in its lexicon. If the fetched file doesn't satisfy those constraints, raises `lexrpc.ValidationError`.
* `DatastoreStorage.read_blocks_by_seq`: use strong consistency for the datastore query. May fix occasional `AssertionError` when serving `subscribeRepos`.

`xrpc_sync`:

* `getBlob`: switch from returning HTTP 302 to 301.
* `since` param in `getRepo`.
* `subscribeRepos`: wait up to 60s on a skipped sequence number before giving up and emitting it as a gap.

`util`:

* `service_jwt`: add new `**claims` parameter for additional JWT claims, eg `lxm`.

Breaking changes:
`datastore_storage`:

* `DatastoreStorage`: add new required `ndb_client` kwarg to the constructor, used to get a new context in lexrpc websocket subscription handlers that run server methods like `subscribeRepos` in separate threads (snarfed/lexrpc#8).
* `DatastoreStorage.read_blocks_by_seq`: if the ndb context gets closed while we're still running, log a warning and return. (This can happen in eg `flask_server` if the websocket client disconnects early.)
* `AtpRemoteBlob`: if the blob URL doesn't return the `Content-Type` header, infer the type from the URL, or fall back to `application/octet-stream` (bridgy-fed#1073).

`did`:

* Cache `resolve_plc`, `resolve_web`, and `resolve_handle` for 6h, up to 5000 total results per call.

`storage`:

* Rename `Storage.read_commits_by_seq` to `read_events_by_seq` for new account tombstone support.

`xrpc_sync`:

* Rename `send_new_commits` to `send_events`, ditto.

`xrpc_repo`:

* Stop requiring auth for read methods: `getRecord`, `listRecords`, `describeRepo`.

Non-breaking changes:
`did`:

* `HANDLE_RE` regexp for handle validation.

`storage`:

* `Storage.tombstone_repo` method, implemented in `MemoryStorage` and `DatastoreStorage`. Used to delete accounts. (bridgy-fed#783)
* `Storage.load_repos` method, implemented in `MemoryStorage` and `DatastoreStorage`. Used for `com.atproto.sync.listRepos`.

`util`:

* `service_jwt`: add optional `aud` kwarg.

`xrpc_sync`:

* `subscribeRepos`:
  * `ROLLBACK_WINDOW` environment variable to limit the size of the rollback window. Defaults to no limit.
  * Serve events with their original `time` instead of the current time (snarfed/bridgy-fed#1015).
* Support `getRepo` queries with the `since` parameter. `since` still isn't actually implemented, but we now serve the entire repo instead of returning an error.
* `getRepoStatus` method.
* `listRepos` method.
* `getRepo` bug fix: include the repo head commit block.

`xrpc_repo`:

* `getRecord`: encode returned records correctly as ATProto-flavored DAG-JSON.
* `xrpc_*`: return `RepoNotFound` and `RepoDeactivated` errors when appropriate (snarfed/bridgy-fed#1083).

* Bug fix for TIDs in `at://` URIs, commit revs, etc. Before, we were using the integer UNIX timestamp directly, which happened to be the same 13 character length. Oops.
* Rename the `BGS_HOST` environment variable to `RELAY_HOST`. `BGS_HOST` is still supported for backward compatibility.

`datastore_storage`:
* `DatastoreStorage.last_seq`, handle new NSID.
* `AtpRemoteBlob` class for storing "remote" blobs, available at public HTTP URLs, that we don't store ourselves.

`did`:

* `create_plc`: strip padding from the genesis operation signature (for did-method-plc#54, atproto#1839).
* `resolve_handle`: return `None` on bad domain, eg `.foo.com`.
* `resolve_handle` bug fix: handle charset specifier in HTTPS method response `Content-Type`.

`util`:

* `new_key`: add `seed` kwarg to allow deterministic key generation.

`xrpc_repo`:

* `getRecord`: try to load the record locally first; if it's not available, forward to the AppView.

`xrpc_sync`:

* Implement `getBlob`, right now only based on "remote" blobs stored in `AtpRemoteBlob`s in datastore storage.

* The `subscribeRepos` sequence number is reused as the new `rev` field in commits. (Discussion.)
* New `did` module with utilities to create and resolve `did:plc`s and resolve `did:web`s.
* New `util.service_jwt` function that generates ATProto inter-service JWTs.

`Repo`:
* `signing_key`/`rotation_key` attributes. Generate, store, and load both in `datastore_storage`.
* Remove `format_init_commit`; migrate existing calls to `format_commit`.

`Storage`:

* Rename `read_from_seq` => `read_blocks_by_seq` (and in `MemoryStorage` and `DatastoreStorage`), add new `read_commits_by_seq` method.
* Merge `load_repo`'s `did`/`handle` kwargs into `did_or_handle`.
* Make `subscribeRepos` check storage for all new commits every time it wakes up.
* Replace `xrpc_sync.enqueue_commit` with a new `send_new_commits` function that takes no parameters.
* Remove bundled `app.bsky`/`com.atproto` lexicons; use lexrpc's instead.

Big milestone: arroba is successfully federating with the ATProto sandbox! See app.py for the minimal demo code needed to wrap arroba in a fully functional PDS.
* Implement `com.atproto` XRPC methods needed to federate with the sandbox, including most of `repo` and `sync`.
* Implement `subscribeRepos` server side over websocket.
* Implement the repo and commit chain in the new `Repo` class, including pluggable storage. This completes the first pass at all PDS data structures. Next release will include initial implementations of the `com.atproto.sync.*` XRPC methods.
Initial release! Still very in progress. MST, Walker, and Diff classes are mostly complete and working. Repo, commits, and sync XRPC methods are still in progress.
Here's how to package, test, and ship a new release.
```sh
git checkout main
git pull
source local/bin/activate.csh
python -m unittest discover
python -m unittest arroba.tests.mst_test_suite  # more extensive, slower tests (deliberately excluded from autodiscovery)
```
Bump the version in `pyproject.toml` and `docs/conf.py`. `git grep` the old version number to make sure it only appears in the changelog. Change the current changelog entry in `README.md` for this new version from unreleased to the current date.

If you've added any new modules, add them to `docs/source/`. Then run `./docs/build.sh`. Check that the generated HTML looks fine by opening `docs/_build/html/index.html` and looking around.

```sh
setenv ver X.Y
```
```sh
git commit -am "release v$ver"
python -m build
twine upload -r pypitest dist/arroba-$ver*
```
```sh
cd /tmp
python -m venv local
source local/bin/activate.csh
# make sure we force pip to use the uploaded version
pip uninstall arroba
pip install --upgrade pip
pip install -i https://test.pypi.org/simple --extra-index-url https://pypi.org/simple arroba==$ver
python
```

```py
from arroba import did
did.resolve_handle('snarfed.org')
```
In the release notes, put `### Notable changes` on the second line, then copy and paste this version's changelog contents below it.
```sh
git tag -a v$ver --cleanup=verbatim
git push && git push --tags
```
On GitHub, create a new release: enter `vX.Y` in the *Tag version* box, leave *Release title* empty, and copy `### Notable changes` and the changelog contents into the description text box. Then upload to pypi.org for real:

```sh
twine upload dist/arroba-$ver*
```