diff --git a/.ci/scripts/calculate_jobs.py b/.ci/scripts/calculate_jobs.py index b41ec0b6e2dc..c2c18b48e3d7 100755 --- a/.ci/scripts/calculate_jobs.py +++ b/.ci/scripts/calculate_jobs.py @@ -29,11 +29,12 @@ def set_output(key: str, value: str): # First calculate the various trial jobs. # -# For each type of test we only run on Py3.7 on PRs +# For PRs, we only run each type of test with the oldest Python version supported (which +# is Python 3.8 right now) trial_sqlite_tests = [ { - "python-version": "3.7", + "python-version": "3.8", "database": "sqlite", "extras": "all", } @@ -46,13 +47,13 @@ def set_output(key: str, value: str): "database": "sqlite", "extras": "all", } - for version in ("3.8", "3.9", "3.10", "3.11") + for version in ("3.9", "3.10", "3.11") ) trial_postgres_tests = [ { - "python-version": "3.7", + "python-version": "3.8", "database": "postgres", "postgres-version": "11", "extras": "all", @@ -71,7 +72,7 @@ def set_output(key: str, value: str): trial_no_extra_tests = [ { - "python-version": "3.7", + "python-version": "3.8", "database": "sqlite", "extras": "", } diff --git a/.github/workflows/release-artifacts.yml b/.github/workflows/release-artifacts.yml index 09812004017f..f331f67d9728 100644 --- a/.github/workflows/release-artifacts.yml +++ b/.github/workflows/release-artifacts.yml @@ -144,7 +144,7 @@ jobs: - name: Only build a single wheel on PR if: startsWith(github.ref, 'refs/pull/') - run: echo "CIBW_BUILD="cp37-manylinux_${{ matrix.arch }}"" >> $GITHUB_ENV + run: echo "CIBW_BUILD="cp38-manylinux_${{ matrix.arch }}"" >> $GITHUB_ENV - name: Build wheels run: python -m cibuildwheel --output-dir wheelhouse diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 6c2298499777..0a01e8298468 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -320,7 +320,7 @@ jobs: - uses: actions/setup-python@v4 with: - python-version: '3.7' + python-version: '3.8' - name: Prepare old deps if: steps.cache-poetry-old-deps.outputs.cache-hit != 'true' @@ -362,7 +362,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: ["pypy-3.7"] + python-version: ["pypy-3.8"] extras: ["all"] steps: @@ -477,7 +477,7 @@ jobs: strategy: matrix: include: - - python-version: "3.7" + - python-version: "3.8" postgres-version: "11" - python-version: "3.11" diff --git a/changelog.d/15773.feature b/changelog.d/15773.feature new file mode 100644 index 000000000000..0d77fae2dc68 --- /dev/null +++ b/changelog.d/15773.feature @@ -0,0 +1 @@ +Allow configuring the set of workers to proxy outbound federation traffic through via `outbound_federation_restricted_to`. diff --git a/changelog.d/15851.removal b/changelog.d/15851.removal new file mode 100644 index 000000000000..e08df4c1366a --- /dev/null +++ b/changelog.d/15851.removal @@ -0,0 +1 @@ +Remove support for Python 3.7. diff --git a/docker/Dockerfile-dhvirtualenv b/docker/Dockerfile-dhvirtualenv index 861129ebc225..b7679924c24a 100644 --- a/docker/Dockerfile-dhvirtualenv +++ b/docker/Dockerfile-dhvirtualenv @@ -28,12 +28,12 @@ FROM docker.io/library/${distro} as builder RUN apt-get update -qq -o Acquire::Languages=none RUN env DEBIAN_FRONTEND=noninteractive apt-get install \ - -yqq --no-install-recommends \ - build-essential \ - ca-certificates \ - devscripts \ - equivs \ - wget + -yqq --no-install-recommends \ + build-essential \ + ca-certificates \ + devscripts \ + equivs \ + wget # fetch and unpack the package # We are temporarily using a fork of dh-virtualenv due to an incompatibility with Python 3.11, which ships with @@ -62,33 +62,29 @@ FROM docker.io/library/${distro} ARG distro="" ENV distro ${distro} -# Python < 3.7 assumes LANG="C" means ASCII-only and throws on printing unicode -# http://bugs.python.org/issue19846 -ENV LANG C.UTF-8 - # Install the build dependencies # # NB: keep this list in sync with the list of build-deps in debian/control # TODO: it would be nice to do that automatically. RUN apt-get update -qq -o Acquire::Languages=none \ && env DEBIAN_FRONTEND=noninteractive apt-get install \ - -yqq --no-install-recommends -o Dpkg::Options::=--force-unsafe-io \ - build-essential \ - curl \ - debhelper \ - devscripts \ - libsystemd-dev \ - lsb-release \ - pkg-config \ - python3-dev \ - python3-pip \ - python3-setuptools \ - python3-venv \ - sqlite3 \ - libpq-dev \ - libicu-dev \ - pkg-config \ - xmlsec1 + -yqq --no-install-recommends -o Dpkg::Options::=--force-unsafe-io \ + build-essential \ + curl \ + debhelper \ + devscripts \ + libsystemd-dev \ + lsb-release \ + pkg-config \ + python3-dev \ + python3-pip \ + python3-setuptools \ + python3-venv \ + sqlite3 \ + libpq-dev \ + libicu-dev \ + pkg-config \ + xmlsec1 # Install rust and ensure it's in the PATH ENV RUSTUP_HOME=/rust diff --git a/docs/setup/installation.md b/docs/setup/installation.md index 86e506a3e231..4ca8c6b69786 100644 --- a/docs/setup/installation.md +++ b/docs/setup/installation.md @@ -200,7 +200,7 @@ When following this route please make sure that the [Platform-specific prerequis System requirements: - POSIX-compliant system (tested on Linux & OS X) -- Python 3.7 or later, up to Python 3.11. +- Python 3.8 or later, up to Python 3.11. - At least 1GB of free RAM if you want to join large public rooms like #matrix:matrix.org If building on an uncommon architecture for which pre-built wheels are diff --git a/docs/upgrade.md b/docs/upgrade.md index 4cd38b13932a..384f4010b445 100644 --- a/docs/upgrade.md +++ b/docs/upgrade.md @@ -87,6 +87,18 @@ process, for example: wget https://packages.matrix.org/debian/pool/main/m/matrix-synapse-py3/matrix-synapse-py3_1.3.0+stretch1_amd64.deb dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb ``` + +# Upgrading to v1.88.0 + +## Minimum supported Python version + +The minimum supported Python version has been increased from v3.7 to v3.8. +You will need Python 3.8 to run Synapse v1.88.0 (due out July 18th, 2023). + +If you use current versions of the Matrix.org-distributed Debian +packages or Docker images, no action is required. + + # Upgrading to v1.86.0 ## Minimum supported Rust version diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md index 26d7c7900cbe..89a92c4682be 100644 --- a/docs/usage/configuration/config_documentation.md +++ b/docs/usage/configuration/config_documentation.md @@ -3930,13 +3930,14 @@ federation_sender_instances: --- ### `instance_map` -When using workers this should be a map from [`worker_name`](#worker_name) to the -HTTP replication listener of the worker, if configured, and to the main process. -Each worker declared under [`stream_writers`](../../workers.md#stream-writers) needs -a HTTP replication listener, and that listener should be included in the `instance_map`. -The main process also needs an entry on the `instance_map`, and it should be listed under -`main` **if even one other worker exists**. Ensure the port matches with what is declared -inside the `listener` block for a `replication` listener. +When using workers this should be a map from [`worker_name`](#worker_name) to the HTTP +replication listener of the worker, if configured, and to the main process. Each worker +declared under [`stream_writers`](../../workers.md#stream-writers) and +[`outbound_federation_restricted_to`](#outbound_federation_restricted_to) needs a HTTP replication listener, and that +listener should be included in the `instance_map`. The main process also needs an entry +on the `instance_map`, and it should be listed under `main` **if even one other worker +exists**. Ensure the port matches with what is declared inside the `listener` block for +a `replication` listener. Example configuration: @@ -3966,6 +3967,22 @@ stream_writers: typing: worker1 ``` --- +### `outbound_federation_restricted_to` + +When using workers, you can restrict outbound federation traffic to only go through a +specific subset of workers. Any worker specified here must also be in the +[`instance_map`](#instance_map). + +```yaml +outbound_federation_restricted_to: + - federation_sender1 + - federation_sender2 +``` + +Also see the [worker +documentation](../../workers.md#restrict-outbound-federation-traffic-to-a-specific-set-of-workers) +for more info. +--- ### `run_background_tasks_on` The [worker](../../workers.md#background-tasks) that is used to run diff --git a/docs/workers.md b/docs/workers.md index 735128762a47..303e0f0e7a5a 100644 --- a/docs/workers.md +++ b/docs/workers.md @@ -531,6 +531,26 @@ the stream writer for the `presence` stream: ^/_matrix/client/(api/v1|r0|v3|unstable)/presence/ +#### Restrict outbound federation traffic to a specific set of workers + +The `outbound_federation_restricted_to` configuration is useful to make sure outbound +federation traffic only goes through a specified subset of workers. This allows you to +set more strict access controls (like a firewall) for all workers and only allow the +`federation_sender`'s to contact the outside world. + +```yaml +instance_map: + main: + host: localhost + port: 8030 + federation_sender1: + host: localhost + port: 8034 + +outbound_federation_restricted_to: + - federation_sender1 +``` + #### Background tasks There is also support for moving background tasks to a separate diff --git a/poetry.lock b/poetry.lock index 9aaf5c7de724..c62337053e70 100644 --- a/poetry.lock +++ b/poetry.lock @@ -41,9 +41,6 @@ files = [ {file = "attrs-23.1.0.tar.gz", hash = "sha256:6279836d581513a26f1bf235f9acd333bc9115683f14f7e8fae46c98fc50e015"}, ] -[package.dependencies] -importlib-metadata = {version = "*", markers = "python_version < \"3.8\""} - [package.extras] cov = ["attrs[tests]", "coverage[toml] (>=5.3)"] dev = ["attrs[docs,tests]", "pre-commit"] @@ -190,7 +187,6 @@ packaging = ">=22.0" pathspec = ">=0.9.0" platformdirs = ">=2" tomli = {version = ">=1.1.0", markers = "python_version < \"3.11\""} -typed-ast = {version = ">=1.4.2", markers = "python_version < \"3.8\" and implementation_name == \"cpython\""} typing-extensions = {version = ">=3.10.0.0", markers = "python_version < \"3.10\""} [package.extras] @@ -412,7 +408,6 @@ files = [ [package.dependencies] colorama = {version = "*", markers = "platform_system == \"Windows\""} -importlib-metadata = {version = "*", markers = "python_version < \"3.8\""} [[package]] name = "click-default-group" @@ -601,7 +596,6 @@ files = [ [package.dependencies] gitdb = ">=4.0.1,<5" -typing-extensions = {version = ">=3.7.4.3", markers = "python_version < \"3.8\""} [[package]] name = "hiredis" @@ -847,7 +841,6 @@ files = [ ] [package.dependencies] -typing-extensions = {version = ">=3.6.4", markers = "python_version < \"3.8\""} zipp = ">=0.5" [package.extras] @@ -987,11 +980,9 @@ files = [ [package.dependencies] attrs = ">=17.4.0" -importlib-metadata = {version = "*", markers = "python_version < \"3.8\""} importlib-resources = {version = ">=1.4.0", markers = "python_version < \"3.9\""} pkgutil-resolve-name = {version = ">=1.3.10", markers = "python_version < \"3.9\""} pyrsistent = ">=0.14.0,<0.17.0 || >0.17.0,<0.17.1 || >0.17.1,<0.17.2 || >0.17.2" -typing-extensions = {version = "*", markers = "python_version < \"3.8\""} [package.extras] format = ["fqdn", "idna", "isoduration", "jsonpointer (>1.13)", "rfc3339-validator", "rfc3987", "uri-template", "webcolors (>=1.11)"] @@ -1199,7 +1190,6 @@ files = [ [package.dependencies] mdurl = ">=0.1,<1.0" -typing_extensions = {version = ">=3.7.4", markers = "python_version < \"3.8\""} [package.extras] benchmarking = ["psutil", "pytest", "pytest-benchmark"] @@ -1283,7 +1273,6 @@ files = [ [package.dependencies] attrs = "*" -importlib-metadata = {version = ">=1.4", markers = "python_version < \"3.8\""} [package.extras] dev = ["aiounittest", "black (==22.3.0)", "build (==0.8.0)", "flake8 (==4.0.1)", "isort (==5.9.3)", "mypy (==0.910)", "tox", "twine (==4.0.1)", "twisted"] @@ -1459,7 +1448,6 @@ files = [ [package.dependencies] mypy-extensions = ">=0.4.3" tomli = {version = ">=1.1.0", markers = "python_version < \"3.11\""} -typed-ast = {version = ">=1.4.0,<2", markers = "python_version < \"3.8\""} typing-extensions = ">=3.10" [package.extras] @@ -1721,9 +1709,6 @@ files = [ {file = "platformdirs-3.1.1.tar.gz", hash = "sha256:024996549ee88ec1a9aa99ff7f8fc819bb59e2c3477b410d90a16d32d6e707aa"}, ] -[package.dependencies] -typing-extensions = {version = ">=4.4", markers = "python_version < \"3.8\""} - [package.extras] docs = ["furo (>=2022.12.7)", "proselint (>=0.13)", "sphinx (>=6.1.3)", "sphinx-autodoc-typehints (>=1.22,!=1.23.4)"] test = ["appdirs (==1.4.4)", "covdefaults (>=2.2.2)", "pytest (>=7.2.1)", "pytest-cov (>=4)", "pytest-mock (>=3.10)"] @@ -2060,7 +2045,6 @@ files = [ [package.dependencies] cryptography = ">=3.1" defusedxml = "*" -importlib-metadata = {version = ">=1.7.0", markers = "python_version < \"3.8\""} importlib-resources = {version = "*", markers = "python_version < \"3.9\""} pyopenssl = "*" python-dateutil = "*" @@ -2410,9 +2394,7 @@ files = [ [package.dependencies] canonicaljson = ">=1.0.0" -importlib-metadata = {version = "*", markers = "python_version < \"3.8\""} pynacl = ">=0.3.0" -typing-extensions = {version = ">=3.5", markers = "python_version < \"3.8\""} unpaddedbase64 = ">=1.0.1" [package.extras] @@ -2852,39 +2834,6 @@ files = [ six = "*" twisted = "*" -[[package]] -name = "typed-ast" -version = "1.5.4" -description = "a fork of Python 2 and 3 ast modules with type comment support" -optional = false -python-versions = ">=3.6" -files = [ - {file = "typed_ast-1.5.4-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:669dd0c4167f6f2cd9f57041e03c3c2ebf9063d0757dc89f79ba1daa2bfca9d4"}, - {file = "typed_ast-1.5.4-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:211260621ab1cd7324e0798d6be953d00b74e0428382991adfddb352252f1d62"}, - {file = "typed_ast-1.5.4-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:267e3f78697a6c00c689c03db4876dd1efdfea2f251a5ad6555e82a26847b4ac"}, - {file = "typed_ast-1.5.4-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:c542eeda69212fa10a7ada75e668876fdec5f856cd3d06829e6aa64ad17c8dfe"}, - {file = "typed_ast-1.5.4-cp310-cp310-win_amd64.whl", hash = "sha256:a9916d2bb8865f973824fb47436fa45e1ebf2efd920f2b9f99342cb7fab93f72"}, - {file = "typed_ast-1.5.4-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:79b1e0869db7c830ba6a981d58711c88b6677506e648496b1f64ac7d15633aec"}, - {file = "typed_ast-1.5.4-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a94d55d142c9265f4ea46fab70977a1944ecae359ae867397757d836ea5a3f47"}, - {file = "typed_ast-1.5.4-cp36-cp36m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:183afdf0ec5b1b211724dfef3d2cad2d767cbefac291f24d69b00546c1837fb6"}, - {file = "typed_ast-1.5.4-cp36-cp36m-win_amd64.whl", hash = "sha256:639c5f0b21776605dd6c9dbe592d5228f021404dafd377e2b7ac046b0349b1a1"}, - {file = "typed_ast-1.5.4-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:cf4afcfac006ece570e32d6fa90ab74a17245b83dfd6655a6f68568098345ff6"}, - {file = "typed_ast-1.5.4-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ed855bbe3eb3715fca349c80174cfcfd699c2f9de574d40527b8429acae23a66"}, - {file = "typed_ast-1.5.4-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:6778e1b2f81dfc7bc58e4b259363b83d2e509a65198e85d5700dfae4c6c8ff1c"}, - {file = "typed_ast-1.5.4-cp37-cp37m-win_amd64.whl", hash = "sha256:0261195c2062caf107831e92a76764c81227dae162c4f75192c0d489faf751a2"}, - {file = "typed_ast-1.5.4-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:2efae9db7a8c05ad5547d522e7dbe62c83d838d3906a3716d1478b6c1d61388d"}, - {file = "typed_ast-1.5.4-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:7d5d014b7daa8b0bf2eaef684295acae12b036d79f54178b92a2b6a56f92278f"}, - {file = "typed_ast-1.5.4-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:370788a63915e82fd6f212865a596a0fefcbb7d408bbbb13dea723d971ed8bdc"}, - {file = "typed_ast-1.5.4-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:4e964b4ff86550a7a7d56345c7864b18f403f5bd7380edf44a3c1fb4ee7ac6c6"}, - {file = "typed_ast-1.5.4-cp38-cp38-win_amd64.whl", hash = "sha256:683407d92dc953c8a7347119596f0b0e6c55eb98ebebd9b23437501b28dcbb8e"}, - {file = "typed_ast-1.5.4-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:4879da6c9b73443f97e731b617184a596ac1235fe91f98d279a7af36c796da35"}, - {file = "typed_ast-1.5.4-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:3e123d878ba170397916557d31c8f589951e353cc95fb7f24f6bb69adc1a8a97"}, - {file = "typed_ast-1.5.4-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ebd9d7f80ccf7a82ac5f88c521115cc55d84e35bf8b446fcd7836eb6b98929a3"}, - {file = "typed_ast-1.5.4-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:98f80dee3c03455e92796b58b98ff6ca0b2a6f652120c263efdba4d6c5e58f72"}, - {file = "typed_ast-1.5.4-cp39-cp39-win_amd64.whl", hash = "sha256:0fdbcf2fef0ca421a3f5912555804296f0b0960f0418c440f5d6d3abb549f3e1"}, - {file = "typed_ast-1.5.4.tar.gz", hash = "sha256:39e21ceb7388e4bb37f4c679d72707ed46c2fbf2a5609b8b8ebc4b067d977df2"}, -] - [[package]] name = "types-bleach" version = "6.0.0.3" @@ -3293,5 +3242,5 @@ user-search = ["pyicu"] [metadata] lock-version = "2.0" -python-versions = "^3.7.1" -content-hash = "7f31754a1009d7b6c9a1bd7221a0b243ffd510f362c28f0da417aaac16757a87" +python-versions = "^3.8.0" +content-hash = "0832381cc9e7065e8d95c810d732aa031b98d55cf188719989b12d841993e62e" diff --git a/pyproject.toml b/pyproject.toml index 192a07756b6e..a6e3a935a9c2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -147,7 +147,7 @@ synapse_review_recent_signups = "synapse._scripts.review_recent_signups:main" update_synapse_database = "synapse._scripts.update_synapse_database:main" [tool.poetry.dependencies] -python = "^3.7.1" +python = "^3.8.0" # Mandatory Dependencies # ---------------------- @@ -203,9 +203,6 @@ ijson = ">=3.1.4" matrix-common = "^1.3.0" # We need packaging.requirements.Requirement, added in 16.1. packaging = ">=16.1" -# At the time of writing, we only use functions from the version `importlib.metadata` -# which shipped in Python 3.8. This corresponds to version 1.4 of the backport. -importlib_metadata = { version = ">=1.4", python = "<3.8" } # This is the most recent version of Pydantic with available on common distros. # We are currently incompatible with >=2.0.0: (https://github.com/matrix-org/synapse/issues/15858) pydantic = "^1.7.4" diff --git a/synapse/__init__.py b/synapse/__init__.py index b97ee59f15b6..6c1801862b61 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -25,8 +25,8 @@ from synapse.util.stringutils import strtobool # Check that we're not running on an unsupported Python version. -if sys.version_info < (3, 7): - print("Synapse requires Python 3.7 or above.") +if sys.version_info < (3, 8): + print("Synapse requires Python 3.8 or above.") sys.exit(1) # Allow using the asyncio reactor via env var. diff --git a/synapse/app/_base.py b/synapse/app/_base.py index 936b1b043037..938ab40f278e 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -386,6 +386,7 @@ def listen_unix( def listen_http( + hs: "HomeServer", listener_config: ListenerConfig, root_resource: Resource, version_string: str, @@ -406,6 +407,7 @@ def listen_http( version_string, max_request_body_size=max_request_body_size, reactor=reactor, + federation_agent=hs.get_federation_http_client().agent, ) if isinstance(listener_config, TCPListenerConfig): diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 7406c3948cb1..dc79efcc142f 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -221,6 +221,7 @@ def _listen_http(self, listener_config: ListenerConfig) -> None: root_resource = create_resource_tree(resources, OptionsResource()) _base.listen_http( + self, listener_config, root_resource, self.version_string, diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 84236ac299e0..f188c7265a50 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -139,6 +139,7 @@ def _listener_http( root_resource = OptionsResource() ports = listen_http( + self, listener_config, create_resource_tree(resources, root_resource), self.version_string, diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 38e13dd7b5e0..0b9789160cc0 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -15,7 +15,7 @@ import argparse import logging -from typing import Any, Dict, List, Union +from typing import Any, Dict, List, Optional, Union import attr from pydantic import BaseModel, Extra, StrictBool, StrictInt, StrictStr @@ -148,6 +148,27 @@ class WriterLocations: ) +@attr.s(auto_attribs=True) +class OutboundFederationRestrictedTo: + """Whether we limit outbound federation to a certain set of instances. + + Attributes: + instances: optional list of instances that can make outbound federation + requests. If None then all instances can make federation requests. + locations: list of instance locations to connect to proxy via. + """ + + instances: Optional[List[str]] + locations: List[InstanceLocationConfig] = attr.Factory(list) + + def __contains__(self, instance: str) -> bool: + # It feels a bit dirty to return `True` if `instances` is `None`, but it makes + # sense in downstream usage in the sense that if + # `outbound_federation_restricted_to` is not configured, then any instance can + # talk to federation (no restrictions so always return `True`). + return self.instances is None or instance in self.instances + + class WorkerConfig(Config): """The workers are processes run separately to the main synapse process. They have their own pid_file and listener configuration. They use the @@ -357,6 +378,23 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None: new_option_name="update_user_directory_from_worker", ) + outbound_federation_restricted_to = config.get( + "outbound_federation_restricted_to", None + ) + self.outbound_federation_restricted_to = OutboundFederationRestrictedTo( + outbound_federation_restricted_to + ) + if outbound_federation_restricted_to: + for instance in outbound_federation_restricted_to: + if instance not in self.instance_map: + raise ConfigError( + "Instance %r is configured in 'outbound_federation_restricted_to' but does not appear in `instance_map` config." + % (instance,) + ) + self.outbound_federation_restricted_to.locations.append( + self.instance_map[instance] + ) + def _should_this_worker_perform_duty( self, config: Dict[str, Any], diff --git a/synapse/http/client.py b/synapse/http/client.py index 09ea93e10d0b..ca2cdbc6e243 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -1037,7 +1037,12 @@ def connectionLost(self, reason: Failure = connectionDone) -> None: if reason.check(ResponseDone): self.deferred.callback(self.length) elif reason.check(PotentialDataLoss): - # stolen from https://github.com/twisted/treq/pull/49/files + # This applies to requests which don't set `Content-Length` or a + # `Transfer-Encoding` in the response because in this case the end of the + # response is indicated by the connection being closed, an event which may + # also be due to a transient network problem or other error. But since this + # behavior is expected of some servers (like YouTube), let's ignore it. + # Stolen from https://github.com/twisted/treq/pull/49/files # http://twistedmatrix.com/trac/ticket/4840 self.deferred.callback(self.length) else: diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index cc4e258b0fba..b00396fdc71b 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -50,7 +50,7 @@ from twisted.internet.task import Cooperator from twisted.web.client import ResponseFailed from twisted.web.http_headers import Headers -from twisted.web.iweb import IBodyProducer, IResponse +from twisted.web.iweb import IAgent, IBodyProducer, IResponse import synapse.metrics import synapse.util.retryutils @@ -72,6 +72,7 @@ read_body_with_max_size, ) from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent +from synapse.http.proxyagent import ProxyAgent from synapse.http.types import QueryParams from synapse.logging import opentracing from synapse.logging.context import make_deferred_yieldable, run_in_background @@ -393,17 +394,32 @@ def __init__( if hs.config.server.user_agent_suffix: user_agent = "%s %s" % (user_agent, hs.config.server.user_agent_suffix) - federation_agent = MatrixFederationAgent( - self.reactor, - tls_client_options_factory, - user_agent.encode("ascii"), - hs.config.server.federation_ip_range_allowlist, - hs.config.server.federation_ip_range_blocklist, + outbound_federation_restricted_to = ( + hs.config.worker.outbound_federation_restricted_to ) + if hs.get_instance_name() in outbound_federation_restricted_to: + # Talk to federation directly + federation_agent: IAgent = MatrixFederationAgent( + self.reactor, + tls_client_options_factory, + user_agent.encode("ascii"), + hs.config.server.federation_ip_range_allowlist, + hs.config.server.federation_ip_range_blocklist, + ) + else: + # We need to talk to federation via the proxy via one of the configured + # locations + federation_proxies = outbound_federation_restricted_to.locations + federation_agent = ProxyAgent( + self.reactor, + self.reactor, + tls_client_options_factory, + federation_proxies=federation_proxies, + ) # Use a BlocklistingAgentWrapper to prevent circumventing the IP # blocking via IP literals in server names - self.agent = BlocklistingAgentWrapper( + self.agent: IAgent = BlocklistingAgentWrapper( federation_agent, ip_blocklist=hs.config.server.federation_ip_range_blocklist, ) @@ -412,7 +428,6 @@ def __init__( self._store = hs.get_datastores().main self.version_string_bytes = hs.version_string.encode("ascii") self.default_timeout_seconds = hs.config.federation.client_timeout_ms / 1000 - self.max_long_retry_delay_seconds = ( hs.config.federation.max_long_retry_delay_ms / 1000 ) @@ -1131,6 +1146,101 @@ async def get_json( Succeeds when we get a 2xx HTTP response. The result will be the decoded JSON body. + Raises: + HttpResponseException: If we get an HTTP response code >= 300 + (except 429). + NotRetryingDestination: If we are not yet ready to retry this + server. + FederationDeniedError: If this destination is not on our + federation whitelist + RequestSendFailed: If there were problems connecting to the + remote, due to e.g. DNS failures, connection timeouts etc. + """ + json_dict, _ = await self.get_json_with_headers( + destination=destination, + path=path, + args=args, + retry_on_dns_fail=retry_on_dns_fail, + timeout=timeout, + ignore_backoff=ignore_backoff, + try_trailing_slash_on_400=try_trailing_slash_on_400, + parser=parser, + ) + return json_dict + + @overload + async def get_json_with_headers( + self, + destination: str, + path: str, + args: Optional[QueryParams] = None, + retry_on_dns_fail: bool = True, + timeout: Optional[int] = None, + ignore_backoff: bool = False, + try_trailing_slash_on_400: bool = False, + parser: Literal[None] = None, + ) -> Tuple[JsonDict, Dict[bytes, List[bytes]]]: + ... + + @overload + async def get_json_with_headers( + self, + destination: str, + path: str, + args: Optional[QueryParams] = ..., + retry_on_dns_fail: bool = ..., + timeout: Optional[int] = ..., + ignore_backoff: bool = ..., + try_trailing_slash_on_400: bool = ..., + parser: ByteParser[T] = ..., + ) -> Tuple[T, Dict[bytes, List[bytes]]]: + ... + + async def get_json_with_headers( + self, + destination: str, + path: str, + args: Optional[QueryParams] = None, + retry_on_dns_fail: bool = True, + timeout: Optional[int] = None, + ignore_backoff: bool = False, + try_trailing_slash_on_400: bool = False, + parser: Optional[ByteParser[T]] = None, + ) -> Tuple[Union[JsonDict, T], Dict[bytes, List[bytes]]]: + """GETs some json from the given host homeserver and path + + Args: + destination: The remote server to send the HTTP request to. + + path: The HTTP path. + + args: A dictionary used to create query strings, defaults to + None. + + retry_on_dns_fail: true if the request should be retried on DNS failures + + timeout: number of milliseconds to wait for the response. + self._default_timeout (60s) by default. + + Note that we may make several attempts to send the request; this + timeout applies to the time spent waiting for response headers for + *each* attempt (including connection time) as well as the time spent + reading the response body after a 200 response. + + ignore_backoff: true to ignore the historical backoff data + and try the request anyway. + + try_trailing_slash_on_400: True if on a 400 M_UNRECOGNIZED + response we should try appending a trailing slash to the end of + the request. Workaround for #3622 in Synapse <= v0.99.3. + + parser: The parser to use to decode the response. Defaults to + parsing as JSON. + + Returns: + Succeeds when we get a 2xx HTTP response. The result will be a tuple of the + decoded JSON body and a dict of the response headers. + Raises: HttpResponseException: If we get an HTTP response code >= 300 (except 429). @@ -1156,6 +1266,8 @@ async def get_json( timeout=timeout, ) + headers = dict(response.headers.getAllRawHeaders()) + if timeout is not None: _sec_timeout = timeout / 1000 else: @@ -1173,7 +1285,7 @@ async def get_json( parser=parser, ) - return body + return body, headers async def delete_json( self, diff --git a/synapse/http/proxy.py b/synapse/http/proxy.py new file mode 100644 index 000000000000..0874d67760ae --- /dev/null +++ b/synapse/http/proxy.py @@ -0,0 +1,249 @@ +# Copyright 2023 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import json +import logging +import urllib.parse +from typing import TYPE_CHECKING, Any, Optional, Set, Tuple, cast + +from twisted.internet import protocol +from twisted.internet.interfaces import ITCPTransport +from twisted.internet.protocol import connectionDone +from twisted.python import failure +from twisted.python.failure import Failure +from twisted.web.client import ResponseDone +from twisted.web.http_headers import Headers +from twisted.web.iweb import IAgent, IResponse +from twisted.web.resource import IResource +from twisted.web.server import Site + +from synapse.api.errors import Codes +from synapse.http import QuieterFileBodyProducer +from synapse.http.server import _AsyncResource +from synapse.logging.context import make_deferred_yieldable, run_in_background +from synapse.types import ISynapseReactor +from synapse.util.async_helpers import timeout_deferred + +if TYPE_CHECKING: + from synapse.http.site import SynapseRequest + +logger = logging.getLogger(__name__) + +# "Hop-by-hop" headers (as opposed to "end-to-end" headers) as defined by RFC2616 +# section 13.5.1 and referenced in RFC9110 section 7.6.1. These are meant to only be +# consumed by the immediate recipient and not be forwarded on. +HOP_BY_HOP_HEADERS = { + "Connection", + "Keep-Alive", + "Proxy-Authenticate", + "Proxy-Authorization", + "TE", + "Trailers", + "Transfer-Encoding", + "Upgrade", +} + + +def parse_connection_header_value( + connection_header_value: Optional[bytes], +) -> Set[str]: + """ + Parse the `Connection` header to determine which headers we should not be copied + over from the remote response. + + As defined by RFC2616 section 14.10 and RFC9110 section 7.6.1 + + Example: `Connection: close, X-Foo, X-Bar` will return `{"Close", "X-Foo", "X-Bar"}` + + Even though "close" is a special directive, let's just treat it as just another + header for simplicity. If people want to check for this directive, they can simply + check for `"Close" in headers`. + + Args: + connection_header_value: The value of the `Connection` header. + + Returns: + The set of header names that should not be copied over from the remote response. + The keys are capitalized in canonical capitalization. + """ + headers = Headers() + extra_headers_to_remove: Set[str] = set() + if connection_header_value: + extra_headers_to_remove = { + headers._canonicalNameCaps(connection_option.strip()).decode("ascii") + for connection_option in connection_header_value.split(b",") + } + + return extra_headers_to_remove + + +class ProxyResource(_AsyncResource): + """ + A stub resource that proxies any requests with a `matrix-federation://` scheme + through the given `federation_agent` to the remote homeserver and ferries back the + info. + """ + + isLeaf = True + + def __init__(self, reactor: ISynapseReactor, federation_agent: IAgent): + super().__init__(True) + + self.reactor = reactor + self.agent = federation_agent + + async def _async_render(self, request: "SynapseRequest") -> Tuple[int, Any]: + uri = urllib.parse.urlparse(request.uri) + assert uri.scheme == b"matrix-federation" + + headers = Headers() + for header_name in (b"User-Agent", b"Authorization", b"Content-Type"): + header_value = request.getHeader(header_name) + if header_value: + headers.addRawHeader(header_name, header_value) + + request_deferred = run_in_background( + self.agent.request, + request.method, + request.uri, + headers=headers, + bodyProducer=QuieterFileBodyProducer(request.content), + ) + request_deferred = timeout_deferred( + request_deferred, + # This should be set longer than the timeout in `MatrixFederationHttpClient` + # so that it has enough time to complete and pass us the data before we give + # up. + timeout=90, + reactor=self.reactor, + ) + + response = await make_deferred_yieldable(request_deferred) + + return response.code, response + + def _send_response( + self, + request: "SynapseRequest", + code: int, + response_object: Any, + ) -> None: + response = cast(IResponse, response_object) + response_headers = cast(Headers, response.headers) + + request.setResponseCode(code) + + # The `Connection` header also defines which headers should not be copied over. + connection_header = response_headers.getRawHeaders(b"connection") + extra_headers_to_remove = parse_connection_header_value( + connection_header[0] if connection_header else None + ) + + # Copy headers. + for k, v in response_headers.getAllRawHeaders(): + # Do not copy over any hop-by-hop headers. These are meant to only be + # consumed by the immediate recipient and not be forwarded on. + header_key = k.decode("ascii") + if ( + header_key in HOP_BY_HOP_HEADERS + or header_key in extra_headers_to_remove + ): + continue + + request.responseHeaders.setRawHeaders(k, v) + + response.deliverBody(_ProxyResponseBody(request)) + + def _send_error_response( + self, + f: failure.Failure, + request: "SynapseRequest", + ) -> None: + request.setResponseCode(502) + request.setHeader(b"Content-Type", b"application/json") + request.write( + ( + json.dumps( + { + "errcode": Codes.UNKNOWN, + "err": "ProxyResource: Error when proxying request: %s %s -> %s" + % ( + request.method.decode("ascii"), + request.uri.decode("ascii"), + f, + ), + } + ) + ).encode() + ) + request.finish() + + +class _ProxyResponseBody(protocol.Protocol): + """ + A protocol that proxies the given remote response data back out to the given local + request. + """ + + transport: Optional[ITCPTransport] = None + + def __init__(self, request: "SynapseRequest") -> None: + self._request = request + + def dataReceived(self, data: bytes) -> None: + # Avoid sending response data to the local request that already disconnected + if self._request._disconnected and self.transport is not None: + # Close the connection (forcefully) since all the data will get + # discarded anyway. + self.transport.abortConnection() + return + + self._request.write(data) + + def connectionLost(self, reason: Failure = connectionDone) -> None: + # If the local request is already finished (successfully or failed), don't + # worry about sending anything back. + if self._request.finished: + return + + if reason.check(ResponseDone): + self._request.finish() + else: + # Abort the underlying request since our remote request also failed. + self._request.transport.abortConnection() + + +class ProxySite(Site): + """ + Proxies any requests with a `matrix-federation://` scheme through the given + `federation_agent`. Otherwise, behaves like a normal `Site`. + """ + + def __init__( + self, + resource: IResource, + reactor: ISynapseReactor, + federation_agent: IAgent, + ): + super().__init__(resource, reactor=reactor) + + self._proxy_resource = ProxyResource(reactor, federation_agent) + + def getResourceFor(self, request: "SynapseRequest") -> IResource: + uri = urllib.parse.urlparse(request.uri) + if uri.scheme == b"matrix-federation": + return self._proxy_resource + + return super().getResourceFor(request) diff --git a/synapse/http/proxyagent.py b/synapse/http/proxyagent.py index 7bdc4acae7da..1fa3adbef20c 100644 --- a/synapse/http/proxyagent.py +++ b/synapse/http/proxyagent.py @@ -12,8 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +import random import re -from typing import Any, Dict, Optional, Tuple +from typing import Any, Collection, Dict, List, Optional, Sequence, Tuple from urllib.parse import urlparse from urllib.request import ( # type: ignore[attr-defined] getproxies_environment, @@ -24,7 +25,12 @@ from twisted.internet import defer from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS -from twisted.internet.interfaces import IReactorCore, IStreamClientEndpoint +from twisted.internet.interfaces import ( + IProtocol, + IProtocolFactory, + IReactorCore, + IStreamClientEndpoint, +) from twisted.python.failure import Failure from twisted.web.client import ( URI, @@ -36,8 +42,10 @@ from twisted.web.http_headers import Headers from twisted.web.iweb import IAgent, IBodyProducer, IPolicyForHTTPS, IResponse +from synapse.config.workers import InstanceLocationConfig from synapse.http import redact_uri from synapse.http.connectproxyclient import HTTPConnectProxyEndpoint, ProxyCredentials +from synapse.logging.context import run_in_background logger = logging.getLogger(__name__) @@ -74,6 +82,10 @@ class ProxyAgent(_AgentBase): use_proxy: Whether proxy settings should be discovered and used from conventional environment variables. + federation_proxies: An optional list of locations to proxy outbound federation + traffic through (only requests that use the `matrix-federation://` scheme + will be proxied). + Raises: ValueError if use_proxy is set and the environment variables contain an invalid proxy specification. @@ -89,6 +101,7 @@ def __init__( bindAddress: Optional[bytes] = None, pool: Optional[HTTPConnectionPool] = None, use_proxy: bool = False, + federation_proxies: Collection[InstanceLocationConfig] = (), ): contextFactory = contextFactory or BrowserLikePolicyForHTTPS() @@ -127,6 +140,27 @@ def __init__( self._policy_for_https = contextFactory self._reactor = reactor + self._federation_proxy_endpoint: Optional[IStreamClientEndpoint] = None + if federation_proxies: + endpoints = [] + for federation_proxy in federation_proxies: + endpoint = HostnameEndpoint( + self.proxy_reactor, + federation_proxy.host, + federation_proxy.port, + ) + + if federation_proxy.tls: + tls_connection_creator = self._policy_for_https.creatorForNetloc( + federation_proxy.host, + federation_proxy.port, + ) + endpoint = wrapClientTLS(tls_connection_creator, endpoint) + + endpoints.append(endpoint) + + self._federation_proxy_endpoint = _ProxyEndpoints(endpoints) + def request( self, method: bytes, @@ -214,6 +248,14 @@ def request( parsed_uri.port, self.https_proxy_creds, ) + elif ( + parsed_uri.scheme == b"matrix-federation" + and self._federation_proxy_endpoint + ): + # Cache *all* connections under the same key, since we are only + # connecting to a single destination, the proxy: + endpoint = self._federation_proxy_endpoint + request_path = uri else: # not using a proxy endpoint = HostnameEndpoint( @@ -233,6 +275,11 @@ def request( endpoint = wrapClientTLS(tls_connection_creator, endpoint) elif parsed_uri.scheme == b"http": pass + elif ( + parsed_uri.scheme == b"matrix-federation" + and self._federation_proxy_endpoint + ): + pass else: return defer.fail( Failure( @@ -337,3 +384,31 @@ def parse_proxy( credentials = ProxyCredentials(b"".join([url.username, b":", url.password])) return url.scheme, url.hostname, url.port or default_port, credentials + + +@implementer(IStreamClientEndpoint) +class _ProxyEndpoints: + """An endpoint that randomly iterates through a given list of endpoints at + each connection attempt. + """ + + def __init__(self, endpoints: Sequence[IStreamClientEndpoint]) -> None: + assert endpoints + self._endpoints = endpoints + + def connect( + self, protocol_factory: IProtocolFactory + ) -> "defer.Deferred[IProtocol]": + """Implements IStreamClientEndpoint interface""" + + return run_in_background(self._do_connect, protocol_factory) + + async def _do_connect(self, protocol_factory: IProtocolFactory) -> IProtocol: + failures: List[Failure] = [] + for endpoint in random.sample(self._endpoints, k=len(self._endpoints)): + try: + return await endpoint.connect(protocol_factory) + except Exception: + failures.append(Failure()) + + failures.pop().raiseException() diff --git a/synapse/http/server.py b/synapse/http/server.py index 933172c87327..ff3153a9d949 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -18,6 +18,7 @@ import logging import types import urllib +import urllib.parse from http import HTTPStatus from http.client import FOUND from inspect import isawaitable @@ -65,7 +66,6 @@ UnrecognizedRequestError, ) from synapse.config.homeserver import HomeServerConfig -from synapse.http.site import SynapseRequest from synapse.logging.context import defer_to_thread, preserve_fn, run_in_background from synapse.logging.opentracing import active_span, start_active_span, trace_servlet from synapse.util import json_encoder @@ -76,6 +76,7 @@ if TYPE_CHECKING: import opentracing + from synapse.http.site import SynapseRequest from synapse.server import HomeServer logger = logging.getLogger(__name__) @@ -102,7 +103,7 @@ def return_json_error( - f: failure.Failure, request: SynapseRequest, config: Optional[HomeServerConfig] + f: failure.Failure, request: "SynapseRequest", config: Optional[HomeServerConfig] ) -> None: """Sends a JSON error response to clients.""" @@ -220,8 +221,8 @@ def return_html_error( def wrap_async_request_handler( - h: Callable[["_AsyncResource", SynapseRequest], Awaitable[None]] -) -> Callable[["_AsyncResource", SynapseRequest], "defer.Deferred[None]"]: + h: Callable[["_AsyncResource", "SynapseRequest"], Awaitable[None]] +) -> Callable[["_AsyncResource", "SynapseRequest"], "defer.Deferred[None]"]: """Wraps an async request handler so that it calls request.processing. This helps ensure that work done by the request handler after the request is completed @@ -235,7 +236,7 @@ def wrap_async_request_handler( """ async def wrapped_async_request_handler( - self: "_AsyncResource", request: SynapseRequest + self: "_AsyncResource", request: "SynapseRequest" ) -> None: with request.processing(): await h(self, request) @@ -300,7 +301,7 @@ def __init__(self, extract_context: bool = False): self._extract_context = extract_context - def render(self, request: SynapseRequest) -> int: + def render(self, request: "SynapseRequest") -> int: """This gets called by twisted every time someone sends us a request.""" request.render_deferred = defer.ensureDeferred( self._async_render_wrapper(request) @@ -308,7 +309,7 @@ def render(self, request: SynapseRequest) -> int: return NOT_DONE_YET @wrap_async_request_handler - async def _async_render_wrapper(self, request: SynapseRequest) -> None: + async def _async_render_wrapper(self, request: "SynapseRequest") -> None: """This is a wrapper that delegates to `_async_render` and handles exceptions, return values, metrics, etc. """ @@ -326,9 +327,15 @@ async def _async_render_wrapper(self, request: SynapseRequest) -> None: # of our stack, and thus gives us a sensible stack # trace. f = failure.Failure() + logger.exception( + "Error handling request", + exc_info=(f.type, f.value, f.getTracebackObject()), + ) self._send_error_response(f, request) - async def _async_render(self, request: SynapseRequest) -> Optional[Tuple[int, Any]]: + async def _async_render( + self, request: "SynapseRequest" + ) -> Optional[Tuple[int, Any]]: """Delegates to `_async_render_` methods, or returns a 400 if no appropriate method exists. Can be overridden in sub classes for different routing. @@ -358,7 +365,7 @@ async def _async_render(self, request: SynapseRequest) -> Optional[Tuple[int, An @abc.abstractmethod def _send_response( self, - request: SynapseRequest, + request: "SynapseRequest", code: int, response_object: Any, ) -> None: @@ -368,7 +375,7 @@ def _send_response( def _send_error_response( self, f: failure.Failure, - request: SynapseRequest, + request: "SynapseRequest", ) -> None: raise NotImplementedError() @@ -384,7 +391,7 @@ def __init__(self, canonical_json: bool = False, extract_context: bool = False): def _send_response( self, - request: SynapseRequest, + request: "SynapseRequest", code: int, response_object: Any, ) -> None: @@ -401,7 +408,7 @@ def _send_response( def _send_error_response( self, f: failure.Failure, - request: SynapseRequest, + request: "SynapseRequest", ) -> None: """Implements _AsyncResource._send_error_response""" return_json_error(f, request, None) @@ -473,7 +480,7 @@ def register_paths( ) def _get_handler_for_request( - self, request: SynapseRequest + self, request: "SynapseRequest" ) -> Tuple[ServletCallback, str, Dict[str, str]]: """Finds a callback method to handle the given request. @@ -503,7 +510,7 @@ def _get_handler_for_request( # Huh. No one wanted to handle that? Fiiiiiine. raise UnrecognizedRequestError(code=404) - async def _async_render(self, request: SynapseRequest) -> Tuple[int, Any]: + async def _async_render(self, request: "SynapseRequest") -> Tuple[int, Any]: callback, servlet_classname, group_dict = self._get_handler_for_request(request) request.is_render_cancellable = is_function_cancellable(callback) @@ -535,7 +542,7 @@ async def _async_render(self, request: SynapseRequest) -> Tuple[int, Any]: def _send_error_response( self, f: failure.Failure, - request: SynapseRequest, + request: "SynapseRequest", ) -> None: """Implements _AsyncResource._send_error_response""" return_json_error(f, request, self.hs.config) @@ -551,7 +558,7 @@ class DirectServeHtmlResource(_AsyncResource): def _send_response( self, - request: SynapseRequest, + request: "SynapseRequest", code: int, response_object: Any, ) -> None: @@ -565,7 +572,7 @@ def _send_response( def _send_error_response( self, f: failure.Failure, - request: SynapseRequest, + request: "SynapseRequest", ) -> None: """Implements _AsyncResource._send_error_response""" return_html_error(f, request, self.ERROR_TEMPLATE) @@ -592,7 +599,7 @@ class UnrecognizedRequestResource(resource.Resource): errcode of M_UNRECOGNIZED. """ - def render(self, request: SynapseRequest) -> int: + def render(self, request: "SynapseRequest") -> int: f = failure.Failure(UnrecognizedRequestError(code=404)) return_json_error(f, request, None) # A response has already been sent but Twisted requires either NOT_DONE_YET @@ -622,7 +629,7 @@ def getChild(self, name: str, request: Request) -> resource.Resource: class OptionsResource(resource.Resource): """Responds to OPTION requests for itself and all children.""" - def render_OPTIONS(self, request: SynapseRequest) -> bytes: + def render_OPTIONS(self, request: "SynapseRequest") -> bytes: request.setResponseCode(204) request.setHeader(b"Content-Length", b"0") @@ -737,7 +744,7 @@ def _encode_json_bytes(json_object: object) -> bytes: def respond_with_json( - request: SynapseRequest, + request: "SynapseRequest", code: int, json_object: Any, send_cors: bool = False, @@ -787,7 +794,7 @@ def respond_with_json( def respond_with_json_bytes( - request: SynapseRequest, + request: "SynapseRequest", code: int, json_bytes: bytes, send_cors: bool = False, @@ -825,7 +832,7 @@ def respond_with_json_bytes( async def _async_write_json_to_request_in_thread( - request: SynapseRequest, + request: "SynapseRequest", json_encoder: Callable[[Any], bytes], json_object: Any, ) -> None: @@ -883,7 +890,7 @@ def _write_bytes_to_request(request: Request, bytes_to_write: bytes) -> None: _ByteProducer(request, bytes_generator) -def set_cors_headers(request: SynapseRequest) -> None: +def set_cors_headers(request: "SynapseRequest") -> None: """Set the CORS headers so that javascript running in a web browsers can use this API @@ -981,7 +988,7 @@ def set_clickjacking_protection_headers(request: Request) -> None: def respond_with_redirect( - request: SynapseRequest, url: bytes, statusCode: int = FOUND, cors: bool = False + request: "SynapseRequest", url: bytes, statusCode: int = FOUND, cors: bool = False ) -> None: """ Write a 302 (or other specified status code) response to the request, if it is still alive. diff --git a/synapse/http/site.py b/synapse/http/site.py index 5b5a7c1e598f..0ee259834559 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -21,25 +21,28 @@ from twisted.internet.address import UNIXAddress from twisted.internet.defer import Deferred -from twisted.internet.interfaces import IAddress, IReactorTime +from twisted.internet.interfaces import IAddress from twisted.python.failure import Failure from twisted.web.http import HTTPChannel +from twisted.web.iweb import IAgent from twisted.web.resource import IResource, Resource -from twisted.web.server import Request, Site +from twisted.web.server import Request from synapse.config.server import ListenerConfig from synapse.http import get_request_user_agent, redact_uri +from synapse.http.proxy import ProxySite from synapse.http.request_metrics import RequestMetrics, requests_counter from synapse.logging.context import ( ContextRequest, LoggingContext, PreserveLoggingContext, ) -from synapse.types import Requester +from synapse.types import ISynapseReactor, Requester if TYPE_CHECKING: import opentracing + logger = logging.getLogger(__name__) _next_request_seq = 0 @@ -102,7 +105,7 @@ def __init__( # A boolean indicating whether `render_deferred` should be cancelled if the # client disconnects early. Expected to be set by the coroutine started by # `Resource.render`, if rendering is asynchronous. - self.is_render_cancellable = False + self.is_render_cancellable: bool = False global _next_request_seq self.request_seq = _next_request_seq @@ -601,7 +604,7 @@ class _XForwardedForAddress: host: str -class SynapseSite(Site): +class SynapseSite(ProxySite): """ Synapse-specific twisted http Site @@ -623,7 +626,8 @@ def __init__( resource: IResource, server_version_string: str, max_request_body_size: int, - reactor: IReactorTime, + reactor: ISynapseReactor, + federation_agent: IAgent, ): """ @@ -638,7 +642,11 @@ def __init__( dropping the connection reactor: reactor to be used to manage connection timeouts """ - Site.__init__(self, resource, reactor=reactor) + super().__init__( + resource=resource, + reactor=reactor, + federation_agent=federation_agent, + ) self.site_tag = site_tag self.reactor = reactor @@ -649,7 +657,9 @@ def __init__( request_id_header = config.http_options.request_id_header - self.experimental_cors_msc3886 = config.http_options.experimental_cors_msc3886 + self.experimental_cors_msc3886: bool = ( + config.http_options.experimental_cors_msc3886 + ) def request_factory(channel: HTTPChannel, queued: bool) -> Request: return request_class( diff --git a/synapse/util/check_dependencies.py b/synapse/util/check_dependencies.py index 1c0fde4966e7..114130a08fe2 100644 --- a/synapse/util/check_dependencies.py +++ b/synapse/util/check_dependencies.py @@ -21,16 +21,13 @@ """ import logging +from importlib import metadata from typing import Iterable, NamedTuple, Optional from packaging.requirements import Requirement DISTRIBUTION_NAME = "matrix-synapse" -try: - from importlib import metadata -except ImportError: - import importlib_metadata as metadata # type: ignore[no-redef] __all__ = ["check_requirements"] diff --git a/tests/app/test_openid_listener.py b/tests/app/test_openid_listener.py index 5a965f233b05..21c530974083 100644 --- a/tests/app/test_openid_listener.py +++ b/tests/app/test_openid_listener.py @@ -31,9 +31,7 @@ class FederationReaderOpenIDListenerTests(HomeserverTestCase): def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: - hs = self.setup_test_homeserver( - federation_http_client=None, homeserver_to_use=GenericWorkerServer - ) + hs = self.setup_test_homeserver(homeserver_to_use=GenericWorkerServer) return hs def default_config(self) -> JsonDict: @@ -91,9 +89,7 @@ def test_openid_listener(self, names: List[str], expectation: str) -> None: @patch("synapse.app.homeserver.KeyResource", new=Mock()) class SynapseHomeserverOpenIDListenerTests(HomeserverTestCase): def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: - hs = self.setup_test_homeserver( - federation_http_client=None, homeserver_to_use=SynapseHomeServer - ) + hs = self.setup_test_homeserver(homeserver_to_use=SynapseHomeServer) return hs @parameterized.expand( diff --git a/tests/handlers/test_device.py b/tests/handlers/test_device.py index ee48f9e546e8..66215af2b890 100644 --- a/tests/handlers/test_device.py +++ b/tests/handlers/test_device.py @@ -41,7 +41,6 @@ def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: self.appservice_api = mock.Mock() hs = self.setup_test_homeserver( "server", - federation_http_client=None, application_service_api=self.appservice_api, ) handler = hs.get_device_handler() @@ -401,7 +400,7 @@ def test_on_federation_query_user_devices_appservice(self) -> None: class DehydrationTestCase(unittest.HomeserverTestCase): def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: - hs = self.setup_test_homeserver("server", federation_http_client=None) + hs = self.setup_test_homeserver("server") handler = hs.get_device_handler() assert isinstance(handler, DeviceHandler) self.handler = handler diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py index bf0862ed541c..5f11d5df11ad 100644 --- a/tests/handlers/test_federation.py +++ b/tests/handlers/test_federation.py @@ -57,7 +57,7 @@ class FederationTestCase(unittest.FederatingHomeserverTestCase): ] def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: - hs = self.setup_test_homeserver(federation_http_client=None) + hs = self.setup_test_homeserver() self.handler = hs.get_federation_handler() self.store = hs.get_datastores().main return hs diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index 19f5322317a1..fd66d573d221 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -993,7 +993,6 @@ class PresenceJoinTestCase(unittest.HomeserverTestCase): def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: hs = self.setup_test_homeserver( "server", - federation_http_client=None, federation_sender=Mock(spec=FederationSender), ) return hs diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index 94518a7196b4..5da1d95f0b22 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -17,6 +17,8 @@ from typing import Dict, List, Set from unittest.mock import ANY, Mock, call +from netaddr import IPSet + from twisted.test.proto_helpers import MemoryReactor from twisted.web.resource import Resource @@ -24,6 +26,7 @@ from synapse.api.errors import AuthError from synapse.federation.transport.server import TransportLayerServer from synapse.handlers.typing import TypingWriterHandler +from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent from synapse.server import HomeServer from synapse.types import JsonDict, Requester, UserID, create_requester from synapse.util import Clock @@ -76,6 +79,13 @@ def make_homeserver( # we mock out the federation client too self.mock_federation_client = Mock(spec=["put_json"]) self.mock_federation_client.put_json.return_value = make_awaitable((200, "OK")) + self.mock_federation_client.agent = MatrixFederationAgent( + reactor, + tls_client_options_factory=None, + user_agent=b"SynapseInTrialTest/0.0.0", + ip_allowlist=None, + ip_blocklist=IPSet(), + ) # the tests assume that we are starting at unix time 1000 reactor.pump((1000,)) diff --git a/tests/http/test_matrixfederationclient.py b/tests/http/test_matrixfederationclient.py index b5f4a60fe500..a8b9737d1fb7 100644 --- a/tests/http/test_matrixfederationclient.py +++ b/tests/http/test_matrixfederationclient.py @@ -11,8 +11,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from typing import Generator -from unittest.mock import Mock +from typing import Any, Dict, Generator +from unittest.mock import ANY, Mock, create_autospec from netaddr import IPSet from parameterized import parameterized @@ -21,10 +21,11 @@ from twisted.internet.defer import Deferred, TimeoutError from twisted.internet.error import ConnectingCancelledError, DNSLookupError from twisted.test.proto_helpers import MemoryReactor, StringTransport -from twisted.web.client import ResponseNeverReceived +from twisted.web.client import Agent, ResponseNeverReceived from twisted.web.http import HTTPChannel +from twisted.web.http_headers import Headers -from synapse.api.errors import RequestSendFailed +from synapse.api.errors import HttpResponseException, RequestSendFailed from synapse.http.matrixfederationclient import ( ByteParser, MatrixFederationHttpClient, @@ -39,7 +40,9 @@ from synapse.server import HomeServer from synapse.util import Clock +from tests.replication._base import BaseMultiWorkerStreamTestCase from tests.server import FakeTransport +from tests.test_utils import FakeResponse from tests.unittest import HomeserverTestCase, override_config @@ -658,3 +661,181 @@ def test_configurable_retry_and_delay_values(self) -> None: self.assertEqual(self.cl.max_short_retry_delay_seconds, 7) self.assertEqual(self.cl.max_long_retries, 20) self.assertEqual(self.cl.max_short_retries, 5) + + +class FederationClientProxyTests(BaseMultiWorkerStreamTestCase): + def default_config(self) -> Dict[str, Any]: + conf = super().default_config() + conf["instance_map"] = { + "main": {"host": "testserv", "port": 8765}, + "federation_sender": {"host": "testserv", "port": 1001}, + } + return conf + + @override_config({"outbound_federation_restricted_to": ["federation_sender"]}) + def test_proxy_requests_through_federation_sender_worker(self) -> None: + """ + Test that all outbound federation requests go through the `federation_sender` + worker + """ + # Mock out the `MatrixFederationHttpClient` of the `federation_sender` instance + # so we can act like some remote server responding to requests + mock_client_on_federation_sender = Mock() + mock_agent_on_federation_sender = create_autospec(Agent, spec_set=True) + mock_client_on_federation_sender.agent = mock_agent_on_federation_sender + + # Create the `federation_sender` worker + self.federation_sender = self.make_worker_hs( + "synapse.app.generic_worker", + {"worker_name": "federation_sender"}, + federation_http_client=mock_client_on_federation_sender, + ) + + # Fake `remoteserv:8008` responding to requests + mock_agent_on_federation_sender.request.side_effect = ( + lambda *args, **kwargs: defer.succeed( + FakeResponse.json( + payload={ + "foo": "bar", + } + ) + ) + ) + + # This federation request from the main process should be proxied through the + # `federation_sender` worker off to the remote server + test_request_from_main_process_d = defer.ensureDeferred( + self.hs.get_federation_http_client().get_json("remoteserv:8008", "foo/bar") + ) + + # Pump the reactor so our deferred goes through the motions + self.pump() + + # Make sure that the request was proxied through the `federation_sender` worker + mock_agent_on_federation_sender.request.assert_called_once_with( + b"GET", + b"matrix-federation://remoteserv:8008/foo/bar", + headers=ANY, + bodyProducer=ANY, + ) + + # Make sure the response is as expected back on the main worker + res = self.successResultOf(test_request_from_main_process_d) + self.assertEqual(res, {"foo": "bar"}) + + @override_config({"outbound_federation_restricted_to": ["federation_sender"]}) + def test_proxy_request_with_network_error_through_federation_sender_worker( + self, + ) -> None: + """ + Test that when the outbound federation request fails with a network related + error, a sensible error makes its way back to the main process. + """ + # Mock out the `MatrixFederationHttpClient` of the `federation_sender` instance + # so we can act like some remote server responding to requests + mock_client_on_federation_sender = Mock() + mock_agent_on_federation_sender = create_autospec(Agent, spec_set=True) + mock_client_on_federation_sender.agent = mock_agent_on_federation_sender + + # Create the `federation_sender` worker + self.federation_sender = self.make_worker_hs( + "synapse.app.generic_worker", + {"worker_name": "federation_sender"}, + federation_http_client=mock_client_on_federation_sender, + ) + + # Fake `remoteserv:8008` responding to requests + mock_agent_on_federation_sender.request.side_effect = ( + lambda *args, **kwargs: defer.fail(ResponseNeverReceived("fake error")) + ) + + # This federation request from the main process should be proxied through the + # `federation_sender` worker off to the remote server + test_request_from_main_process_d = defer.ensureDeferred( + self.hs.get_federation_http_client().get_json("remoteserv:8008", "foo/bar") + ) + + # Pump the reactor so our deferred goes through the motions. We pump with 10 + # seconds (0.1 * 100) so the `MatrixFederationHttpClient` runs out of retries + # and finally passes along the error response. + self.pump(0.1) + + # Make sure that the request was proxied through the `federation_sender` worker + mock_agent_on_federation_sender.request.assert_called_with( + b"GET", + b"matrix-federation://remoteserv:8008/foo/bar", + headers=ANY, + bodyProducer=ANY, + ) + + # Make sure we get some sort of error back on the main worker + failure_res = self.failureResultOf(test_request_from_main_process_d) + self.assertIsInstance(failure_res.value, RequestSendFailed) + self.assertIsInstance(failure_res.value.inner_exception, HttpResponseException) + + @override_config({"outbound_federation_restricted_to": ["federation_sender"]}) + def test_proxy_requests_and_discards_hop_by_hop_headers(self) -> None: + """ + Test to make sure hop-by-hop headers and addional headers defined in the + `Connection` header are discarded when proxying requests + """ + # Mock out the `MatrixFederationHttpClient` of the `federation_sender` instance + # so we can act like some remote server responding to requests + mock_client_on_federation_sender = Mock() + mock_agent_on_federation_sender = create_autospec(Agent, spec_set=True) + mock_client_on_federation_sender.agent = mock_agent_on_federation_sender + + # Create the `federation_sender` worker + self.federation_sender = self.make_worker_hs( + "synapse.app.generic_worker", + {"worker_name": "federation_sender"}, + federation_http_client=mock_client_on_federation_sender, + ) + + # Fake `remoteserv:8008` responding to requests + mock_agent_on_federation_sender.request.side_effect = lambda *args, **kwargs: defer.succeed( + FakeResponse( + code=200, + body=b'{"foo": "bar"}', + headers=Headers( + { + "Content-Type": ["application/json"], + "Connection": ["close, X-Foo, X-Bar"], + # Should be removed because it's defined in the `Connection` header + "X-Foo": ["foo"], + "X-Bar": ["bar"], + # Should be removed because it's a hop-by-hop header + "Proxy-Authorization": "abcdef", + } + ), + ) + ) + + # This federation request from the main process should be proxied through the + # `federation_sender` worker off to the remote server + test_request_from_main_process_d = defer.ensureDeferred( + self.hs.get_federation_http_client().get_json_with_headers( + "remoteserv:8008", "foo/bar" + ) + ) + + # Pump the reactor so our deferred goes through the motions + self.pump() + + # Make sure that the request was proxied through the `federation_sender` worker + mock_agent_on_federation_sender.request.assert_called_once_with( + b"GET", + b"matrix-federation://remoteserv:8008/foo/bar", + headers=ANY, + bodyProducer=ANY, + ) + + res, headers = self.successResultOf(test_request_from_main_process_d) + header_names = set(headers.keys()) + + # Make sure the response does not include the hop-by-hop headers + self.assertNotIn(b"X-Foo", header_names) + self.assertNotIn(b"X-Bar", header_names) + self.assertNotIn(b"Proxy-Authorization", header_names) + # Make sure the response is as expected back on the main worker + self.assertEqual(res, {"foo": "bar"}) diff --git a/tests/http/test_proxy.py b/tests/http/test_proxy.py new file mode 100644 index 000000000000..0dc9ba8e0582 --- /dev/null +++ b/tests/http/test_proxy.py @@ -0,0 +1,53 @@ +# Copyright 2023 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import Set + +from parameterized import parameterized + +from synapse.http.proxy import parse_connection_header_value + +from tests.unittest import TestCase + + +class ProxyTests(TestCase): + @parameterized.expand( + [ + [b"close, X-Foo, X-Bar", {"Close", "X-Foo", "X-Bar"}], + # No whitespace + [b"close,X-Foo,X-Bar", {"Close", "X-Foo", "X-Bar"}], + # More whitespace + [b"close, X-Foo, X-Bar", {"Close", "X-Foo", "X-Bar"}], + # "close" directive in not the first position + [b"X-Foo, X-Bar, close", {"X-Foo", "X-Bar", "Close"}], + # Normalizes header capitalization + [b"keep-alive, x-fOo, x-bAr", {"Keep-Alive", "X-Foo", "X-Bar"}], + # Handles header names with whitespace + [ + b"keep-alive, x foo, x bar", + {"Keep-Alive", "X foo", "X bar"}, + ], + ] + ) + def test_parse_connection_header_value( + self, + connection_header_value: bytes, + expected_extra_headers_to_remove: Set[str], + ) -> None: + """ + Tests that the connection header value is parsed correctly + """ + self.assertEqual( + expected_extra_headers_to_remove, + parse_connection_header_value(connection_header_value), + ) diff --git a/tests/metrics/test_metrics.py b/tests/metrics/test_metrics.py index 7c3656d049f1..d14876826ce9 100644 --- a/tests/metrics/test_metrics.py +++ b/tests/metrics/test_metrics.py @@ -12,19 +12,13 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from importlib import metadata from typing import Dict, Tuple - -from typing_extensions import Protocol - -try: - from importlib import metadata -except ImportError: - import importlib_metadata as metadata # type: ignore[no-redef] - from unittest.mock import patch from pkg_resources import parse_version from prometheus_client.core import Sample +from typing_extensions import Protocol from synapse.app._base import _set_prometheus_client_use_created_metrics from synapse.metrics import REGISTRY, InFlightGauge, generate_latest diff --git a/tests/replication/_base.py b/tests/replication/_base.py index eb9b1f1cd9be..96badc46b04d 100644 --- a/tests/replication/_base.py +++ b/tests/replication/_base.py @@ -69,10 +69,10 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: # Make a new HomeServer object for the worker self.reactor.lookups["testserv"] = "1.2.3.4" self.worker_hs = self.setup_test_homeserver( - federation_http_client=None, homeserver_to_use=GenericWorkerServer, config=self._get_worker_hs_config(), reactor=self.reactor, + federation_http_client=None, ) # Since we use sqlite in memory databases we need to make sure the @@ -380,6 +380,7 @@ def make_worker_hs( server_version_string="1", max_request_body_size=8192, reactor=self.reactor, + federation_agent=worker_hs.get_federation_http_client().agent, ) worker_hs.get_replication_command_handler().start_replication(worker_hs) diff --git a/tests/replication/test_federation_sender_shard.py b/tests/replication/test_federation_sender_shard.py index 08703206a9c9..a324b4d31dde 100644 --- a/tests/replication/test_federation_sender_shard.py +++ b/tests/replication/test_federation_sender_shard.py @@ -14,14 +14,18 @@ import logging from unittest.mock import Mock +from netaddr import IPSet + from synapse.api.constants import EventTypes, Membership from synapse.events.builder import EventBuilderFactory from synapse.handlers.typing import TypingWriterHandler +from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent from synapse.rest.admin import register_servlets_for_client_rest_resource from synapse.rest.client import login, room from synapse.types import UserID, create_requester from tests.replication._base import BaseMultiWorkerStreamTestCase +from tests.server import get_clock from tests.test_utils import make_awaitable logger = logging.getLogger(__name__) @@ -41,13 +45,25 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase): room.register_servlets, ] + def setUp(self) -> None: + super().setUp() + + reactor, _ = get_clock() + self.matrix_federation_agent = MatrixFederationAgent( + reactor, + tls_client_options_factory=None, + user_agent=b"SynapseInTrialTest/0.0.0", + ip_allowlist=None, + ip_blocklist=IPSet(), + ) + def test_send_event_single_sender(self) -> None: """Test that using a single federation sender worker correctly sends a new event. """ mock_client = Mock(spec=["put_json"]) mock_client.put_json.return_value = make_awaitable({}) - + mock_client.agent = self.matrix_federation_agent self.make_worker_hs( "synapse.app.generic_worker", { @@ -78,6 +94,7 @@ def test_send_event_sharded(self) -> None: """ mock_client1 = Mock(spec=["put_json"]) mock_client1.put_json.return_value = make_awaitable({}) + mock_client1.agent = self.matrix_federation_agent self.make_worker_hs( "synapse.app.generic_worker", { @@ -92,6 +109,7 @@ def test_send_event_sharded(self) -> None: mock_client2 = Mock(spec=["put_json"]) mock_client2.put_json.return_value = make_awaitable({}) + mock_client2.agent = self.matrix_federation_agent self.make_worker_hs( "synapse.app.generic_worker", { @@ -145,6 +163,7 @@ def test_send_typing_sharded(self) -> None: """ mock_client1 = Mock(spec=["put_json"]) mock_client1.put_json.return_value = make_awaitable({}) + mock_client1.agent = self.matrix_federation_agent self.make_worker_hs( "synapse.app.generic_worker", { @@ -159,6 +178,7 @@ def test_send_typing_sharded(self) -> None: mock_client2 = Mock(spec=["put_json"]) mock_client2.put_json.return_value = make_awaitable({}) + mock_client2.agent = self.matrix_federation_agent self.make_worker_hs( "synapse.app.generic_worker", { diff --git a/tests/rest/client/test_presence.py b/tests/rest/client/test_presence.py index dcbb125a3bba..e12098102b96 100644 --- a/tests/rest/client/test_presence.py +++ b/tests/rest/client/test_presence.py @@ -40,7 +40,6 @@ def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: hs = self.setup_test_homeserver( "red", - federation_http_client=None, federation_client=Mock(), presence_handler=self.presence_handler, ) diff --git a/tests/rest/client/test_rooms.py b/tests/rest/client/test_rooms.py index f1b4e1ad2fc1..d013e75d55d7 100644 --- a/tests/rest/client/test_rooms.py +++ b/tests/rest/client/test_rooms.py @@ -67,8 +67,6 @@ class RoomBase(unittest.HomeserverTestCase): def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: self.hs = self.setup_test_homeserver( "red", - federation_http_client=None, - federation_client=Mock(), ) self.hs.get_federation_handler = Mock() # type: ignore[assignment] diff --git a/tests/storage/test_e2e_room_keys.py b/tests/storage/test_e2e_room_keys.py index 9cb326d90a87..f6df31aba4ca 100644 --- a/tests/storage/test_e2e_room_keys.py +++ b/tests/storage/test_e2e_room_keys.py @@ -31,7 +31,7 @@ class E2eRoomKeysHandlerTestCase(unittest.HomeserverTestCase): def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: - hs = self.setup_test_homeserver("server", federation_http_client=None) + hs = self.setup_test_homeserver("server") self.store = hs.get_datastores().main return hs diff --git a/tests/storage/test_purge.py b/tests/storage/test_purge.py index 857e2caf2e24..02826731679e 100644 --- a/tests/storage/test_purge.py +++ b/tests/storage/test_purge.py @@ -27,7 +27,7 @@ class PurgeTests(HomeserverTestCase): servlets = [room.register_servlets] def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: - hs = self.setup_test_homeserver("server", federation_http_client=None) + hs = self.setup_test_homeserver("server") return hs def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: diff --git a/tests/storage/test_rollback_worker.py b/tests/storage/test_rollback_worker.py index 6861d3a6c9e5..809c9f175d42 100644 --- a/tests/storage/test_rollback_worker.py +++ b/tests/storage/test_rollback_worker.py @@ -45,9 +45,7 @@ def fake_listdir(filepath: str) -> List[str]: class WorkerSchemaTests(HomeserverTestCase): def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: - hs = self.setup_test_homeserver( - federation_http_client=None, homeserver_to_use=GenericWorkerServer - ) + hs = self.setup_test_homeserver(homeserver_to_use=GenericWorkerServer) return hs def default_config(self) -> JsonDict: diff --git a/tests/test_server.py b/tests/test_server.py index e266c06a2cbf..fe5afebdcda4 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -38,7 +38,7 @@ from tests.server import ( FakeChannel, FakeSite, - ThreadedMemoryReactorClock, + get_clock, make_request, setup_test_homeserver, ) @@ -46,12 +46,11 @@ class JsonResourceTests(unittest.TestCase): def setUp(self) -> None: - self.reactor = ThreadedMemoryReactorClock() - self.hs_clock = Clock(self.reactor) + reactor, clock = get_clock() + self.reactor = reactor self.homeserver = setup_test_homeserver( self.addCleanup, - federation_http_client=None, - clock=self.hs_clock, + clock=clock, reactor=self.reactor, ) @@ -209,7 +208,13 @@ def _callback( class OptionsResourceTests(unittest.TestCase): def setUp(self) -> None: - self.reactor = ThreadedMemoryReactorClock() + reactor, clock = get_clock() + self.reactor = reactor + self.homeserver = setup_test_homeserver( + self.addCleanup, + clock=clock, + reactor=self.reactor, + ) class DummyResource(Resource): isLeaf = True @@ -242,6 +247,7 @@ def _make_request( "1.0", max_request_body_size=4096, reactor=self.reactor, + federation_agent=self.homeserver.get_federation_http_client().agent, ) # render the request and return the channel @@ -344,7 +350,8 @@ async def _async_render_GET(self, request: SynapseRequest) -> None: await self.callback(request) def setUp(self) -> None: - self.reactor = ThreadedMemoryReactorClock() + reactor, _ = get_clock() + self.reactor = reactor def test_good_response(self) -> None: async def callback(request: SynapseRequest) -> None: @@ -462,9 +469,9 @@ class DirectServeJsonResourceCancellationTests(unittest.TestCase): """Tests for `DirectServeJsonResource` cancellation.""" def setUp(self) -> None: - self.reactor = ThreadedMemoryReactorClock() - self.clock = Clock(self.reactor) - self.resource = CancellableDirectServeJsonResource(self.clock) + reactor, clock = get_clock() + self.reactor = reactor + self.resource = CancellableDirectServeJsonResource(clock) self.site = FakeSite(self.resource, self.reactor) def test_cancellable_disconnect(self) -> None: @@ -496,9 +503,9 @@ class DirectServeHtmlResourceCancellationTests(unittest.TestCase): """Tests for `DirectServeHtmlResource` cancellation.""" def setUp(self) -> None: - self.reactor = ThreadedMemoryReactorClock() - self.clock = Clock(self.reactor) - self.resource = CancellableDirectServeHtmlResource(self.clock) + reactor, clock = get_clock() + self.reactor = reactor + self.resource = CancellableDirectServeHtmlResource(clock) self.site = FakeSite(self.resource, self.reactor) def test_cancellable_disconnect(self) -> None: diff --git a/tests/unittest.py b/tests/unittest.py index c73195b32b00..334a95a9175d 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -358,6 +358,7 @@ def setUp(self) -> None: server_version_string="1", max_request_body_size=4096, reactor=self.reactor, + federation_agent=self.hs.get_federation_http_client().agent, ) from tests.rest.client.utils import RestHelper