diff --git a/.ci/docker-compose-file/docker-compose-kafka.yaml b/.ci/docker-compose-file/docker-compose-kafka.yaml index 45dfc3fa0..48bd85ac1 100644 --- a/.ci/docker-compose-file/docker-compose-kafka.yaml +++ b/.ci/docker-compose-file/docker-compose-kafka.yaml @@ -18,7 +18,7 @@ services: - /tmp/emqx-ci/emqx-shared-secret:/var/lib/secret kdc: hostname: kdc.emqx.net - image: ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-ubuntu22.04 + image: ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-ubuntu22.04 container_name: kdc.emqx.net expose: - 88 # kdc diff --git a/.ci/docker-compose-file/docker-compose.yaml b/.ci/docker-compose-file/docker-compose.yaml index ce98a7ced..212ff78ed 100644 --- a/.ci/docker-compose-file/docker-compose.yaml +++ b/.ci/docker-compose-file/docker-compose.yaml @@ -3,7 +3,7 @@ version: '3.9' services: erlang: container_name: erlang - image: ${DOCKER_CT_RUNNER_IMAGE:-ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-ubuntu22.04} + image: ${DOCKER_CT_RUNNER_IMAGE:-ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-ubuntu22.04} env_file: - credentials.env - conf.env diff --git a/.github/workflows/_pr_entrypoint.yaml b/.github/workflows/_pr_entrypoint.yaml index 3b8b1a0cd..9f480d220 100644 --- a/.github/workflows/_pr_entrypoint.yaml +++ b/.github/workflows/_pr_entrypoint.yaml @@ -17,16 +17,16 @@ env: jobs: sanity-checks: runs-on: ubuntu-22.04 - container: "ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-ubuntu22.04" + container: "ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-ubuntu22.04" outputs: ct-matrix: ${{ steps.matrix.outputs.ct-matrix }} ct-host: ${{ steps.matrix.outputs.ct-host }} ct-docker: ${{ steps.matrix.outputs.ct-docker }} version-emqx: ${{ steps.matrix.outputs.version-emqx }} version-emqx-enterprise: ${{ steps.matrix.outputs.version-emqx-enterprise }} - builder: "ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-ubuntu22.04" - builder_vsn: "5.3-5" - otp_vsn: "26.2.1-2" + builder: "ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-ubuntu22.04" + builder_vsn: "5.3-7" + otp_vsn: "26.2.5-1" elixir_vsn: "1.15.7" permissions: @@ -96,13 +96,13 @@ jobs: MATRIX="$(echo "${APPS}" | jq -c ' [ (.[] | select(.profile == "emqx") | . + { - builder: "5.3-5", - otp: "26.2.1-2", + builder: "5.3-7", + otp: "26.2.5-1", elixir: "1.15.7" }), (.[] | select(.profile == "emqx-enterprise") | . + { - builder: "5.3-5", - otp: ["26.2.1-2"][], + builder: "5.3-7", + otp: ["26.2.5-1"][], elixir: "1.15.7" }) ] diff --git a/.github/workflows/_push-entrypoint.yaml b/.github/workflows/_push-entrypoint.yaml index 7033ab989..9c79eb42e 100644 --- a/.github/workflows/_push-entrypoint.yaml +++ b/.github/workflows/_push-entrypoint.yaml @@ -24,7 +24,7 @@ env: jobs: prepare: runs-on: ubuntu-22.04 - container: 'ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-ubuntu22.04' + container: 'ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-ubuntu22.04' outputs: profile: ${{ steps.parse-git-ref.outputs.profile }} release: ${{ steps.parse-git-ref.outputs.release }} @@ -32,9 +32,9 @@ jobs: ct-matrix: ${{ steps.matrix.outputs.ct-matrix }} ct-host: ${{ steps.matrix.outputs.ct-host }} ct-docker: ${{ steps.matrix.outputs.ct-docker }} - builder: 'ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-ubuntu22.04' - builder_vsn: '5.3-5' - otp_vsn: '26.2.1-2' + builder: 'ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-ubuntu22.04' + builder_vsn: '5.3-7' + otp_vsn: '26.2.5-1' elixir_vsn: '1.15.7' permissions: @@ -66,13 +66,13 @@ jobs: MATRIX="$(echo "${APPS}" | jq -c ' [ (.[] | select(.profile == "emqx") | . + { - builder: "5.3-5", - otp: "26.2.1-2", + builder: "5.3-7", + otp: "26.2.5-1", elixir: "1.15.7" }), (.[] | select(.profile == "emqx-enterprise") | . + { - builder: "5.3-5", - otp: ["26.2.1-2"][], + builder: "5.3-7", + otp: ["26.2.5-1"][], elixir: "1.15.7" }) ] @@ -107,8 +107,7 @@ jobs: profile: ${{ needs.prepare.outputs.profile }} publish: true latest: ${{ needs.prepare.outputs.latest }} - # TODO: revert this back to needs.prepare.outputs.otp_vsn when OTP 26 bug is fixed - otp_vsn: 25.3.2-2 + otp_vsn: ${{ needs.prepare.outputs.otp_vsn }} elixir_vsn: ${{ needs.prepare.outputs.elixir_vsn }} builder_vsn: ${{ needs.prepare.outputs.builder_vsn }} secrets: inherit diff --git a/.github/workflows/build_and_push_docker_images.yaml b/.github/workflows/build_and_push_docker_images.yaml index 5c879dc11..6c49236d4 100644 --- a/.github/workflows/build_and_push_docker_images.yaml +++ b/.github/workflows/build_and_push_docker_images.yaml @@ -53,7 +53,7 @@ on: otp_vsn: required: false type: string - default: '25.3.2-2' + default: '26.2.5-1' elixir_vsn: required: false type: string @@ -61,7 +61,7 @@ on: builder_vsn: required: false type: string - default: '5.3-5' + default: '5.3-7' permissions: contents: read @@ -169,8 +169,8 @@ jobs: EMQX_DOCKERFILE: 'deploy/docker/Dockerfile' PKG_VSN: ${{ needs.build.outputs.PKG_VSN }} EMQX_BUILDER_VERSION: ${{ inputs.builder_vsn }} - EMQX_BUILDER_OTP: ${{ inputs.otp_vsn }} - EMQX_BUILDER_ELIXIR: ${{ inputs.elixir_vsn }} + OTP_VSN: ${{ inputs.otp_vsn }} + ELIXIR_VSN: ${{ inputs.elixir_vsn }} EMQX_SOURCE_TYPE: tgz run: | ./build ${PROFILE} docker @@ -218,8 +218,8 @@ jobs: EMQX_DOCKERFILE: 'deploy/docker/Dockerfile' PKG_VSN: ${{ needs.build.outputs.PKG_VSN }} EMQX_BUILDER_VERSION: ${{ inputs.builder_vsn }} - EMQX_BUILDER_OTP: ${{ inputs.otp_vsn }} - EMQX_BUILDER_ELIXIR: ${{ inputs.elixir_vsn }} + OTP_VSN: ${{ inputs.otp_vsn }} + ELIXIR_VSN: ${{ inputs.elixir_vsn }} EMQX_SOURCE_TYPE: tgz run: | ./build ${PROFILE} docker diff --git a/.github/workflows/build_packages.yaml b/.github/workflows/build_packages.yaml index 0cb35986c..87b35768a 100644 --- a/.github/workflows/build_packages.yaml +++ b/.github/workflows/build_packages.yaml @@ -55,7 +55,7 @@ on: otp_vsn: required: false type: string - default: '26.2.1-2' + default: '26.2.5-1' elixir_vsn: required: false type: string @@ -63,7 +63,7 @@ on: builder_vsn: required: false type: string - default: '5.3-5' + default: '5.3-7' permissions: contents: read diff --git a/.github/workflows/build_packages_cron.yaml b/.github/workflows/build_packages_cron.yaml index 1d4343e89..61a95f938 100644 --- a/.github/workflows/build_packages_cron.yaml +++ b/.github/workflows/build_packages_cron.yaml @@ -23,8 +23,8 @@ jobs: fail-fast: false matrix: profile: - - ['emqx', 'master', '5.3-5:1.15.7-26.2.1-2'] - - ['emqx', 'release-57', '5.3-5:1.15.7-26.2.1-2'] + - ['emqx', 'master', '5.3-7:1.15.7-26.2.5-1'] + - ['emqx', 'release-57', '5.3-7:1.15.7-26.2.5-1'] os: - ubuntu22.04 - amzn2023 @@ -92,7 +92,7 @@ jobs: branch: - master otp: - - 26.2.1-2 + - 26.2.5-1 os: - macos-12-arm64 diff --git a/.github/workflows/build_slim_packages.yaml b/.github/workflows/build_slim_packages.yaml index 7c8e3aaa4..cb7f53358 100644 --- a/.github/workflows/build_slim_packages.yaml +++ b/.github/workflows/build_slim_packages.yaml @@ -27,15 +27,15 @@ on: builder: required: false type: string - default: 'ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-ubuntu22.04' + default: 'ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-ubuntu22.04' builder_vsn: required: false type: string - default: '5.3-5' + default: '5.3-7' otp_vsn: required: false type: string - default: '26.2.1-2' + default: '26.2.5-1' elixir_vsn: required: false type: string @@ -54,9 +54,9 @@ jobs: fail-fast: false matrix: profile: - - ["emqx", "26.2.1-2", "ubuntu22.04", "elixir", "x64"] - - ["emqx", "26.2.1-2", "ubuntu22.04", "elixir", "arm64"] - - ["emqx-enterprise", "26.2.1-2", "ubuntu22.04", "erlang", "x64"] + - ["emqx", "26.2.5-1", "ubuntu22.04", "elixir", "x64"] + - ["emqx", "26.2.5-1", "ubuntu22.04", "elixir", "arm64"] + - ["emqx-enterprise", "26.2.5-1", "ubuntu22.04", "erlang", "x64"] container: "ghcr.io/emqx/emqx-builder/${{ inputs.builder_vsn }}:${{ inputs.elixir_vsn }}-${{ matrix.profile[1] }}-${{ matrix.profile[2] }}" diff --git a/.github/workflows/codeql.yaml b/.github/workflows/codeql.yaml index 5bb2d29f0..774f0e344 100644 --- a/.github/workflows/codeql.yaml +++ b/.github/workflows/codeql.yaml @@ -18,7 +18,7 @@ jobs: actions: read security-events: write container: - image: ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-ubuntu22.04 + image: ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-ubuntu22.04 strategy: fail-fast: false diff --git a/.github/workflows/performance_test.yaml b/.github/workflows/performance_test.yaml index 413ac3728..54645ceb7 100644 --- a/.github/workflows/performance_test.yaml +++ b/.github/workflows/performance_test.yaml @@ -26,7 +26,7 @@ jobs: prepare: runs-on: ubuntu-latest if: github.repository_owner == 'emqx' - container: ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-ubuntu20.04 + container: ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-ubuntu20.04 outputs: BENCH_ID: ${{ steps.prepare.outputs.BENCH_ID }} PACKAGE_FILE: ${{ steps.package_file.outputs.PACKAGE_FILE }} diff --git a/.github/workflows/run_relup_tests.yaml b/.github/workflows/run_relup_tests.yaml index e0ead42a3..b4214771d 100644 --- a/.github/workflows/run_relup_tests.yaml +++ b/.github/workflows/run_relup_tests.yaml @@ -74,7 +74,7 @@ jobs: steps: - uses: erlef/setup-beam@2f0cc07b4b9bea248ae098aba9e1a8a1de5ec24c # v1.17.5 with: - otp-version: 26.2.1 + otp-version: 26.2.5 - uses: actions/checkout@9bb56186c3b09b4f86b1c65136769dd318469633 # v4.1.2 with: repository: hawk/lux diff --git a/.tool-versions b/.tool-versions index b29f3ab4f..b9c0e8deb 100644 --- a/.tool-versions +++ b/.tool-versions @@ -1,2 +1,2 @@ -erlang 26.2.1-2 +erlang 26.2.5-1 elixir 1.15.7-otp-26 diff --git a/Makefile b/Makefile index 1659f2de1..8c490c40d 100644 --- a/Makefile +++ b/Makefile @@ -7,7 +7,7 @@ REBAR = $(CURDIR)/rebar3 BUILD = $(CURDIR)/build SCRIPTS = $(CURDIR)/scripts export EMQX_RELUP ?= true -export EMQX_DEFAULT_BUILDER = ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-debian12 +export EMQX_DEFAULT_BUILDER = ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-debian12 export EMQX_DEFAULT_RUNNER = public.ecr.aws/debian/debian:12-slim export EMQX_REL_FORM ?= tgz export QUICER_DOWNLOAD_FROM_RELEASE = 1 @@ -20,8 +20,8 @@ endif # Dashboard version # from https://github.com/emqx/emqx-dashboard5 -export EMQX_DASHBOARD_VERSION ?= v1.9.0 -export EMQX_EE_DASHBOARD_VERSION ?= e1.7.0 +export EMQX_DASHBOARD_VERSION ?= v1.9.1-beta.1 +export EMQX_EE_DASHBOARD_VERSION ?= e1.7.1-beta.1 -include default-profile.mk PROFILE ?= emqx diff --git a/apps/emqx_auth_http/src/emqx_authz_http.erl b/apps/emqx_auth_http/src/emqx_authz_http.erl index bf9377e7e..14113d022 100644 --- a/apps/emqx_auth_http/src/emqx_authz_http.erl +++ b/apps/emqx_auth_http/src/emqx_authz_http.erl @@ -37,6 +37,10 @@ -compile(nowarn_export_all). -endif. +-define(VAR_ACCESS, "access"). +-define(LEGACY_SUBSCRIBE_ACTION, 1). +-define(LEGACY_PUBLISH_ACTION, 2). + -define(ALLOWED_VARS, [ ?VAR_USERNAME, ?VAR_CLIENTID, @@ -47,6 +51,7 @@ ?VAR_ACTION, ?VAR_CERT_SUBJECT, ?VAR_CERT_CN_NAME, + ?VAR_ACCESS, ?VAR_NS_CLIENT_ATTRS ]). @@ -185,7 +190,14 @@ generate_request(Action, Topic, Client, Config) -> client_vars(Client, Action, Topic) -> Vars = emqx_authz_utils:vars_for_rule_query(Client, Action), - Vars#{topic => Topic}. + add_legacy_access_var(Vars#{topic => Topic}). + +add_legacy_access_var(#{action := subscribe} = Vars) -> + Vars#{access => ?LEGACY_SUBSCRIBE_ACTION}; +add_legacy_access_var(#{action := publish} = Vars) -> + Vars#{access => ?LEGACY_PUBLISH_ACTION}; +add_legacy_access_var(Vars) -> + Vars. allowed_vars() -> allowed_vars(emqx_authz:feature_available(rich_actions)). diff --git a/apps/emqx_auth_http/test/emqx_authz_http_SUITE.erl b/apps/emqx_auth_http/test/emqx_authz_http_SUITE.erl index 297d7d439..06e46e726 100644 --- a/apps/emqx_auth_http/test/emqx_authz_http_SUITE.erl +++ b/apps/emqx_auth_http/test/emqx_authz_http_SUITE.erl @@ -199,6 +199,7 @@ t_query_params(_Config) -> mountpoint := <<"MOUNTPOINT">>, topic := <<"t/1">>, action := <<"publish">>, + access := <<"2">>, qos := <<"1">>, retain := <<"false">> } = cowboy_req:match_qs( @@ -210,6 +211,7 @@ t_query_params(_Config) -> mountpoint, topic, action, + access, qos, retain ], @@ -227,6 +229,7 @@ t_query_params(_Config) -> "mountpoint=${mountpoint}&" "topic=${topic}&" "action=${action}&" + "access=${access}&" "qos=${qos}&" "retain=${retain}" >> @@ -261,6 +264,7 @@ t_path(_Config) -> "MOUNTPOINT/" "t%2F1/" "publish/" + "2/" "1/" "false" >>, @@ -278,6 +282,7 @@ t_path(_Config) -> "${mountpoint}/" "${topic}/" "${action}/" + "${access}/" "${qos}/" "${retain}" >> @@ -318,6 +323,7 @@ t_json_body(_Config) -> <<"mountpoint">> := <<"MOUNTPOINT">>, <<"topic">> := <<"t">>, <<"action">> := <<"publish">>, + <<"access">> := <<"2">>, <<"qos">> := <<"1">>, <<"retain">> := <<"false">> }, @@ -335,6 +341,7 @@ t_json_body(_Config) -> <<"mountpoint">> => <<"${mountpoint}">>, <<"topic">> => <<"${topic}">>, <<"action">> => <<"${action}">>, + <<"access">> => <<"${access}">>, <<"qos">> => <<"${qos}">>, <<"retain">> => <<"${retain}">> } @@ -413,6 +420,7 @@ t_placeholder_and_body(_Config) -> <<"mountpoint">> := <<"MOUNTPOINT">>, <<"topic">> := <<"t">>, <<"action">> := <<"publish">>, + <<"access">> := <<"2">>, <<"CN">> := ?PH_CERT_CN_NAME, <<"CS">> := ?PH_CERT_SUBJECT }, @@ -430,6 +438,7 @@ t_placeholder_and_body(_Config) -> <<"mountpoint">> => <<"${mountpoint}">>, <<"topic">> => <<"${topic}">>, <<"action">> => <<"${action}">>, + <<"access">> => <<"${access}">>, <<"CN">> => ?PH_CERT_CN_NAME, <<"CS">> => ?PH_CERT_SUBJECT }, diff --git a/apps/emqx_auth_postgresql/test/emqx_authn_postgresql_SUITE.erl b/apps/emqx_auth_postgresql/test/emqx_authn_postgresql_SUITE.erl index 3cf991d77..4f01ab34a 100644 --- a/apps/emqx_auth_postgresql/test/emqx_authn_postgresql_SUITE.erl +++ b/apps/emqx_auth_postgresql/test/emqx_authn_postgresql_SUITE.erl @@ -606,6 +606,7 @@ pgsql_server() -> pgsql_config() -> #{ auto_reconnect => true, + disable_prepared_statements => false, database => <<"mqtt">>, username => <<"root">>, password => <<"public">>, diff --git a/apps/emqx_auth_postgresql/test/emqx_authz_postgresql_SUITE.erl b/apps/emqx_auth_postgresql/test/emqx_authz_postgresql_SUITE.erl index 84b859e62..9365604fc 100644 --- a/apps/emqx_auth_postgresql/test/emqx_authz_postgresql_SUITE.erl +++ b/apps/emqx_auth_postgresql/test/emqx_authz_postgresql_SUITE.erl @@ -426,6 +426,7 @@ setup_config(SpecialParams) -> pgsql_config() -> #{ auto_reconnect => true, + disable_prepared_statements => false, database => <<"mqtt">>, username => <<"root">>, password => <<"public">>, diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index e6feac7bd..79e8fc8f8 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -1151,7 +1151,18 @@ post_config_update([ConfRootKey, BridgeType, BridgeName], _Req, NewConf, undefin post_config_update([ConfRootKey, BridgeType, BridgeName], _Req, NewConf, OldConf, _AppEnvs) when ConfRootKey =:= ?ROOT_KEY_ACTIONS; ConfRootKey =:= ?ROOT_KEY_SOURCES -> - ok = uninstall_bridge_v2(ConfRootKey, BridgeType, BridgeName, OldConf), + case uninstall_bridge_v2(ConfRootKey, BridgeType, BridgeName, OldConf) of + ok -> + ok; + {error, timeout} -> + throw(<< + "Timed out trying to remove action or source. Please try again and," + " if the error persists, try disabling the connector before retrying." + >>); + {error, not_found} -> + %% Should not happen, unless config is inconsistent. + throw(<<"Referenced connector not found">>) + end, ok = install_bridge_v2(ConfRootKey, BridgeType, BridgeName, NewConf), Bridges = emqx_utils_maps:deep_put( [BridgeType, BridgeName], emqx:get_config([ConfRootKey]), NewConf diff --git a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl index 8ba2ef487..99caba625 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl @@ -879,6 +879,8 @@ handle_disable_enable(ConfRootKey, Id, Enable) -> ?SERVICE_UNAVAILABLE(<<"request timeout">>); {error, timeout} -> ?SERVICE_UNAVAILABLE(<<"request timeout">>); + {error, Reason} when is_binary(Reason) -> + ?BAD_REQUEST(Reason); {error, Reason} -> ?INTERNAL_ERROR(Reason) end diff --git a/apps/emqx_bridge_azure_event_hub/rebar.config b/apps/emqx_bridge_azure_event_hub/rebar.config index 8cd5ee427..5236d9a0e 100644 --- a/apps/emqx_bridge_azure_event_hub/rebar.config +++ b/apps/emqx_bridge_azure_event_hub/rebar.config @@ -5,7 +5,7 @@ {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.4"}}}, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}}, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}, - {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}}, + {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}}, {snappyer, "1.2.9"}, {emqx_connector, {path, "../../apps/emqx_connector"}}, {emqx_resource, {path, "../../apps/emqx_resource"}}, diff --git a/apps/emqx_bridge_confluent/rebar.config b/apps/emqx_bridge_confluent/rebar.config index a969ac83b..a3997ada7 100644 --- a/apps/emqx_bridge_confluent/rebar.config +++ b/apps/emqx_bridge_confluent/rebar.config @@ -5,7 +5,7 @@ {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.4"}}}, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}}, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}, - {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}}, + {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}}, {snappyer, "1.2.9"}, {emqx_connector, {path, "../../apps/emqx_connector"}}, {emqx_resource, {path, "../../apps/emqx_resource"}}, diff --git a/apps/emqx_bridge_kafka/rebar.config b/apps/emqx_bridge_kafka/rebar.config index fd7f03da8..78569b321 100644 --- a/apps/emqx_bridge_kafka/rebar.config +++ b/apps/emqx_bridge_kafka/rebar.config @@ -5,7 +5,7 @@ {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.4"}}}, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}}, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}, - {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}}, + {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}}, {snappyer, "1.2.9"}, {emqx_connector, {path, "../../apps/emqx_connector"}}, {emqx_resource, {path, "../../apps/emqx_resource"}}, diff --git a/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.app.src b/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.app.src index 80864ba67..c7f2c2bf8 100644 --- a/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.app.src +++ b/apps/emqx_bridge_matrix/src/emqx_bridge_matrix.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_matrix, [ {description, "EMQX Enterprise MatrixDB Bridge"}, - {vsn, "0.1.4"}, + {vsn, "0.1.5"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_matrix/src/emqx_bridge_matrix_action_info.erl b/apps/emqx_bridge_matrix/src/emqx_bridge_matrix_action_info.erl index 61a860db9..25b6e7699 100644 --- a/apps/emqx_bridge_matrix/src/emqx_bridge_matrix_action_info.erl +++ b/apps/emqx_bridge_matrix/src/emqx_bridge_matrix_action_info.erl @@ -10,7 +10,8 @@ bridge_v1_type_name/0, action_type_name/0, connector_type_name/0, - schema_module/0 + schema_module/0, + connector_action_config_to_bridge_v1_config/2 ]). bridge_v1_type_name() -> matrix. @@ -20,3 +21,9 @@ action_type_name() -> matrix. connector_type_name() -> matrix. schema_module() -> emqx_bridge_matrix. + +connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) -> + emqx_bridge_pgsql_action_info:connector_action_config_to_bridge_v1_config( + ConnectorConfig, + ActionConfig + ). diff --git a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.app.src b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.app.src index d4f97a721..d223a2488 100644 --- a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.app.src +++ b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_pgsql, [ {description, "EMQX Enterprise PostgreSQL Bridge"}, - {vsn, "0.1.6"}, + {vsn, "0.1.7"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl index 5a0b9eb5b..d26b37e96 100644 --- a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl +++ b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.erl @@ -82,6 +82,7 @@ fields("get_bridge_v2") -> fields("post_bridge_v2") -> fields("post", pgsql, pgsql_action); fields("config") -> + %% Bridge v1 config for all postgres-based bridges (pgsql, matrix, timescale) [ {enable, hoconsc:mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, {sql, @@ -95,8 +96,11 @@ fields("config") -> #{desc => ?DESC("local_topic"), default => undefined} )} ] ++ emqx_resource_schema:fields("resource_opts") ++ - (emqx_postgresql:fields(config) -- - emqx_connector_schema_lib:prepare_statement_fields()); + proplists:delete( + disable_prepared_statements, + emqx_postgresql:fields(config) -- + emqx_connector_schema_lib:prepare_statement_fields() + ); fields("post") -> fields("post", ?ACTION_TYPE, "config"); fields("put") -> diff --git a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql_action_info.erl b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql_action_info.erl index 55d6d156b..0d10bbbcc 100644 --- a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql_action_info.erl +++ b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql_action_info.erl @@ -10,7 +10,8 @@ bridge_v1_type_name/0, action_type_name/0, connector_type_name/0, - schema_module/0 + schema_module/0, + connector_action_config_to_bridge_v1_config/2 ]). bridge_v1_type_name() -> pgsql. @@ -20,3 +21,20 @@ action_type_name() -> pgsql. connector_type_name() -> pgsql. schema_module() -> emqx_bridge_pgsql. + +connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) -> + Config0 = emqx_action_info:connector_action_config_to_bridge_v1_config( + ConnectorConfig, + ActionConfig + ), + maps:with(bridge_v1_fields(), Config0). + +%%------------------------------------------------------------------------------------------ +%% Internal helper functions +%%------------------------------------------------------------------------------------------ + +bridge_v1_fields() -> + [ + emqx_utils_conv:bin(K) + || {K, _V} <- emqx_bridge_pgsql:fields("config") + ]. diff --git a/apps/emqx_bridge_pgsql/test/emqx_bridge_v2_pgsql_SUITE.erl b/apps/emqx_bridge_pgsql/test/emqx_bridge_v2_pgsql_SUITE.erl index 9dff5ab22..7e503cebb 100644 --- a/apps/emqx_bridge_pgsql/test/emqx_bridge_v2_pgsql_SUITE.erl +++ b/apps/emqx_bridge_pgsql/test/emqx_bridge_v2_pgsql_SUITE.erl @@ -19,6 +19,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -define(BRIDGE_TYPE, pgsql). -define(BRIDGE_TYPE_BIN, <<"pgsql">>). @@ -33,7 +34,18 @@ %%------------------------------------------------------------------------------ all() -> - emqx_common_test_helpers:all(?MODULE). + All0 = emqx_common_test_helpers:all(?MODULE), + All = All0 -- matrix_cases(), + Groups = lists:map(fun({G, _, _}) -> {group, G} end, groups()), + Groups ++ All. + +matrix_cases() -> + [ + t_disable_prepared_statements + ]. + +groups() -> + emqx_common_test_helpers:matrix_to_groups(?MODULE, matrix_cases()). init_per_suite(Config) -> PostgresHost = os:getenv("PGSQL_TCP_HOST", "toxiproxy"), @@ -80,10 +92,26 @@ end_per_suite(Config) -> emqx_cth_suite:stop(Apps), ok. -init_per_testcase(TestCase, Config) -> - common_init_per_testcase(TestCase, Config). +init_per_group(Group, Config) when + Group =:= postgres; + Group =:= timescale; + Group =:= matrix +-> + [ + {bridge_type, group_to_type(Group)}, + {connector_type, group_to_type(Group)} + | Config + ]; +init_per_group(_Group, Config) -> + Config. -common_init_per_testcase(TestCase, Config) -> +group_to_type(postgres) -> pgsql; +group_to_type(Group) -> Group. + +end_per_group(_Group, _Config) -> + ok. + +init_per_testcase(TestCase, Config) -> ct:timetrap(timer:seconds(60)), emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(), emqx_config:delete_override_conf_files(), @@ -103,10 +131,10 @@ common_init_per_testcase(TestCase, Config) -> BridgeConfig = bridge_config(Name, Name), ok = snabbkaffe:start_trace(), [ - {connector_type, ?CONNECTOR_TYPE}, + {connector_type, proplists:get_value(connector_type, Config, ?CONNECTOR_TYPE)}, {connector_name, Name}, {connector_config, ConnectorConfig}, - {bridge_type, ?BRIDGE_TYPE}, + {bridge_type, proplists:get_value(bridge_type, Config, ?BRIDGE_TYPE)}, {bridge_name, Name}, {bridge_config, BridgeConfig} | NConfig @@ -232,3 +260,20 @@ t_sync_query(Config) -> t_start_action_or_source_with_disabled_connector(Config) -> ok = emqx_bridge_v2_testlib:t_start_action_or_source_with_disabled_connector(Config), ok. + +t_disable_prepared_statements(matrix) -> + [[postgres], [timescale], [matrix]]; +t_disable_prepared_statements(Config0) -> + ConnectorConfig0 = ?config(connector_config, Config0), + ConnectorConfig = maps:merge(ConnectorConfig0, #{<<"disable_prepared_statements">> => true}), + Config = lists:keyreplace(connector_config, 1, Config0, {connector_config, ConnectorConfig}), + ok = emqx_bridge_v2_testlib:t_sync_query( + Config, + fun make_message/0, + fun(Res) -> ?assertMatch({ok, _}, Res) end, + postgres_bridge_connector_on_query_return + ), + ok = emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}), + emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(), + ok = emqx_bridge_v2_testlib:t_create_via_http(Config), + ok. diff --git a/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl b/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl index e12155bb1..f117c4e7a 100644 --- a/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl +++ b/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl @@ -128,8 +128,8 @@ on_query( #{instance_id => InstId, cmd => Cmd, batch => false, mode => sync, result => Result} ), Result; - Error -> - Error + {error, Reason} -> + {error, Reason} end. on_batch_query( @@ -165,8 +165,8 @@ on_batch_query( } ), Result; - Error -> - Error + {error, Reason} -> + {error, Reason} end. trace_format_commands(Commands0) -> @@ -204,11 +204,15 @@ query(InstId, Query, RedisConnSt) -> end. proc_command_template(CommandTemplate, Msg) -> - lists:map( - fun(ArgTks) -> - emqx_placeholder:proc_tmpl(ArgTks, Msg, #{return => full_binary}) - end, - CommandTemplate + lists:reverse( + lists:foldl( + fun(ArgTks, Acc) -> + New = proc_tmpl(ArgTks, Msg), + lists:reverse(New, Acc) + end, + [], + CommandTemplate + ) ). preproc_command_template(CommandTemplate) -> @@ -216,3 +220,18 @@ preproc_command_template(CommandTemplate) -> fun emqx_placeholder:preproc_tmpl/1, CommandTemplate ). + +%% This function mimics emqx_placeholder:proc_tmpl/3 but with an +%% injected special handling of map_to_redis_hset_args result +%% which is a list of redis command args (all in binary string format) +proc_tmpl([{var, Phld}], Data) -> + case emqx_placeholder:lookup_var(Phld, Data) of + [map_to_redis_hset_args | L] -> + L; + Other -> + [emqx_utils_conv:bin(Other)] + end; +proc_tmpl(Tokens, Data) -> + %% more than just a var ref, but a string, or a concatenation of string and a var + %% this is must be a single arg, format it into a binary + [emqx_placeholder:proc_tmpl(Tokens, Data, #{return => full_binary})]. diff --git a/apps/emqx_bridge_redis/test/emqx_bridge_v2_redis_SUITE.erl b/apps/emqx_bridge_redis/test/emqx_bridge_v2_redis_SUITE.erl index f0fb8872d..7d3003bfa 100644 --- a/apps/emqx_bridge_redis/test/emqx_bridge_v2_redis_SUITE.erl +++ b/apps/emqx_bridge_redis/test/emqx_bridge_v2_redis_SUITE.erl @@ -46,7 +46,8 @@ matrix_testcases() -> t_start_stop, t_create_via_http, t_on_get_status, - t_sync_query + t_sync_query, + t_map_to_redis_hset_args ]. init_per_suite(Config) -> @@ -133,7 +134,7 @@ common_init_per_testcase(TestCase, Config) -> Path = group_path(Config), ct:comment(Path), ConnectorConfig = connector_config(Name, Path, NConfig), - BridgeConfig = action_config(Name, Path, Name), + BridgeConfig = action_config(Name, Path, Name, TestCase), ok = snabbkaffe:start_trace(), [ {connector_type, ?CONNECTOR_TYPE}, @@ -222,7 +223,14 @@ parse_and_check_connector_config(InnerConfigMap, Name) -> ct:pal("parsed config: ~p", [Config]), InnerConfigMap. -action_config(Name, Path, ConnectorId) -> +action_config(Name, Path, ConnectorId, TestCase) -> + Template = + try + ?MODULE:TestCase(command_template) + catch + _:_ -> + [<<"RPUSH">>, <<"MSGS/${topic}">>, <<"${payload}">>] + end, [RedisType, _Transport | _] = Path, CommonCfg = #{ @@ -230,7 +238,7 @@ action_config(Name, Path, ConnectorId) -> <<"connector">> => ConnectorId, <<"parameters">> => #{ - <<"command_template">> => [<<"RPUSH">>, <<"MSGS/${topic}">>, <<"${payload}">>], + <<"command_template">> => Template, <<"redis_type">> => atom_to_binary(RedisType) }, <<"local_topic">> => <<"t/redis">>, @@ -262,8 +270,11 @@ parse_and_check_bridge_config(InnerConfigMap, Name) -> emqx_bridge_v2_testlib:parse_and_check(?BRIDGE_TYPE_BIN, Name, InnerConfigMap). make_message() -> - ClientId = emqx_guid:to_hexstr(emqx_guid:gen()), Payload = emqx_guid:to_hexstr(emqx_guid:gen()), + make_message_with_payload(Payload). + +make_message_with_payload(Payload) -> + ClientId = emqx_guid:to_hexstr(emqx_guid:gen()), #{ clientid => ClientId, payload => Payload, @@ -290,7 +301,7 @@ t_start_stop(matrix) -> [sentinel, tcp], [cluster, tcp] ]}; -t_start_stop(Config) -> +t_start_stop(Config) when is_list(Config) -> emqx_bridge_v2_testlib:t_start_stop(Config, redis_bridge_stopped), ok. @@ -300,7 +311,7 @@ t_create_via_http(matrix) -> [sentinel, tcp], [cluster, tcp] ]}; -t_create_via_http(Config) -> +t_create_via_http(Config) when is_list(Config) -> emqx_bridge_v2_testlib:t_create_via_http(Config), ok. @@ -310,7 +321,7 @@ t_on_get_status(matrix) -> [sentinel, tcp], [cluster, tcp] ]}; -t_on_get_status(Config) -> +t_on_get_status(Config) when is_list(Config) -> emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}), ok. @@ -320,7 +331,7 @@ t_sync_query(matrix) -> [sentinel, tcp], [cluster, tcp] ]}; -t_sync_query(Config) -> +t_sync_query(Config) when is_list(Config) -> ok = emqx_bridge_v2_testlib:t_sync_query( Config, fun make_message/0, @@ -328,3 +339,22 @@ t_sync_query(Config) -> redis_bridge_connector_send_done ), ok. + +t_map_to_redis_hset_args(matrix) -> + {map_to_redis_hset_args, [ + [single, tcp], + [sentinel, tcp], + [cluster, tcp] + ]}; +t_map_to_redis_hset_args(command_template) -> + [<<"HMSET">>, <<"t_map_to_redis_hset_args">>, <<"${payload}">>]; +t_map_to_redis_hset_args(Config) when is_list(Config) -> + Payload = emqx_rule_funcs:map_to_redis_hset_args(#{<<"a">> => 1, <<"b">> => <<"2">>}), + MsgFn = fun() -> make_message_with_payload(Payload) end, + ok = emqx_bridge_v2_testlib:t_sync_query( + Config, + MsgFn, + fun(Res) -> ?assertMatch({ok, _}, Res) end, + redis_bridge_connector_send_done + ), + ok. diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl index 840715ac3..603ef18d0 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl @@ -78,7 +78,7 @@ %% https://www.erlang.org/doc/man/odbc.html %% as returned by connect/2 --type connection_reference() :: pid(). +-type connection_reference() :: odbc:connection_reference(). -type time_out() :: milliseconds() | infinity. -type sql() :: string() | binary(). -type milliseconds() :: pos_integer(). diff --git a/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.app.src b/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.app.src index 477da7c9e..914354cad 100644 --- a/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.app.src +++ b/apps/emqx_bridge_timescale/src/emqx_bridge_timescale.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_timescale, [ {description, "EMQX Enterprise TimescaleDB Bridge"}, - {vsn, "0.1.4"}, + {vsn, "0.1.5"}, {registered, []}, {applications, [kernel, stdlib, emqx_resource]}, {env, [ diff --git a/apps/emqx_bridge_timescale/src/emqx_bridge_timescale_action_info.erl b/apps/emqx_bridge_timescale/src/emqx_bridge_timescale_action_info.erl index 04335b91a..1ebb71d7a 100644 --- a/apps/emqx_bridge_timescale/src/emqx_bridge_timescale_action_info.erl +++ b/apps/emqx_bridge_timescale/src/emqx_bridge_timescale_action_info.erl @@ -10,7 +10,8 @@ bridge_v1_type_name/0, action_type_name/0, connector_type_name/0, - schema_module/0 + schema_module/0, + connector_action_config_to_bridge_v1_config/2 ]). bridge_v1_type_name() -> timescale. @@ -20,3 +21,9 @@ action_type_name() -> timescale. connector_type_name() -> timescale. schema_module() -> emqx_bridge_timescale. + +connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) -> + emqx_bridge_pgsql_action_info:connector_action_config_to_bridge_v1_config( + ConnectorConfig, + ActionConfig + ). diff --git a/apps/emqx_connector/src/emqx_connector_api.erl b/apps/emqx_connector/src/emqx_connector_api.erl index 97f68b7ef..4c7a0476e 100644 --- a/apps/emqx_connector/src/emqx_connector_api.erl +++ b/apps/emqx_connector/src/emqx_connector_api.erl @@ -685,6 +685,10 @@ is_ok(OkResult = {ok, _}) -> OkResult; is_ok(Error = {error, _}) -> Error; +is_ok(timeout) -> + %% Returned by `emqx_resource_manager:start' when the connector fails to reach either + %% `?status_connected' or `?status_disconnected' within `start_timeout'. + timeout; is_ok(ResL) -> case lists:filter( @@ -723,6 +727,14 @@ call_operation(NodeOrAll, OperFunc, Args = [_Nodes, ConnectorType, ConnectorName case is_ok(do_bpapi_call(NodeOrAll, OperFunc, Args)) of Ok when Ok =:= ok; is_tuple(Ok), element(1, Ok) =:= ok -> ?NO_CONTENT; + timeout -> + %% Returned by `emqx_resource_manager:start' when the connector fails to reach + %% either `?status_connected' or `?status_disconnected' within + %% `start_timeout'. + ?BAD_REQUEST(<< + "Timeout while waiting for connector to reach connected status." + " Please try again." + >>); {error, not_implemented} -> ?NOT_IMPLEMENTED; {error, timeout} -> diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index 2b1ada37b..0f57e9034 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -536,14 +536,20 @@ do_start_connector(TestType, Config) -> request_json( post, uri(["connectors"]), - ?KAFKA_CONNECTOR(BadName, BadServer), + (?KAFKA_CONNECTOR(BadName, BadServer))#{ + <<"resource_opts">> => #{ + <<"start_timeout">> => <<"10ms">> + } + }, Config ) ), BadConnectorID = emqx_connector_resource:connector_id(?CONNECTOR_TYPE, BadName), + %% Checks that an `emqx_resource_manager:start' timeout when waiting for the resource to + %% be connected doesn't return a 500 error. ?assertMatch( %% request from product: return 400 on such errors - {ok, SC, _} when SC == 500 orelse SC == 400, + {ok, 400, _}, request(post, {operation, TestType, start, BadConnectorID}, Config) ), ok = gen_tcp:close(Sock), diff --git a/apps/emqx_plugins/src/emqx_plugins.app.src b/apps/emqx_plugins/src/emqx_plugins.app.src index 6501d5654..b53aef959 100644 --- a/apps/emqx_plugins/src/emqx_plugins.app.src +++ b/apps/emqx_plugins/src/emqx_plugins.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_plugins, [ {description, "EMQX Plugin Management"}, - {vsn, "0.2.0"}, + {vsn, "0.2.1"}, {modules, []}, {mod, {emqx_plugins_app, []}}, {applications, [kernel, stdlib, emqx, erlavro]}, diff --git a/apps/emqx_plugins/src/emqx_plugins.erl b/apps/emqx_plugins/src/emqx_plugins.erl index 961855e17..baad35af5 100644 --- a/apps/emqx_plugins/src/emqx_plugins.erl +++ b/apps/emqx_plugins/src/emqx_plugins.erl @@ -136,6 +136,19 @@ parse_name_vsn(NameVsn) when is_list(NameVsn) -> make_name_vsn_string(Name, Vsn) -> binary_to_list(iolist_to_binary([Name, "-", Vsn])). +app_dir(AppName, Apps) -> + case + lists:filter( + fun(AppNameVsn) -> nomatch =/= string:prefix(AppNameVsn, AppName) end, + Apps + ) + of + [AppNameVsn] -> + {ok, AppNameVsn}; + _ -> + {error, not_found} + end. + %%-------------------------------------------------------------------- %% Package operations @@ -1372,12 +1385,14 @@ plugin_dir(NameVsn) -> -spec plugin_priv_dir(name_vsn()) -> string(). plugin_priv_dir(NameVsn) -> - case read_plugin_info(NameVsn, #{fill_readme => false}) of - {ok, #{<<"name">> := Name, <<"metadata_vsn">> := Vsn}} -> - AppDir = make_name_vsn_string(Name, Vsn), - wrap_to_list(filename:join([plugin_dir(NameVsn), AppDir, "priv"])); - _ -> - wrap_to_list(filename:join([install_dir(), NameVsn, "priv"])) + maybe + {ok, #{<<"name">> := Name, <<"rel_apps">> := Apps}} ?= + read_plugin_info(NameVsn, #{fill_readme => false}), + {ok, AppDir} ?= app_dir(Name, Apps), + wrap_to_list(filename:join([plugin_dir(NameVsn), AppDir, "priv"])) + else + %% Otherwise assume the priv directory is under the plugin root directory + _ -> wrap_to_list(filename:join([install_dir(), NameVsn, "priv"])) end. -spec plugin_config_dir(name_vsn()) -> string() | {error, Reason :: string()}. diff --git a/apps/emqx_postgresql/src/emqx_postgresql.app.src b/apps/emqx_postgresql/src/emqx_postgresql.app.src index 5faf0aa47..2cf3392bf 100644 --- a/apps/emqx_postgresql/src/emqx_postgresql.app.src +++ b/apps/emqx_postgresql/src/emqx_postgresql.app.src @@ -1,6 +1,6 @@ {application, emqx_postgresql, [ {description, "EMQX PostgreSQL Database Connector"}, - {vsn, "0.2.0"}, + {vsn, "0.2.1"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_postgresql/src/emqx_postgresql.erl b/apps/emqx_postgresql/src/emqx_postgresql.erl index ad674a07c..7fe564dc3 100644 --- a/apps/emqx_postgresql/src/emqx_postgresql.erl +++ b/apps/emqx_postgresql/src/emqx_postgresql.erl @@ -50,6 +50,8 @@ execute_batch/3 ]). +-export([disable_prepared_statements/0]). + %% for ecpool workers usage -export([do_get_status/1, prepare_sql_to_conn/2]). @@ -62,7 +64,7 @@ #{ pool_name := binary(), query_templates := #{binary() => template()}, - prepares := #{binary() => epgsql:statement()} | {error, _} + prepares := disabled | #{binary() => epgsql:statement()} | {error, _} }. %% FIXME: add `{error, sync_required}' to `epgsql:execute_batch' @@ -78,7 +80,10 @@ roots() -> [{config, #{type => hoconsc:ref(?MODULE, config)}}]. fields(config) -> - [{server, server()}] ++ + [ + {server, server()}, + disable_prepared_statements() + ] ++ adjust_fields(emqx_connector_schema_lib:relational_db_fields()) ++ emqx_connector_schema_lib:ssl_fields() ++ emqx_connector_schema_lib:prepare_statement_fields(). @@ -87,6 +92,17 @@ server() -> Meta = #{desc => ?DESC("server")}, emqx_schema:servers_sc(Meta, ?PGSQL_HOST_OPTIONS). +disable_prepared_statements() -> + {disable_prepared_statements, + hoconsc:mk( + boolean(), + #{ + default => false, + required => false, + desc => ?DESC("disable_prepared_statements") + } + )}. + adjust_fields(Fields) -> lists:map( fun @@ -108,6 +124,7 @@ on_start( InstId, #{ server := Server, + disable_prepared_statements := DisablePreparedStatements, database := DB, username := User, pool_size := PoolSize, @@ -143,11 +160,16 @@ on_start( {auto_reconnect, ?AUTO_RECONNECT_INTERVAL}, {pool_size, PoolSize} ], - State1 = parse_prepare_sql(Config, <<"send_message">>), + State1 = parse_sql_template(Config, <<"send_message">>), State2 = State1#{installed_channels => #{}}, case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of ok -> - {ok, init_prepare(State2#{pool_name => InstId, prepares => #{}})}; + Prepares = + case DisablePreparedStatements of + true -> disabled; + false -> #{} + end, + {ok, init_prepare(State2#{pool_name => InstId, prepares => Prepares})}; {error, Reason} -> ?tp( pgsql_connector_start_failed, @@ -209,13 +231,17 @@ on_add_channel( create_channel_state( ChannelId, - #{pool_name := PoolName} = _ConnectorState, + #{ + pool_name := PoolName, + prepares := Prepares + } = _ConnectorState, #{parameters := Parameters} = _ChannelConfig ) -> - State1 = parse_prepare_sql(Parameters, ChannelId), + State1 = parse_sql_template(Parameters, ChannelId), {ok, init_prepare(State1#{ pool_name => PoolName, + prepares => Prepares, prepare_statement => #{} })}. @@ -233,6 +259,8 @@ on_remove_channel( NewState = OldState#{installed_channels => NewInstalledChannels}, {ok, NewState}. +close_prepared_statement(_ChannelId, #{prepares := disabled}) -> + ok; close_prepared_statement(ChannelId, #{pool_name := PoolName} = State) -> WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)], close_prepared_statement(WorkerPids, ChannelId, State), @@ -243,7 +271,7 @@ close_prepared_statement([WorkerPid | Rest], ChannelId, State) -> %% prepared statement doesn't exist. try ecpool_worker:client(WorkerPid) of {ok, Conn} -> - Statement = get_prepared_statement(ChannelId, State), + Statement = get_templated_statement(ChannelId, State), _ = epgsql:close(Conn, Statement), close_prepared_statement(Rest, ChannelId, State); _ -> @@ -303,21 +331,23 @@ on_query( sql => NameOrSQL, state => State }), - Type = pgsql_query_type(TypeOrKey), + Type = pgsql_query_type(TypeOrKey, State), {NameOrSQL2, Data} = proc_sql_params(TypeOrKey, NameOrSQL, Params, State), Res = on_sql_query(TypeOrKey, InstId, PoolName, Type, NameOrSQL2, Data), ?tp(postgres_bridge_connector_on_query_return, #{instance_id => InstId, result => Res}), handle_result(Res). -pgsql_query_type(sql) -> +pgsql_query_type(_TypeOrTag, #{prepares := disabled}) -> query; -pgsql_query_type(query) -> +pgsql_query_type(sql, _ConnectorState) -> query; -pgsql_query_type(prepared_query) -> +pgsql_query_type(query, _ConnectorState) -> + query; +pgsql_query_type(prepared_query, _ConnectorState) -> prepared_query; %% for bridge -pgsql_query_type(_) -> - pgsql_query_type(prepared_query). +pgsql_query_type(_, ConnectorState) -> + pgsql_query_type(prepared_query, ConnectorState). on_batch_query( InstId, @@ -336,9 +366,9 @@ on_batch_query( ?SLOG(error, Log), {error, {unrecoverable_error, batch_prepare_not_implemented}}; {_Statement, RowTemplate} -> - PrepStatement = get_prepared_statement(BinKey, State), + StatementTemplate = get_templated_statement(BinKey, State), Rows = [render_prepare_sql_row(RowTemplate, Data) || {_Key, Data} <- BatchReq], - case on_sql_query(Key, InstId, PoolName, execute_batch, PrepStatement, Rows) of + case on_sql_query(Key, InstId, PoolName, execute_batch, StatementTemplate, Rows) of {error, _Error} = Result -> handle_result(Result); {_Column, Results} -> @@ -359,12 +389,19 @@ proc_sql_params(query, SQLOrKey, Params, _State) -> proc_sql_params(prepared_query, SQLOrKey, Params, _State) -> {SQLOrKey, Params}; proc_sql_params(TypeOrKey, SQLOrData, Params, State) -> + DisablePreparedStatements = maps:get(prepares, State, #{}) =:= disabled, BinKey = to_bin(TypeOrKey), case get_template(BinKey, State) of undefined -> {SQLOrData, Params}; - {_Statement, RowTemplate} -> - {BinKey, render_prepare_sql_row(RowTemplate, SQLOrData)} + {Statement, RowTemplate} -> + Rendered = render_prepare_sql_row(RowTemplate, SQLOrData), + case DisablePreparedStatements of + true -> + {Statement, Rendered}; + false -> + {BinKey, Rendered} + end end. get_template(Key, #{installed_channels := Channels} = _State) when is_map_key(Key, Channels) -> @@ -376,14 +413,14 @@ get_template(Key, #{query_templates := Templates}) -> BinKey = to_bin(Key), maps:get(BinKey, Templates, undefined). -get_prepared_statement(Key, #{installed_channels := Channels} = _State) when +get_templated_statement(Key, #{installed_channels := Channels} = _State) when is_map_key(Key, Channels) -> BinKey = to_bin(Key), ChannelState = maps:get(BinKey, Channels), ChannelPreparedStatements = maps:get(prepares, ChannelState), maps:get(BinKey, ChannelPreparedStatements); -get_prepared_statement(Key, #{prepares := PrepStatements}) -> +get_templated_statement(Key, #{prepares := PrepStatements}) -> BinKey = to_bin(Key), maps:get(BinKey, PrepStatements). @@ -480,6 +517,8 @@ do_check_prepares( {error, Reason} -> {error, Reason} end; +do_check_prepares(#{prepares := disabled}) -> + ok; do_check_prepares(#{prepares := Prepares}) when is_map(Prepares) -> ok; do_check_prepares(#{prepares := {error, _}} = State) -> @@ -579,7 +618,7 @@ conn_opts([Opt = {ssl_opts, _} | Opts], Acc) -> conn_opts([_Opt | Opts], Acc) -> conn_opts(Opts, Acc). -parse_prepare_sql(Config, SQLID) -> +parse_sql_template(Config, SQLID) -> Queries = case Config of #{prepare_statement := Qs} -> @@ -589,10 +628,10 @@ parse_prepare_sql(Config, SQLID) -> #{} -> #{} end, - Templates = maps:fold(fun parse_prepare_sql/3, #{}, Queries), + Templates = maps:fold(fun parse_sql_template/3, #{}, Queries), #{query_templates => Templates}. -parse_prepare_sql(Key, Query, Acc) -> +parse_sql_template(Key, Query, Acc) -> Template = emqx_template_sql:parse_prepstmt(Query, #{parameters => '$n'}), Acc#{Key => Template}. @@ -601,6 +640,8 @@ render_prepare_sql_row(RowTemplate, Data) -> {Row, _Errors} = emqx_template_sql:render_prepstmt(RowTemplate, {emqx_jsonish, Data}), Row. +init_prepare(State = #{prepares := disabled}) -> + State; init_prepare(State = #{query_templates := Templates}) when map_size(Templates) == 0 -> State; init_prepare(State = #{}) -> diff --git a/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl b/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl index fe5c4cd78..515fbe4b0 100644 --- a/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl +++ b/apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl @@ -47,7 +47,10 @@ roots() -> []. fields("connection_fields") -> - [{server, server()}] ++ + [ + {server, server()}, + emqx_postgresql:disable_prepared_statements() + ] ++ adjust_fields(emqx_connector_schema_lib:relational_db_fields()) ++ emqx_connector_schema_lib:ssl_fields(); fields("config_connector") -> diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 8340bf589..9bd6b1390 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -85,6 +85,7 @@ get_allocated_resources_list/1, forget_allocated_resources/1, deallocate_resource/2, + clean_allocated_resources/2, %% Get channel config from resource call_get_channel_config/3, % Call the format query result function diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index d650a2afb..4763094d0 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -63,6 +63,10 @@ %% Internal exports. -export([worker_resource_health_check/1, worker_channel_health_check/2]). +-ifdef(TEST). +-export([stop/2]). +-endif. + % State record -record(data, { id, @@ -254,7 +258,17 @@ remove(ResId) when is_binary(ResId) -> -spec remove(resource_id(), boolean()) -> ok | {error, Reason :: term()}. remove(ResId, ClearMetrics) when is_binary(ResId) -> try - safe_call(ResId, {remove, ClearMetrics}, ?T_OPERATION) + case safe_call(ResId, {remove, ClearMetrics}, ?T_OPERATION) of + {error, timeout} -> + ?tp(error, "forcefully_stopping_resource_due_to_timeout", #{ + action => remove, + resource_id => ResId + }), + force_kill(ResId), + ok; + Res -> + Res + end after %% Ensure the supervisor has it removed, otherwise the immediate re-add will see a stale process %% If the 'remove' call above had succeeded, this is mostly a no-op but still needed to avoid race condition. @@ -274,7 +288,7 @@ restart(ResId, Opts) when is_binary(ResId) -> end. %% @doc Start the resource --spec start(resource_id(), creation_opts()) -> ok | {error, Reason :: term()}. +-spec start(resource_id(), creation_opts()) -> ok | timeout | {error, Reason :: term()}. start(ResId, Opts) -> StartTimeout = maps:get(start_timeout, Opts, ?T_OPERATION), case safe_call(ResId, start, StartTimeout) of @@ -287,9 +301,20 @@ start(ResId, Opts) -> %% @doc Stop the resource -spec stop(resource_id()) -> ok | {error, Reason :: term()}. stop(ResId) -> - case safe_call(ResId, stop, ?T_OPERATION) of + stop(ResId, ?T_OPERATION). + +-spec stop(resource_id(), timeout()) -> ok | {error, Reason :: term()}. +stop(ResId, Timeout) -> + case safe_call(ResId, stop, Timeout) of ok -> ok; + {error, timeout} -> + ?tp(error, "forcefully_stopping_resource_due_to_timeout", #{ + action => stop, + resource_id => ResId + }), + force_kill(ResId), + ok; {error, _Reason} = Error -> Error end. @@ -406,6 +431,25 @@ get_error(ResId, #{added_channels := #{} = Channels} = ResourceData) when get_error(_ResId, #{error := Error}) -> Error. +force_kill(ResId) -> + case gproc:whereis_name(?NAME(ResId)) of + undefined -> + ok; + Pid when is_pid(Pid) -> + exit(Pid, kill), + try_clean_allocated_resources(ResId), + ok + end. + +try_clean_allocated_resources(ResId) -> + case try_read_cache(ResId) of + #data{mod = Mod} -> + catch emqx_resource:clean_allocated_resources(ResId, Mod), + ok; + _ -> + ok + end. + %% Server start/stop callbacks %% @doc Function called from the supervisor to actually start the server @@ -737,7 +781,7 @@ maybe_stop_resource(#data{status = ?rm_status_stopped} = Data) -> Data. stop_resource(#data{state = ResState, id = ResId} = Data) -> - %% We don't care the return value of the Mod:on_stop/2. + %% We don't care about the return value of `Mod:on_stop/2'. %% The callback mod should make sure the resource is stopped after on_stop/2 %% is returned. HasAllocatedResources = emqx_resource:has_allocated_resources(ResId), diff --git a/apps/emqx_resource/test/emqx_connector_demo.erl b/apps/emqx_resource/test/emqx_connector_demo.erl index 754727e8c..0fc11cc66 100644 --- a/apps/emqx_resource/test/emqx_connector_demo.erl +++ b/apps/emqx_resource/test/emqx_connector_demo.erl @@ -71,6 +71,16 @@ set_callback_mode(Mode) -> on_start(_InstId, #{create_error := true}) -> ?tp(connector_demo_start_error, #{}), error("some error"); +on_start(InstId, #{create_error := {delay, Delay, Agent}} = Opts) -> + ?tp(connector_demo_start_delay, #{}), + case emqx_utils_agent:get_and_update(Agent, fun(St) -> {St, called} end) of + not_called -> + emqx_resource:allocate_resource(InstId, i_should_be_deallocated, yep), + timer:sleep(Delay), + on_start(InstId, maps:remove(create_error, Opts)); + called -> + on_start(InstId, maps:remove(create_error, Opts)) + end; on_start(InstId, #{name := Name} = Opts) -> Register = maps:get(register, Opts, false), StopError = maps:get(stop_error, Opts, false), @@ -81,6 +91,9 @@ on_start(InstId, #{name := Name} = Opts) -> pid => spawn_counter_process(Name, Register) }}. +on_stop(_InstId, undefined) -> + ?tp(connector_demo_free_resources_without_state, #{}), + ok; on_stop(_InstId, #{stop_error := true}) -> {error, stop_error}; on_stop(InstId, #{pid := Pid}) -> diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 05a2f711d..764c65e6f 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -3189,6 +3189,43 @@ t_non_blocking_channel_health_check(_Config) -> ), ok. +%% Test that `stop' forcefully stops the resource manager even if it's stuck on a sync +%% call such as `on_start', and that the claimed resources, if any, are freed. +t_force_stop(_Config) -> + ?check_trace( + begin + {ok, Agent} = emqx_utils_agent:start_link(not_called), + {ok, _} = + create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{ + name => test_resource, + create_error => {delay, 30_000, Agent} + }, + #{ + health_check_interval => 100, + start_timeout => 100 + } + ), + ?assertEqual(ok, emqx_resource_manager:stop(?ID, _Timeout = 100)), + ok + end, + [ + log_consistency_prop(), + fun(Trace) -> + ?assertMatch([_ | _], ?of_kind(connector_demo_start_delay, Trace)), + ?assertMatch( + [_ | _], ?of_kind("forcefully_stopping_resource_due_to_timeout", Trace) + ), + ?assertMatch([_ | _], ?of_kind(connector_demo_free_resources_without_state, Trace)), + ok + end + ] + ), + ok. + %%------------------------------------------------------------------------------ %% Helpers %%------------------------------------------------------------------------------ diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index 5a6533f5e..4f0214a9d 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -733,8 +733,8 @@ event_info() -> event_info_schema_validation_failed() -> event_info_common( 'schema.validation_failed', - {<<"schema validation failed">>, <<"TODO"/utf8>>}, - {<<"messages that do not pass configured validations">>, <<"TODO"/utf8>>}, + {<<"schema validation failed">>, <<"schema 验证失败"/utf8>>}, + {<<"messages that do not pass configured validations">>, <<"未通过验证的消息"/utf8>>}, <<"SELECT * FROM \"$events/schema_validation_failed\" WHERE topic =~ 't/#'">> ). ee_event_info() -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl index 604f43d82..9de7b0173 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl @@ -160,6 +160,7 @@ find/3, join_to_string/1, join_to_string/2, + map_to_redis_hset_args/1, join_to_sql_values_string/1, jq/2, jq/3, @@ -814,6 +815,38 @@ join_to_string(Str) -> emqx_variform_bif:join_to_string(Str). join_to_string(Sep, List) -> emqx_variform_bif:join_to_string(Sep, List). +%% @doc Format map key-value pairs as redis HSET (or HMSET) command fields. +%% Notes: +%% - Non-string keys in the input map are dropped +%% - Keys are not quoted +%% - String values are always quoted +%% - No escape sequence for keys and values +%% - Float point values are formatted with fixed (6) decimal point compact-formatting +map_to_redis_hset_args(Map) when erlang:is_map(Map) -> + [map_to_redis_hset_args | maps:fold(fun redis_hset_acc/3, [], Map)]. + +redis_hset_acc(K, V, IoData) -> + try + [redis_field_name(K), redis_field_value(V) | IoData] + catch + _:_ -> + IoData + end. + +redis_field_name(K) when erlang:is_binary(K) -> + K; +redis_field_name(K) -> + throw({bad_redis_field_name, K}). + +redis_field_value(V) when erlang:is_binary(V) -> + V; +redis_field_value(V) when erlang:is_integer(V) -> + integer_to_binary(V); +redis_field_value(V) when erlang:is_float(V) -> + float2str(V, 6); +redis_field_value(V) when erlang:is_boolean(V) -> + atom_to_binary(V). + join_to_sql_values_string(List) -> QuotedList = [ diff --git a/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl index e260b04e1..eb7b97b5f 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl @@ -1376,6 +1376,27 @@ t_parse_date_errors(_) -> ok. +t_map_to_redis_hset_args(_Config) -> + Do = fun(Map) -> tl(emqx_rule_funcs:map_to_redis_hset_args(Map)) end, + ?assertEqual([], Do(#{})), + ?assertEqual([], Do(#{1 => 2})), + ?assertEqual([<<"a">>, <<"1">>], Do(#{<<"a">> => 1, 3 => 4})), + ?assertEqual([<<"a">>, <<"1.1">>], Do(#{<<"a">> => 1.1})), + ?assertEqual([<<"a">>, <<"true">>], Do(#{<<"a">> => true})), + ?assertEqual([<<"a">>, <<"false">>], Do(#{<<"a">> => false})), + ?assertEqual([<<"a">>, <<"">>], Do(#{<<"a">> => <<"">>})), + ?assertEqual([<<"a">>, <<"i j">>], Do(#{<<"a">> => <<"i j">>})), + %% no determined ordering + ?assert( + case Do(#{<<"a">> => 1, <<"b">> => 2}) of + [<<"a">>, <<"1">>, <<"b">>, <<"2">>] -> + true; + [<<"b">>, <<"2">>, <<"a">>, <<"1">>] -> + true + end + ), + ok. + %%------------------------------------------------------------------------------ %% Utility functions %%------------------------------------------------------------------------------ diff --git a/apps/emqx_utils/src/emqx_placeholder.erl b/apps/emqx_utils/src/emqx_placeholder.erl index 84000669d..ddc32cd0d 100644 --- a/apps/emqx_utils/src/emqx_placeholder.erl +++ b/apps/emqx_utils/src/emqx_placeholder.erl @@ -37,7 +37,8 @@ proc_tmpl_deep/3, bin/1, - sql_data/1 + sql_data/1, + lookup_var/2 ]). -export([ diff --git a/apps/emqx_utils/test/emqx_utils_agent.erl b/apps/emqx_utils/test/emqx_utils_agent.erl new file mode 100644 index 000000000..4280a491f --- /dev/null +++ b/apps/emqx_utils/test/emqx_utils_agent.erl @@ -0,0 +1,66 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% @doc Similar to Elixir's [`Agent'](https://hexdocs.pm/elixir/Agent.html). + +-module(emqx_utils_agent). + +%% API +-export([start_link/1, get/1, get_and_update/2]). + +%% `gen_server' API +-export([init/1, handle_call/3]). + +%%------------------------------------------------------------------------------ +%% Type declarations +%%------------------------------------------------------------------------------ + +-type state() :: term(). + +-type get_and_update_fn() :: fun((state()) -> {term(), state()}). + +-record(get_and_update, {fn :: get_and_update_fn()}). + +%%------------------------------------------------------------------------------ +%% API +%%------------------------------------------------------------------------------ + +-spec start_link(state()) -> gen_server:start_ret(). +start_link(InitState) -> + gen_server:start_link(?MODULE, InitState, []). + +-spec get(gen_server:server_ref()) -> term(). +get(ServerRef) -> + Fn = fun(St) -> {St, St} end, + gen_server:call(ServerRef, #get_and_update{fn = Fn}). + +-spec get_and_update(gen_server:server_ref(), get_and_update_fn()) -> term(). +get_and_update(ServerRef, Fn) -> + gen_server:call(ServerRef, #get_and_update{fn = Fn}). + +%%------------------------------------------------------------------------------ +%% `gen_server' API +%%------------------------------------------------------------------------------ + +init(InitState) -> + {ok, InitState}. + +handle_call(#get_and_update{fn = Fn}, _From, State0) -> + {Reply, State} = Fn(State0), + {reply, Reply, State}. + +%%------------------------------------------------------------------------------ +%% Internal fns +%%------------------------------------------------------------------------------ diff --git a/bin/emqx b/bin/emqx index b504ce8b4..a2f9e2691 100755 --- a/bin/emqx +++ b/bin/emqx @@ -1204,7 +1204,7 @@ case "${COMMAND}" in esac case "$COMMAND" in foreground) - FOREGROUNDOPTIONS="-enable-feature maybe_expr -noshell -noinput +Bd" + FOREGROUNDOPTIONS="-enable-feature maybe_expr -noinput -noshell +Bd" ;; *) FOREGROUNDOPTIONS='-enable-feature maybe_expr' diff --git a/build b/build index bf5c348f9..bcea44a30 100755 --- a/build +++ b/build @@ -397,11 +397,11 @@ function is_ecr_and_enterprise() { ## Build the default docker image based on debian 12. make_docker() { - local EMQX_BUILDER_VERSION="${EMQX_BUILDER_VERSION:-5.3-5}" + local EMQX_BUILDER_VERSION="${EMQX_BUILDER_VERSION:-5.3-7}" local EMQX_BUILDER_PLATFORM="${EMQX_BUILDER_PLATFORM:-debian12}" - local EMQX_BUILDER_OTP="${EMQX_BUILDER_OTP:-25.3.2-2}" - local EMQX_BUILDER_ELIXIR="${EMQX_BUILDER_ELIXIR:-1.15.7}" - local EMQX_BUILDER=${EMQX_BUILDER:-ghcr.io/emqx/emqx-builder/${EMQX_BUILDER_VERSION}:${EMQX_BUILDER_ELIXIR}-${EMQX_BUILDER_OTP}-${EMQX_BUILDER_PLATFORM}} + local OTP_VSN="${OTP_VSN:-26.2.5-1}" + local ELIXIR_VSN="${ELIXIR_VSN:-1.15.7}" + local EMQX_BUILDER=${EMQX_BUILDER:-ghcr.io/emqx/emqx-builder/${EMQX_BUILDER_VERSION}:${ELIXIR_VSN}-${OTP_VSN}-${EMQX_BUILDER_PLATFORM}} local EMQX_RUNNER="${EMQX_RUNNER:-${EMQX_DEFAULT_RUNNER}}" local EMQX_DOCKERFILE="${EMQX_DOCKERFILE:-deploy/docker/Dockerfile}" local EMQX_SOURCE_TYPE="${EMQX_SOURCE_TYPE:-src}" @@ -465,7 +465,7 @@ make_docker() { --label org.opencontainers.image.description="${PRODUCT_DESCRIPTION}" \ --label org.opencontainers.image.documentation="${DOCUMENTATION_URL}" \ --label org.opencontainers.image.licenses="${LICENSE}" \ - --label org.opencontainers.image.otp.version="${EMQX_BUILDER_OTP}" \ + --label org.opencontainers.image.otp.version="${OTP_VSN}" \ --pull ) :> ./.emqx_docker_image_tags @@ -477,7 +477,7 @@ make_docker() { DOCKER_BUILDX_ARGS+=(--no-cache) fi if [ "${SUFFIX}" = '-elixir' ]; then - DOCKER_BUILDX_ARGS+=(--label org.opencontainers.image.elixir.version="${EMQX_BUILDER_ELIXIR}") + DOCKER_BUILDX_ARGS+=(--label org.opencontainers.image.elixir.version="${ELIXIR_VSN}") fi if [ "${DOCKER_LATEST:-false}" = true ]; then for r in "${DOCKER_REGISTRIES[@]}"; do diff --git a/changes/ce/feat-13175.en.md b/changes/ce/feat-13175.en.md new file mode 100644 index 000000000..f673c1352 --- /dev/null +++ b/changes/ce/feat-13175.en.md @@ -0,0 +1,3 @@ +Added the `disable_prepared_statements` option for Postgres-based connectors. + +This option is to be used with endpoints that do not support the prepared statements session feature, such as PGBouncer and Supabase in Transaction mode. diff --git a/changes/ce/fix-13148.en.md b/changes/ce/fix-13148.en.md new file mode 100644 index 000000000..15002e132 --- /dev/null +++ b/changes/ce/fix-13148.en.md @@ -0,0 +1 @@ +Fixed an issue where a 500 HTTP status code could be returned by `/connectors/:connector-id/start` when there is a timeout waiting for the resource to be connected. diff --git a/changes/ce/fix-13164.en.md b/changes/ce/fix-13164.en.md new file mode 100644 index 000000000..c0ce937da --- /dev/null +++ b/changes/ce/fix-13164.en.md @@ -0,0 +1,6 @@ +Fix HTTP authorization request body encoding. + +Prior to this fix, the HTTP authorization request body encoding format was taken from the `accept` header. +The fix is to respect the `content-type` header instead. +Also added `access` templating variable for v4 compatibility. +The access code of SUBSCRIBE action is `1` and SUBSCRIBE action is `2`. diff --git a/changes/ce/fix-13181.en.md b/changes/ce/fix-13181.en.md new file mode 100644 index 000000000..984a9af76 --- /dev/null +++ b/changes/ce/fix-13181.en.md @@ -0,0 +1,3 @@ +Now, when attempting to stop a connector, if such operation times out, we forcefully shut down the connector process. + +Error messages when attempting to disable an action/source when its underlying connector is stuck were also improved. diff --git a/changes/ee/feat-13172.en.md b/changes/ee/feat-13172.en.md new file mode 100644 index 000000000..259dc075a --- /dev/null +++ b/changes/ee/feat-13172.en.md @@ -0,0 +1,5 @@ +Added a rule function `map_to_redis_hset_args` to help preparing redis HSET (or HMSET) multi-fields values. + +For example, if `payload.value` is a map of multiple data fields, +this rule `SELECT map_to_redis_hset_args(payload.value) as hset_fields FROM "t/#"` can prepare `hset_fields` +for redis action to render the command template like `HMSET name1 ${hset_fields}`. diff --git a/changes/ee/fix-13093.en.md b/changes/ee/fix-13093.en.md new file mode 100644 index 000000000..ebae132f1 --- /dev/null +++ b/changes/ee/fix-13093.en.md @@ -0,0 +1,3 @@ +Improve Kafka consumer group stability. + +Prior to this change, Kafka consumer group sometimes may need to rebalance twice after Kafka group coordinator restart. diff --git a/deploy/docker/Dockerfile b/deploy/docker/Dockerfile index fcff6984e..9eb9b2518 100644 --- a/deploy/docker/Dockerfile +++ b/deploy/docker/Dockerfile @@ -1,4 +1,4 @@ -ARG BUILD_FROM=ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-debian12 +ARG BUILD_FROM=ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-debian12 ARG RUN_FROM=public.ecr.aws/debian/debian:12-slim ARG SOURCE_TYPE=src # tgz diff --git a/mix.exs b/mix.exs index b3ecc6252..2bbf15360 100644 --- a/mix.exs +++ b/mix.exs @@ -102,8 +102,7 @@ defmodule EMQXUmbrella.MixProject do {:uuid, github: "okeuday/uuid", tag: "v2.0.6", override: true}, {:quickrand, github: "okeuday/quickrand", tag: "v2.0.6", override: true}, {:ra, "2.7.3", override: true}, - {:mimerl, "1.2.0", override: true}, - {:supervisor3, "1.1.12", override: true} + {:mimerl, "1.2.0", override: true} ] ++ emqx_apps(profile_info, version) ++ enterprise_deps(profile_info) ++ jq_dep() ++ quicer_dep() @@ -215,7 +214,7 @@ defmodule EMQXUmbrella.MixProject do {:wolff, github: "kafka4beam/wolff", tag: "1.10.4"}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true}, {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"}, - {:brod, github: "kafka4beam/brod", tag: "3.16.8"}, + {:brod, github: "kafka4beam/brod", tag: "3.18.0"}, {:snappyer, "1.2.9", override: true}, {:crc32cer, "0.1.8", override: true}, {:opentsdb, github: "emqx/opentsdb-client-erl", tag: "v0.5.1", override: true}, diff --git a/rel/i18n/emqx_postgresql.hocon b/rel/i18n/emqx_postgresql.hocon index 9740b0814..159a9a727 100644 --- a/rel/i18n/emqx_postgresql.hocon +++ b/rel/i18n/emqx_postgresql.hocon @@ -14,4 +14,13 @@ config_connector.desc: config_connector.label: """PostgreSQL Connector Config""" +disable_prepared_statements.label: +"""Disable Prepared Statements""" +disable_prepared_statements.desc: +"""~ +Disables the usage of prepared statements in the connections. +Some endpoints, like PGBouncer or Supabase in Transaction mode, do not +support session features such as prepared statements. For such connections, +this option should be enabled.~""" + } diff --git a/scripts/buildx.sh b/scripts/buildx.sh index 3c358a934..c222127b3 100755 --- a/scripts/buildx.sh +++ b/scripts/buildx.sh @@ -9,7 +9,7 @@ ## example: ## ./scripts/buildx.sh --profile emqx --pkgtype tgz --arch arm64 \ -## --builder ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-debian12 +## --builder ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-debian12 set -euo pipefail @@ -24,7 +24,7 @@ help() { echo "--arch amd64|arm64: Target arch to build the EMQX package for" echo "--src_dir : EMQX source code in this dir, default to PWD" echo "--builder : Builder image to pull" - echo " E.g. ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-debian12" + echo " E.g. ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-debian12" } die() { diff --git a/scripts/pr-sanity-checks.sh b/scripts/pr-sanity-checks.sh index be02a337d..ad8fbaaa4 100755 --- a/scripts/pr-sanity-checks.sh +++ b/scripts/pr-sanity-checks.sh @@ -12,11 +12,11 @@ if ! type "yq" > /dev/null; then exit 1 fi -EMQX_BUILDER_VERSION=${EMQX_BUILDER_VERSION:-5.3-5} -EMQX_BUILDER_OTP=${EMQX_BUILDER_OTP:-26.2.1-2} -EMQX_BUILDER_ELIXIR=${EMQX_BUILDER_ELIXIR:-1.15.7} +EMQX_BUILDER_VERSION=${EMQX_BUILDER_VERSION:-5.3-7} +OTP_VSN=${OTP_VSN:-26.2.5-1} +ELIXIR_VSN=${ELIXIR_VSN:-1.15.7} EMQX_BUILDER_PLATFORM=${EMQX_BUILDER_PLATFORM:-ubuntu22.04} -EMQX_BUILDER=${EMQX_BUILDER:-ghcr.io/emqx/emqx-builder/${EMQX_BUILDER_VERSION}:${EMQX_BUILDER_ELIXIR}-${EMQX_BUILDER_OTP}-${EMQX_BUILDER_PLATFORM}} +EMQX_BUILDER=${EMQX_BUILDER:-ghcr.io/emqx/emqx-builder/${EMQX_BUILDER_VERSION}:${ELIXIR_VSN}-${OTP_VSN}-${EMQX_BUILDER_PLATFORM}} commands=$(yq ".jobs.sanity-checks.steps[].run" .github/workflows/_pr_entrypoint.yaml | grep -v null) diff --git a/scripts/relup-test/start-relup-test-cluster.sh b/scripts/relup-test/start-relup-test-cluster.sh index 557fbbff9..a972fa210 100755 --- a/scripts/relup-test/start-relup-test-cluster.sh +++ b/scripts/relup-test/start-relup-test-cluster.sh @@ -22,7 +22,7 @@ WEBHOOK="webhook.$NET" BENCH="bench.$NET" COOKIE='this-is-a-secret' ## Erlang image is needed to run webhook server and emqtt-bench -ERLANG_IMAGE="ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-ubuntu22.04" +ERLANG_IMAGE="ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-ubuntu22.04" # builder has emqtt-bench installed BENCH_IMAGE="$ERLANG_IMAGE" diff --git a/scripts/spellcheck/dicts/emqx.txt b/scripts/spellcheck/dicts/emqx.txt index 084c49644..ce08d0f6b 100644 --- a/scripts/spellcheck/dicts/emqx.txt +++ b/scripts/spellcheck/dicts/emqx.txt @@ -49,6 +49,7 @@ NIF OCSP OTP PEM +PGBouncer PINGREQ PSK PSK @@ -65,6 +66,7 @@ Riak SHA SMS Struct +Supabase TCP TLS TTL