Merge remote-tracking branch 'origin/release-57' into 0606-merge-release-57-to-master

This commit is contained in:
zmstone 2024-06-06 17:43:36 +02:00
commit ebf17c8143
64 changed files with 629 additions and 140 deletions

View File

@ -18,7 +18,7 @@ services:
- /tmp/emqx-ci/emqx-shared-secret:/var/lib/secret - /tmp/emqx-ci/emqx-shared-secret:/var/lib/secret
kdc: kdc:
hostname: kdc.emqx.net hostname: kdc.emqx.net
image: ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-ubuntu22.04 image: ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-ubuntu22.04
container_name: kdc.emqx.net container_name: kdc.emqx.net
expose: expose:
- 88 # kdc - 88 # kdc

View File

@ -3,7 +3,7 @@ version: '3.9'
services: services:
erlang: erlang:
container_name: erlang container_name: erlang
image: ${DOCKER_CT_RUNNER_IMAGE:-ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-ubuntu22.04} image: ${DOCKER_CT_RUNNER_IMAGE:-ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-ubuntu22.04}
env_file: env_file:
- credentials.env - credentials.env
- conf.env - conf.env

View File

@ -17,16 +17,16 @@ env:
jobs: jobs:
sanity-checks: sanity-checks:
runs-on: ubuntu-22.04 runs-on: ubuntu-22.04
container: "ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-ubuntu22.04" container: "ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-ubuntu22.04"
outputs: outputs:
ct-matrix: ${{ steps.matrix.outputs.ct-matrix }} ct-matrix: ${{ steps.matrix.outputs.ct-matrix }}
ct-host: ${{ steps.matrix.outputs.ct-host }} ct-host: ${{ steps.matrix.outputs.ct-host }}
ct-docker: ${{ steps.matrix.outputs.ct-docker }} ct-docker: ${{ steps.matrix.outputs.ct-docker }}
version-emqx: ${{ steps.matrix.outputs.version-emqx }} version-emqx: ${{ steps.matrix.outputs.version-emqx }}
version-emqx-enterprise: ${{ steps.matrix.outputs.version-emqx-enterprise }} version-emqx-enterprise: ${{ steps.matrix.outputs.version-emqx-enterprise }}
builder: "ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-ubuntu22.04" builder: "ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-ubuntu22.04"
builder_vsn: "5.3-5" builder_vsn: "5.3-7"
otp_vsn: "26.2.1-2" otp_vsn: "26.2.5-1"
elixir_vsn: "1.15.7" elixir_vsn: "1.15.7"
permissions: permissions:
@ -96,13 +96,13 @@ jobs:
MATRIX="$(echo "${APPS}" | jq -c ' MATRIX="$(echo "${APPS}" | jq -c '
[ [
(.[] | select(.profile == "emqx") | . + { (.[] | select(.profile == "emqx") | . + {
builder: "5.3-5", builder: "5.3-7",
otp: "26.2.1-2", otp: "26.2.5-1",
elixir: "1.15.7" elixir: "1.15.7"
}), }),
(.[] | select(.profile == "emqx-enterprise") | . + { (.[] | select(.profile == "emqx-enterprise") | . + {
builder: "5.3-5", builder: "5.3-7",
otp: ["26.2.1-2"][], otp: ["26.2.5-1"][],
elixir: "1.15.7" elixir: "1.15.7"
}) })
] ]

View File

@ -24,7 +24,7 @@ env:
jobs: jobs:
prepare: prepare:
runs-on: ubuntu-22.04 runs-on: ubuntu-22.04
container: 'ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-ubuntu22.04' container: 'ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-ubuntu22.04'
outputs: outputs:
profile: ${{ steps.parse-git-ref.outputs.profile }} profile: ${{ steps.parse-git-ref.outputs.profile }}
release: ${{ steps.parse-git-ref.outputs.release }} release: ${{ steps.parse-git-ref.outputs.release }}
@ -32,9 +32,9 @@ jobs:
ct-matrix: ${{ steps.matrix.outputs.ct-matrix }} ct-matrix: ${{ steps.matrix.outputs.ct-matrix }}
ct-host: ${{ steps.matrix.outputs.ct-host }} ct-host: ${{ steps.matrix.outputs.ct-host }}
ct-docker: ${{ steps.matrix.outputs.ct-docker }} ct-docker: ${{ steps.matrix.outputs.ct-docker }}
builder: 'ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-ubuntu22.04' builder: 'ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-ubuntu22.04'
builder_vsn: '5.3-5' builder_vsn: '5.3-7'
otp_vsn: '26.2.1-2' otp_vsn: '26.2.5-1'
elixir_vsn: '1.15.7' elixir_vsn: '1.15.7'
permissions: permissions:
@ -66,13 +66,13 @@ jobs:
MATRIX="$(echo "${APPS}" | jq -c ' MATRIX="$(echo "${APPS}" | jq -c '
[ [
(.[] | select(.profile == "emqx") | . + { (.[] | select(.profile == "emqx") | . + {
builder: "5.3-5", builder: "5.3-7",
otp: "26.2.1-2", otp: "26.2.5-1",
elixir: "1.15.7" elixir: "1.15.7"
}), }),
(.[] | select(.profile == "emqx-enterprise") | . + { (.[] | select(.profile == "emqx-enterprise") | . + {
builder: "5.3-5", builder: "5.3-7",
otp: ["26.2.1-2"][], otp: ["26.2.5-1"][],
elixir: "1.15.7" elixir: "1.15.7"
}) })
] ]
@ -107,8 +107,7 @@ jobs:
profile: ${{ needs.prepare.outputs.profile }} profile: ${{ needs.prepare.outputs.profile }}
publish: true publish: true
latest: ${{ needs.prepare.outputs.latest }} latest: ${{ needs.prepare.outputs.latest }}
# TODO: revert this back to needs.prepare.outputs.otp_vsn when OTP 26 bug is fixed otp_vsn: ${{ needs.prepare.outputs.otp_vsn }}
otp_vsn: 25.3.2-2
elixir_vsn: ${{ needs.prepare.outputs.elixir_vsn }} elixir_vsn: ${{ needs.prepare.outputs.elixir_vsn }}
builder_vsn: ${{ needs.prepare.outputs.builder_vsn }} builder_vsn: ${{ needs.prepare.outputs.builder_vsn }}
secrets: inherit secrets: inherit

View File

@ -53,7 +53,7 @@ on:
otp_vsn: otp_vsn:
required: false required: false
type: string type: string
default: '25.3.2-2' default: '26.2.5-1'
elixir_vsn: elixir_vsn:
required: false required: false
type: string type: string
@ -61,7 +61,7 @@ on:
builder_vsn: builder_vsn:
required: false required: false
type: string type: string
default: '5.3-5' default: '5.3-7'
permissions: permissions:
contents: read contents: read
@ -169,8 +169,8 @@ jobs:
EMQX_DOCKERFILE: 'deploy/docker/Dockerfile' EMQX_DOCKERFILE: 'deploy/docker/Dockerfile'
PKG_VSN: ${{ needs.build.outputs.PKG_VSN }} PKG_VSN: ${{ needs.build.outputs.PKG_VSN }}
EMQX_BUILDER_VERSION: ${{ inputs.builder_vsn }} EMQX_BUILDER_VERSION: ${{ inputs.builder_vsn }}
EMQX_BUILDER_OTP: ${{ inputs.otp_vsn }} OTP_VSN: ${{ inputs.otp_vsn }}
EMQX_BUILDER_ELIXIR: ${{ inputs.elixir_vsn }} ELIXIR_VSN: ${{ inputs.elixir_vsn }}
EMQX_SOURCE_TYPE: tgz EMQX_SOURCE_TYPE: tgz
run: | run: |
./build ${PROFILE} docker ./build ${PROFILE} docker
@ -218,8 +218,8 @@ jobs:
EMQX_DOCKERFILE: 'deploy/docker/Dockerfile' EMQX_DOCKERFILE: 'deploy/docker/Dockerfile'
PKG_VSN: ${{ needs.build.outputs.PKG_VSN }} PKG_VSN: ${{ needs.build.outputs.PKG_VSN }}
EMQX_BUILDER_VERSION: ${{ inputs.builder_vsn }} EMQX_BUILDER_VERSION: ${{ inputs.builder_vsn }}
EMQX_BUILDER_OTP: ${{ inputs.otp_vsn }} OTP_VSN: ${{ inputs.otp_vsn }}
EMQX_BUILDER_ELIXIR: ${{ inputs.elixir_vsn }} ELIXIR_VSN: ${{ inputs.elixir_vsn }}
EMQX_SOURCE_TYPE: tgz EMQX_SOURCE_TYPE: tgz
run: | run: |
./build ${PROFILE} docker ./build ${PROFILE} docker

View File

@ -55,7 +55,7 @@ on:
otp_vsn: otp_vsn:
required: false required: false
type: string type: string
default: '26.2.1-2' default: '26.2.5-1'
elixir_vsn: elixir_vsn:
required: false required: false
type: string type: string
@ -63,7 +63,7 @@ on:
builder_vsn: builder_vsn:
required: false required: false
type: string type: string
default: '5.3-5' default: '5.3-7'
permissions: permissions:
contents: read contents: read

View File

@ -23,8 +23,8 @@ jobs:
fail-fast: false fail-fast: false
matrix: matrix:
profile: profile:
- ['emqx', 'master', '5.3-5:1.15.7-26.2.1-2'] - ['emqx', 'master', '5.3-7:1.15.7-26.2.5-1']
- ['emqx', 'release-57', '5.3-5:1.15.7-26.2.1-2'] - ['emqx', 'release-57', '5.3-7:1.15.7-26.2.5-1']
os: os:
- ubuntu22.04 - ubuntu22.04
- amzn2023 - amzn2023
@ -92,7 +92,7 @@ jobs:
branch: branch:
- master - master
otp: otp:
- 26.2.1-2 - 26.2.5-1
os: os:
- macos-12-arm64 - macos-12-arm64

View File

@ -27,15 +27,15 @@ on:
builder: builder:
required: false required: false
type: string type: string
default: 'ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-ubuntu22.04' default: 'ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-ubuntu22.04'
builder_vsn: builder_vsn:
required: false required: false
type: string type: string
default: '5.3-5' default: '5.3-7'
otp_vsn: otp_vsn:
required: false required: false
type: string type: string
default: '26.2.1-2' default: '26.2.5-1'
elixir_vsn: elixir_vsn:
required: false required: false
type: string type: string
@ -54,9 +54,9 @@ jobs:
fail-fast: false fail-fast: false
matrix: matrix:
profile: profile:
- ["emqx", "26.2.1-2", "ubuntu22.04", "elixir", "x64"] - ["emqx", "26.2.5-1", "ubuntu22.04", "elixir", "x64"]
- ["emqx", "26.2.1-2", "ubuntu22.04", "elixir", "arm64"] - ["emqx", "26.2.5-1", "ubuntu22.04", "elixir", "arm64"]
- ["emqx-enterprise", "26.2.1-2", "ubuntu22.04", "erlang", "x64"] - ["emqx-enterprise", "26.2.5-1", "ubuntu22.04", "erlang", "x64"]
container: "ghcr.io/emqx/emqx-builder/${{ inputs.builder_vsn }}:${{ inputs.elixir_vsn }}-${{ matrix.profile[1] }}-${{ matrix.profile[2] }}" container: "ghcr.io/emqx/emqx-builder/${{ inputs.builder_vsn }}:${{ inputs.elixir_vsn }}-${{ matrix.profile[1] }}-${{ matrix.profile[2] }}"

View File

@ -18,7 +18,7 @@ jobs:
actions: read actions: read
security-events: write security-events: write
container: container:
image: ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-ubuntu22.04 image: ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-ubuntu22.04
strategy: strategy:
fail-fast: false fail-fast: false

View File

@ -26,7 +26,7 @@ jobs:
prepare: prepare:
runs-on: ubuntu-latest runs-on: ubuntu-latest
if: github.repository_owner == 'emqx' if: github.repository_owner == 'emqx'
container: ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-ubuntu20.04 container: ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-ubuntu20.04
outputs: outputs:
BENCH_ID: ${{ steps.prepare.outputs.BENCH_ID }} BENCH_ID: ${{ steps.prepare.outputs.BENCH_ID }}
PACKAGE_FILE: ${{ steps.package_file.outputs.PACKAGE_FILE }} PACKAGE_FILE: ${{ steps.package_file.outputs.PACKAGE_FILE }}

View File

@ -74,7 +74,7 @@ jobs:
steps: steps:
- uses: erlef/setup-beam@2f0cc07b4b9bea248ae098aba9e1a8a1de5ec24c # v1.17.5 - uses: erlef/setup-beam@2f0cc07b4b9bea248ae098aba9e1a8a1de5ec24c # v1.17.5
with: with:
otp-version: 26.2.1 otp-version: 26.2.5
- uses: actions/checkout@9bb56186c3b09b4f86b1c65136769dd318469633 # v4.1.2 - uses: actions/checkout@9bb56186c3b09b4f86b1c65136769dd318469633 # v4.1.2
with: with:
repository: hawk/lux repository: hawk/lux

View File

@ -1,2 +1,2 @@
erlang 26.2.1-2 erlang 26.2.5-1
elixir 1.15.7-otp-26 elixir 1.15.7-otp-26

View File

@ -7,7 +7,7 @@ REBAR = $(CURDIR)/rebar3
BUILD = $(CURDIR)/build BUILD = $(CURDIR)/build
SCRIPTS = $(CURDIR)/scripts SCRIPTS = $(CURDIR)/scripts
export EMQX_RELUP ?= true export EMQX_RELUP ?= true
export EMQX_DEFAULT_BUILDER = ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-debian12 export EMQX_DEFAULT_BUILDER = ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-debian12
export EMQX_DEFAULT_RUNNER = public.ecr.aws/debian/debian:12-slim export EMQX_DEFAULT_RUNNER = public.ecr.aws/debian/debian:12-slim
export EMQX_REL_FORM ?= tgz export EMQX_REL_FORM ?= tgz
export QUICER_DOWNLOAD_FROM_RELEASE = 1 export QUICER_DOWNLOAD_FROM_RELEASE = 1
@ -20,8 +20,8 @@ endif
# Dashboard version # Dashboard version
# from https://github.com/emqx/emqx-dashboard5 # from https://github.com/emqx/emqx-dashboard5
export EMQX_DASHBOARD_VERSION ?= v1.9.0 export EMQX_DASHBOARD_VERSION ?= v1.9.1-beta.1
export EMQX_EE_DASHBOARD_VERSION ?= e1.7.0 export EMQX_EE_DASHBOARD_VERSION ?= e1.7.1-beta.1
-include default-profile.mk -include default-profile.mk
PROFILE ?= emqx PROFILE ?= emqx

View File

@ -37,6 +37,10 @@
-compile(nowarn_export_all). -compile(nowarn_export_all).
-endif. -endif.
-define(VAR_ACCESS, "access").
-define(LEGACY_SUBSCRIBE_ACTION, 1).
-define(LEGACY_PUBLISH_ACTION, 2).
-define(ALLOWED_VARS, [ -define(ALLOWED_VARS, [
?VAR_USERNAME, ?VAR_USERNAME,
?VAR_CLIENTID, ?VAR_CLIENTID,
@ -47,6 +51,7 @@
?VAR_ACTION, ?VAR_ACTION,
?VAR_CERT_SUBJECT, ?VAR_CERT_SUBJECT,
?VAR_CERT_CN_NAME, ?VAR_CERT_CN_NAME,
?VAR_ACCESS,
?VAR_NS_CLIENT_ATTRS ?VAR_NS_CLIENT_ATTRS
]). ]).
@ -185,7 +190,14 @@ generate_request(Action, Topic, Client, Config) ->
client_vars(Client, Action, Topic) -> client_vars(Client, Action, Topic) ->
Vars = emqx_authz_utils:vars_for_rule_query(Client, Action), Vars = emqx_authz_utils:vars_for_rule_query(Client, Action),
Vars#{topic => Topic}. add_legacy_access_var(Vars#{topic => Topic}).
add_legacy_access_var(#{action := subscribe} = Vars) ->
Vars#{access => ?LEGACY_SUBSCRIBE_ACTION};
add_legacy_access_var(#{action := publish} = Vars) ->
Vars#{access => ?LEGACY_PUBLISH_ACTION};
add_legacy_access_var(Vars) ->
Vars.
allowed_vars() -> allowed_vars() ->
allowed_vars(emqx_authz:feature_available(rich_actions)). allowed_vars(emqx_authz:feature_available(rich_actions)).

View File

@ -199,6 +199,7 @@ t_query_params(_Config) ->
mountpoint := <<"MOUNTPOINT">>, mountpoint := <<"MOUNTPOINT">>,
topic := <<"t/1">>, topic := <<"t/1">>,
action := <<"publish">>, action := <<"publish">>,
access := <<"2">>,
qos := <<"1">>, qos := <<"1">>,
retain := <<"false">> retain := <<"false">>
} = cowboy_req:match_qs( } = cowboy_req:match_qs(
@ -210,6 +211,7 @@ t_query_params(_Config) ->
mountpoint, mountpoint,
topic, topic,
action, action,
access,
qos, qos,
retain retain
], ],
@ -227,6 +229,7 @@ t_query_params(_Config) ->
"mountpoint=${mountpoint}&" "mountpoint=${mountpoint}&"
"topic=${topic}&" "topic=${topic}&"
"action=${action}&" "action=${action}&"
"access=${access}&"
"qos=${qos}&" "qos=${qos}&"
"retain=${retain}" "retain=${retain}"
>> >>
@ -261,6 +264,7 @@ t_path(_Config) ->
"MOUNTPOINT/" "MOUNTPOINT/"
"t%2F1/" "t%2F1/"
"publish/" "publish/"
"2/"
"1/" "1/"
"false" "false"
>>, >>,
@ -278,6 +282,7 @@ t_path(_Config) ->
"${mountpoint}/" "${mountpoint}/"
"${topic}/" "${topic}/"
"${action}/" "${action}/"
"${access}/"
"${qos}/" "${qos}/"
"${retain}" "${retain}"
>> >>
@ -318,6 +323,7 @@ t_json_body(_Config) ->
<<"mountpoint">> := <<"MOUNTPOINT">>, <<"mountpoint">> := <<"MOUNTPOINT">>,
<<"topic">> := <<"t">>, <<"topic">> := <<"t">>,
<<"action">> := <<"publish">>, <<"action">> := <<"publish">>,
<<"access">> := <<"2">>,
<<"qos">> := <<"1">>, <<"qos">> := <<"1">>,
<<"retain">> := <<"false">> <<"retain">> := <<"false">>
}, },
@ -335,6 +341,7 @@ t_json_body(_Config) ->
<<"mountpoint">> => <<"${mountpoint}">>, <<"mountpoint">> => <<"${mountpoint}">>,
<<"topic">> => <<"${topic}">>, <<"topic">> => <<"${topic}">>,
<<"action">> => <<"${action}">>, <<"action">> => <<"${action}">>,
<<"access">> => <<"${access}">>,
<<"qos">> => <<"${qos}">>, <<"qos">> => <<"${qos}">>,
<<"retain">> => <<"${retain}">> <<"retain">> => <<"${retain}">>
} }
@ -413,6 +420,7 @@ t_placeholder_and_body(_Config) ->
<<"mountpoint">> := <<"MOUNTPOINT">>, <<"mountpoint">> := <<"MOUNTPOINT">>,
<<"topic">> := <<"t">>, <<"topic">> := <<"t">>,
<<"action">> := <<"publish">>, <<"action">> := <<"publish">>,
<<"access">> := <<"2">>,
<<"CN">> := ?PH_CERT_CN_NAME, <<"CN">> := ?PH_CERT_CN_NAME,
<<"CS">> := ?PH_CERT_SUBJECT <<"CS">> := ?PH_CERT_SUBJECT
}, },
@ -430,6 +438,7 @@ t_placeholder_and_body(_Config) ->
<<"mountpoint">> => <<"${mountpoint}">>, <<"mountpoint">> => <<"${mountpoint}">>,
<<"topic">> => <<"${topic}">>, <<"topic">> => <<"${topic}">>,
<<"action">> => <<"${action}">>, <<"action">> => <<"${action}">>,
<<"access">> => <<"${access}">>,
<<"CN">> => ?PH_CERT_CN_NAME, <<"CN">> => ?PH_CERT_CN_NAME,
<<"CS">> => ?PH_CERT_SUBJECT <<"CS">> => ?PH_CERT_SUBJECT
}, },

View File

@ -606,6 +606,7 @@ pgsql_server() ->
pgsql_config() -> pgsql_config() ->
#{ #{
auto_reconnect => true, auto_reconnect => true,
disable_prepared_statements => false,
database => <<"mqtt">>, database => <<"mqtt">>,
username => <<"root">>, username => <<"root">>,
password => <<"public">>, password => <<"public">>,

View File

@ -426,6 +426,7 @@ setup_config(SpecialParams) ->
pgsql_config() -> pgsql_config() ->
#{ #{
auto_reconnect => true, auto_reconnect => true,
disable_prepared_statements => false,
database => <<"mqtt">>, database => <<"mqtt">>,
username => <<"root">>, username => <<"root">>,
password => <<"public">>, password => <<"public">>,

View File

@ -1151,7 +1151,18 @@ post_config_update([ConfRootKey, BridgeType, BridgeName], _Req, NewConf, undefin
post_config_update([ConfRootKey, BridgeType, BridgeName], _Req, NewConf, OldConf, _AppEnvs) when post_config_update([ConfRootKey, BridgeType, BridgeName], _Req, NewConf, OldConf, _AppEnvs) when
ConfRootKey =:= ?ROOT_KEY_ACTIONS; ConfRootKey =:= ?ROOT_KEY_SOURCES ConfRootKey =:= ?ROOT_KEY_ACTIONS; ConfRootKey =:= ?ROOT_KEY_SOURCES
-> ->
ok = uninstall_bridge_v2(ConfRootKey, BridgeType, BridgeName, OldConf), case uninstall_bridge_v2(ConfRootKey, BridgeType, BridgeName, OldConf) of
ok ->
ok;
{error, timeout} ->
throw(<<
"Timed out trying to remove action or source. Please try again and,"
" if the error persists, try disabling the connector before retrying."
>>);
{error, not_found} ->
%% Should not happen, unless config is inconsistent.
throw(<<"Referenced connector not found">>)
end,
ok = install_bridge_v2(ConfRootKey, BridgeType, BridgeName, NewConf), ok = install_bridge_v2(ConfRootKey, BridgeType, BridgeName, NewConf),
Bridges = emqx_utils_maps:deep_put( Bridges = emqx_utils_maps:deep_put(
[BridgeType, BridgeName], emqx:get_config([ConfRootKey]), NewConf [BridgeType, BridgeName], emqx:get_config([ConfRootKey]), NewConf

View File

@ -879,6 +879,8 @@ handle_disable_enable(ConfRootKey, Id, Enable) ->
?SERVICE_UNAVAILABLE(<<"request timeout">>); ?SERVICE_UNAVAILABLE(<<"request timeout">>);
{error, timeout} -> {error, timeout} ->
?SERVICE_UNAVAILABLE(<<"request timeout">>); ?SERVICE_UNAVAILABLE(<<"request timeout">>);
{error, Reason} when is_binary(Reason) ->
?BAD_REQUEST(Reason);
{error, Reason} -> {error, Reason} ->
?INTERNAL_ERROR(Reason) ?INTERNAL_ERROR(Reason)
end end

View File

@ -5,7 +5,7 @@
{wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.4"}}}, {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.4"}}},
{kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}}, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
{brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
{brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}}, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},
{snappyer, "1.2.9"}, {snappyer, "1.2.9"},
{emqx_connector, {path, "../../apps/emqx_connector"}}, {emqx_connector, {path, "../../apps/emqx_connector"}},
{emqx_resource, {path, "../../apps/emqx_resource"}}, {emqx_resource, {path, "../../apps/emqx_resource"}},

View File

@ -5,7 +5,7 @@
{wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.4"}}}, {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.4"}}},
{kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}}, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
{brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
{brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}}, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},
{snappyer, "1.2.9"}, {snappyer, "1.2.9"},
{emqx_connector, {path, "../../apps/emqx_connector"}}, {emqx_connector, {path, "../../apps/emqx_connector"}},
{emqx_resource, {path, "../../apps/emqx_resource"}}, {emqx_resource, {path, "../../apps/emqx_resource"}},

View File

@ -5,7 +5,7 @@
{wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.4"}}}, {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.4"}}},
{kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}}, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
{brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
{brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}}, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},
{snappyer, "1.2.9"}, {snappyer, "1.2.9"},
{emqx_connector, {path, "../../apps/emqx_connector"}}, {emqx_connector, {path, "../../apps/emqx_connector"}},
{emqx_resource, {path, "../../apps/emqx_resource"}}, {emqx_resource, {path, "../../apps/emqx_resource"}},

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_matrix, [ {application, emqx_bridge_matrix, [
{description, "EMQX Enterprise MatrixDB Bridge"}, {description, "EMQX Enterprise MatrixDB Bridge"},
{vsn, "0.1.4"}, {vsn, "0.1.5"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -10,7 +10,8 @@
bridge_v1_type_name/0, bridge_v1_type_name/0,
action_type_name/0, action_type_name/0,
connector_type_name/0, connector_type_name/0,
schema_module/0 schema_module/0,
connector_action_config_to_bridge_v1_config/2
]). ]).
bridge_v1_type_name() -> matrix. bridge_v1_type_name() -> matrix.
@ -20,3 +21,9 @@ action_type_name() -> matrix.
connector_type_name() -> matrix. connector_type_name() -> matrix.
schema_module() -> emqx_bridge_matrix. schema_module() -> emqx_bridge_matrix.
connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
emqx_bridge_pgsql_action_info:connector_action_config_to_bridge_v1_config(
ConnectorConfig,
ActionConfig
).

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_pgsql, [ {application, emqx_bridge_pgsql, [
{description, "EMQX Enterprise PostgreSQL Bridge"}, {description, "EMQX Enterprise PostgreSQL Bridge"},
{vsn, "0.1.6"}, {vsn, "0.1.7"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -82,6 +82,7 @@ fields("get_bridge_v2") ->
fields("post_bridge_v2") -> fields("post_bridge_v2") ->
fields("post", pgsql, pgsql_action); fields("post", pgsql, pgsql_action);
fields("config") -> fields("config") ->
%% Bridge v1 config for all postgres-based bridges (pgsql, matrix, timescale)
[ [
{enable, hoconsc:mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, {enable, hoconsc:mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
{sql, {sql,
@ -95,8 +96,11 @@ fields("config") ->
#{desc => ?DESC("local_topic"), default => undefined} #{desc => ?DESC("local_topic"), default => undefined}
)} )}
] ++ emqx_resource_schema:fields("resource_opts") ++ ] ++ emqx_resource_schema:fields("resource_opts") ++
(emqx_postgresql:fields(config) -- proplists:delete(
emqx_connector_schema_lib:prepare_statement_fields()); disable_prepared_statements,
emqx_postgresql:fields(config) --
emqx_connector_schema_lib:prepare_statement_fields()
);
fields("post") -> fields("post") ->
fields("post", ?ACTION_TYPE, "config"); fields("post", ?ACTION_TYPE, "config");
fields("put") -> fields("put") ->

View File

@ -10,7 +10,8 @@
bridge_v1_type_name/0, bridge_v1_type_name/0,
action_type_name/0, action_type_name/0,
connector_type_name/0, connector_type_name/0,
schema_module/0 schema_module/0,
connector_action_config_to_bridge_v1_config/2
]). ]).
bridge_v1_type_name() -> pgsql. bridge_v1_type_name() -> pgsql.
@ -20,3 +21,20 @@ action_type_name() -> pgsql.
connector_type_name() -> pgsql. connector_type_name() -> pgsql.
schema_module() -> emqx_bridge_pgsql. schema_module() -> emqx_bridge_pgsql.
connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
Config0 = emqx_action_info:connector_action_config_to_bridge_v1_config(
ConnectorConfig,
ActionConfig
),
maps:with(bridge_v1_fields(), Config0).
%%------------------------------------------------------------------------------------------
%% Internal helper functions
%%------------------------------------------------------------------------------------------
bridge_v1_fields() ->
[
emqx_utils_conv:bin(K)
|| {K, _V} <- emqx_bridge_pgsql:fields("config")
].

View File

@ -19,6 +19,7 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(BRIDGE_TYPE, pgsql). -define(BRIDGE_TYPE, pgsql).
-define(BRIDGE_TYPE_BIN, <<"pgsql">>). -define(BRIDGE_TYPE_BIN, <<"pgsql">>).
@ -33,7 +34,18 @@
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
all() -> all() ->
emqx_common_test_helpers:all(?MODULE). All0 = emqx_common_test_helpers:all(?MODULE),
All = All0 -- matrix_cases(),
Groups = lists:map(fun({G, _, _}) -> {group, G} end, groups()),
Groups ++ All.
matrix_cases() ->
[
t_disable_prepared_statements
].
groups() ->
emqx_common_test_helpers:matrix_to_groups(?MODULE, matrix_cases()).
init_per_suite(Config) -> init_per_suite(Config) ->
PostgresHost = os:getenv("PGSQL_TCP_HOST", "toxiproxy"), PostgresHost = os:getenv("PGSQL_TCP_HOST", "toxiproxy"),
@ -80,10 +92,26 @@ end_per_suite(Config) ->
emqx_cth_suite:stop(Apps), emqx_cth_suite:stop(Apps),
ok. ok.
init_per_testcase(TestCase, Config) -> init_per_group(Group, Config) when
common_init_per_testcase(TestCase, Config). Group =:= postgres;
Group =:= timescale;
Group =:= matrix
->
[
{bridge_type, group_to_type(Group)},
{connector_type, group_to_type(Group)}
| Config
];
init_per_group(_Group, Config) ->
Config.
common_init_per_testcase(TestCase, Config) -> group_to_type(postgres) -> pgsql;
group_to_type(Group) -> Group.
end_per_group(_Group, _Config) ->
ok.
init_per_testcase(TestCase, Config) ->
ct:timetrap(timer:seconds(60)), ct:timetrap(timer:seconds(60)),
emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(), emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
emqx_config:delete_override_conf_files(), emqx_config:delete_override_conf_files(),
@ -103,10 +131,10 @@ common_init_per_testcase(TestCase, Config) ->
BridgeConfig = bridge_config(Name, Name), BridgeConfig = bridge_config(Name, Name),
ok = snabbkaffe:start_trace(), ok = snabbkaffe:start_trace(),
[ [
{connector_type, ?CONNECTOR_TYPE}, {connector_type, proplists:get_value(connector_type, Config, ?CONNECTOR_TYPE)},
{connector_name, Name}, {connector_name, Name},
{connector_config, ConnectorConfig}, {connector_config, ConnectorConfig},
{bridge_type, ?BRIDGE_TYPE}, {bridge_type, proplists:get_value(bridge_type, Config, ?BRIDGE_TYPE)},
{bridge_name, Name}, {bridge_name, Name},
{bridge_config, BridgeConfig} {bridge_config, BridgeConfig}
| NConfig | NConfig
@ -232,3 +260,20 @@ t_sync_query(Config) ->
t_start_action_or_source_with_disabled_connector(Config) -> t_start_action_or_source_with_disabled_connector(Config) ->
ok = emqx_bridge_v2_testlib:t_start_action_or_source_with_disabled_connector(Config), ok = emqx_bridge_v2_testlib:t_start_action_or_source_with_disabled_connector(Config),
ok. ok.
t_disable_prepared_statements(matrix) ->
[[postgres], [timescale], [matrix]];
t_disable_prepared_statements(Config0) ->
ConnectorConfig0 = ?config(connector_config, Config0),
ConnectorConfig = maps:merge(ConnectorConfig0, #{<<"disable_prepared_statements">> => true}),
Config = lists:keyreplace(connector_config, 1, Config0, {connector_config, ConnectorConfig}),
ok = emqx_bridge_v2_testlib:t_sync_query(
Config,
fun make_message/0,
fun(Res) -> ?assertMatch({ok, _}, Res) end,
postgres_bridge_connector_on_query_return
),
ok = emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}),
emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
ok = emqx_bridge_v2_testlib:t_create_via_http(Config),
ok.

View File

@ -128,8 +128,8 @@ on_query(
#{instance_id => InstId, cmd => Cmd, batch => false, mode => sync, result => Result} #{instance_id => InstId, cmd => Cmd, batch => false, mode => sync, result => Result}
), ),
Result; Result;
Error -> {error, Reason} ->
Error {error, Reason}
end. end.
on_batch_query( on_batch_query(
@ -165,8 +165,8 @@ on_batch_query(
} }
), ),
Result; Result;
Error -> {error, Reason} ->
Error {error, Reason}
end. end.
trace_format_commands(Commands0) -> trace_format_commands(Commands0) ->
@ -204,11 +204,15 @@ query(InstId, Query, RedisConnSt) ->
end. end.
proc_command_template(CommandTemplate, Msg) -> proc_command_template(CommandTemplate, Msg) ->
lists:map( lists:reverse(
fun(ArgTks) -> lists:foldl(
emqx_placeholder:proc_tmpl(ArgTks, Msg, #{return => full_binary}) fun(ArgTks, Acc) ->
end, New = proc_tmpl(ArgTks, Msg),
CommandTemplate lists:reverse(New, Acc)
end,
[],
CommandTemplate
)
). ).
preproc_command_template(CommandTemplate) -> preproc_command_template(CommandTemplate) ->
@ -216,3 +220,18 @@ preproc_command_template(CommandTemplate) ->
fun emqx_placeholder:preproc_tmpl/1, fun emqx_placeholder:preproc_tmpl/1,
CommandTemplate CommandTemplate
). ).
%% This function mimics emqx_placeholder:proc_tmpl/3 but with an
%% injected special handling of map_to_redis_hset_args result
%% which is a list of redis command args (all in binary string format)
proc_tmpl([{var, Phld}], Data) ->
case emqx_placeholder:lookup_var(Phld, Data) of
[map_to_redis_hset_args | L] ->
L;
Other ->
[emqx_utils_conv:bin(Other)]
end;
proc_tmpl(Tokens, Data) ->
%% more than just a var ref, but a string, or a concatenation of string and a var
%% this is must be a single arg, format it into a binary
[emqx_placeholder:proc_tmpl(Tokens, Data, #{return => full_binary})].

View File

@ -46,7 +46,8 @@ matrix_testcases() ->
t_start_stop, t_start_stop,
t_create_via_http, t_create_via_http,
t_on_get_status, t_on_get_status,
t_sync_query t_sync_query,
t_map_to_redis_hset_args
]. ].
init_per_suite(Config) -> init_per_suite(Config) ->
@ -133,7 +134,7 @@ common_init_per_testcase(TestCase, Config) ->
Path = group_path(Config), Path = group_path(Config),
ct:comment(Path), ct:comment(Path),
ConnectorConfig = connector_config(Name, Path, NConfig), ConnectorConfig = connector_config(Name, Path, NConfig),
BridgeConfig = action_config(Name, Path, Name), BridgeConfig = action_config(Name, Path, Name, TestCase),
ok = snabbkaffe:start_trace(), ok = snabbkaffe:start_trace(),
[ [
{connector_type, ?CONNECTOR_TYPE}, {connector_type, ?CONNECTOR_TYPE},
@ -222,7 +223,14 @@ parse_and_check_connector_config(InnerConfigMap, Name) ->
ct:pal("parsed config: ~p", [Config]), ct:pal("parsed config: ~p", [Config]),
InnerConfigMap. InnerConfigMap.
action_config(Name, Path, ConnectorId) -> action_config(Name, Path, ConnectorId, TestCase) ->
Template =
try
?MODULE:TestCase(command_template)
catch
_:_ ->
[<<"RPUSH">>, <<"MSGS/${topic}">>, <<"${payload}">>]
end,
[RedisType, _Transport | _] = Path, [RedisType, _Transport | _] = Path,
CommonCfg = CommonCfg =
#{ #{
@ -230,7 +238,7 @@ action_config(Name, Path, ConnectorId) ->
<<"connector">> => ConnectorId, <<"connector">> => ConnectorId,
<<"parameters">> => <<"parameters">> =>
#{ #{
<<"command_template">> => [<<"RPUSH">>, <<"MSGS/${topic}">>, <<"${payload}">>], <<"command_template">> => Template,
<<"redis_type">> => atom_to_binary(RedisType) <<"redis_type">> => atom_to_binary(RedisType)
}, },
<<"local_topic">> => <<"t/redis">>, <<"local_topic">> => <<"t/redis">>,
@ -262,8 +270,11 @@ parse_and_check_bridge_config(InnerConfigMap, Name) ->
emqx_bridge_v2_testlib:parse_and_check(?BRIDGE_TYPE_BIN, Name, InnerConfigMap). emqx_bridge_v2_testlib:parse_and_check(?BRIDGE_TYPE_BIN, Name, InnerConfigMap).
make_message() -> make_message() ->
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
Payload = emqx_guid:to_hexstr(emqx_guid:gen()), Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
make_message_with_payload(Payload).
make_message_with_payload(Payload) ->
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
#{ #{
clientid => ClientId, clientid => ClientId,
payload => Payload, payload => Payload,
@ -290,7 +301,7 @@ t_start_stop(matrix) ->
[sentinel, tcp], [sentinel, tcp],
[cluster, tcp] [cluster, tcp]
]}; ]};
t_start_stop(Config) -> t_start_stop(Config) when is_list(Config) ->
emqx_bridge_v2_testlib:t_start_stop(Config, redis_bridge_stopped), emqx_bridge_v2_testlib:t_start_stop(Config, redis_bridge_stopped),
ok. ok.
@ -300,7 +311,7 @@ t_create_via_http(matrix) ->
[sentinel, tcp], [sentinel, tcp],
[cluster, tcp] [cluster, tcp]
]}; ]};
t_create_via_http(Config) -> t_create_via_http(Config) when is_list(Config) ->
emqx_bridge_v2_testlib:t_create_via_http(Config), emqx_bridge_v2_testlib:t_create_via_http(Config),
ok. ok.
@ -310,7 +321,7 @@ t_on_get_status(matrix) ->
[sentinel, tcp], [sentinel, tcp],
[cluster, tcp] [cluster, tcp]
]}; ]};
t_on_get_status(Config) -> t_on_get_status(Config) when is_list(Config) ->
emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}), emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}),
ok. ok.
@ -320,7 +331,7 @@ t_sync_query(matrix) ->
[sentinel, tcp], [sentinel, tcp],
[cluster, tcp] [cluster, tcp]
]}; ]};
t_sync_query(Config) -> t_sync_query(Config) when is_list(Config) ->
ok = emqx_bridge_v2_testlib:t_sync_query( ok = emqx_bridge_v2_testlib:t_sync_query(
Config, Config,
fun make_message/0, fun make_message/0,
@ -328,3 +339,22 @@ t_sync_query(Config) ->
redis_bridge_connector_send_done redis_bridge_connector_send_done
), ),
ok. ok.
t_map_to_redis_hset_args(matrix) ->
{map_to_redis_hset_args, [
[single, tcp],
[sentinel, tcp],
[cluster, tcp]
]};
t_map_to_redis_hset_args(command_template) ->
[<<"HMSET">>, <<"t_map_to_redis_hset_args">>, <<"${payload}">>];
t_map_to_redis_hset_args(Config) when is_list(Config) ->
Payload = emqx_rule_funcs:map_to_redis_hset_args(#{<<"a">> => 1, <<"b">> => <<"2">>}),
MsgFn = fun() -> make_message_with_payload(Payload) end,
ok = emqx_bridge_v2_testlib:t_sync_query(
Config,
MsgFn,
fun(Res) -> ?assertMatch({ok, _}, Res) end,
redis_bridge_connector_send_done
),
ok.

View File

@ -78,7 +78,7 @@
%% https://www.erlang.org/doc/man/odbc.html %% https://www.erlang.org/doc/man/odbc.html
%% as returned by connect/2 %% as returned by connect/2
-type connection_reference() :: pid(). -type connection_reference() :: odbc:connection_reference().
-type time_out() :: milliseconds() | infinity. -type time_out() :: milliseconds() | infinity.
-type sql() :: string() | binary(). -type sql() :: string() | binary().
-type milliseconds() :: pos_integer(). -type milliseconds() :: pos_integer().

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_timescale, [ {application, emqx_bridge_timescale, [
{description, "EMQX Enterprise TimescaleDB Bridge"}, {description, "EMQX Enterprise TimescaleDB Bridge"},
{vsn, "0.1.4"}, {vsn, "0.1.5"},
{registered, []}, {registered, []},
{applications, [kernel, stdlib, emqx_resource]}, {applications, [kernel, stdlib, emqx_resource]},
{env, [ {env, [

View File

@ -10,7 +10,8 @@
bridge_v1_type_name/0, bridge_v1_type_name/0,
action_type_name/0, action_type_name/0,
connector_type_name/0, connector_type_name/0,
schema_module/0 schema_module/0,
connector_action_config_to_bridge_v1_config/2
]). ]).
bridge_v1_type_name() -> timescale. bridge_v1_type_name() -> timescale.
@ -20,3 +21,9 @@ action_type_name() -> timescale.
connector_type_name() -> timescale. connector_type_name() -> timescale.
schema_module() -> emqx_bridge_timescale. schema_module() -> emqx_bridge_timescale.
connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
emqx_bridge_pgsql_action_info:connector_action_config_to_bridge_v1_config(
ConnectorConfig,
ActionConfig
).

View File

@ -685,6 +685,10 @@ is_ok(OkResult = {ok, _}) ->
OkResult; OkResult;
is_ok(Error = {error, _}) -> is_ok(Error = {error, _}) ->
Error; Error;
is_ok(timeout) ->
%% Returned by `emqx_resource_manager:start' when the connector fails to reach either
%% `?status_connected' or `?status_disconnected' within `start_timeout'.
timeout;
is_ok(ResL) -> is_ok(ResL) ->
case case
lists:filter( lists:filter(
@ -723,6 +727,14 @@ call_operation(NodeOrAll, OperFunc, Args = [_Nodes, ConnectorType, ConnectorName
case is_ok(do_bpapi_call(NodeOrAll, OperFunc, Args)) of case is_ok(do_bpapi_call(NodeOrAll, OperFunc, Args)) of
Ok when Ok =:= ok; is_tuple(Ok), element(1, Ok) =:= ok -> Ok when Ok =:= ok; is_tuple(Ok), element(1, Ok) =:= ok ->
?NO_CONTENT; ?NO_CONTENT;
timeout ->
%% Returned by `emqx_resource_manager:start' when the connector fails to reach
%% either `?status_connected' or `?status_disconnected' within
%% `start_timeout'.
?BAD_REQUEST(<<
"Timeout while waiting for connector to reach connected status."
" Please try again."
>>);
{error, not_implemented} -> {error, not_implemented} ->
?NOT_IMPLEMENTED; ?NOT_IMPLEMENTED;
{error, timeout} -> {error, timeout} ->

View File

@ -536,14 +536,20 @@ do_start_connector(TestType, Config) ->
request_json( request_json(
post, post,
uri(["connectors"]), uri(["connectors"]),
?KAFKA_CONNECTOR(BadName, BadServer), (?KAFKA_CONNECTOR(BadName, BadServer))#{
<<"resource_opts">> => #{
<<"start_timeout">> => <<"10ms">>
}
},
Config Config
) )
), ),
BadConnectorID = emqx_connector_resource:connector_id(?CONNECTOR_TYPE, BadName), BadConnectorID = emqx_connector_resource:connector_id(?CONNECTOR_TYPE, BadName),
%% Checks that an `emqx_resource_manager:start' timeout when waiting for the resource to
%% be connected doesn't return a 500 error.
?assertMatch( ?assertMatch(
%% request from product: return 400 on such errors %% request from product: return 400 on such errors
{ok, SC, _} when SC == 500 orelse SC == 400, {ok, 400, _},
request(post, {operation, TestType, start, BadConnectorID}, Config) request(post, {operation, TestType, start, BadConnectorID}, Config)
), ),
ok = gen_tcp:close(Sock), ok = gen_tcp:close(Sock),

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_plugins, [ {application, emqx_plugins, [
{description, "EMQX Plugin Management"}, {description, "EMQX Plugin Management"},
{vsn, "0.2.0"}, {vsn, "0.2.1"},
{modules, []}, {modules, []},
{mod, {emqx_plugins_app, []}}, {mod, {emqx_plugins_app, []}},
{applications, [kernel, stdlib, emqx, erlavro]}, {applications, [kernel, stdlib, emqx, erlavro]},

View File

@ -136,6 +136,19 @@ parse_name_vsn(NameVsn) when is_list(NameVsn) ->
make_name_vsn_string(Name, Vsn) -> make_name_vsn_string(Name, Vsn) ->
binary_to_list(iolist_to_binary([Name, "-", Vsn])). binary_to_list(iolist_to_binary([Name, "-", Vsn])).
app_dir(AppName, Apps) ->
case
lists:filter(
fun(AppNameVsn) -> nomatch =/= string:prefix(AppNameVsn, AppName) end,
Apps
)
of
[AppNameVsn] ->
{ok, AppNameVsn};
_ ->
{error, not_found}
end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Package operations %% Package operations
@ -1372,12 +1385,14 @@ plugin_dir(NameVsn) ->
-spec plugin_priv_dir(name_vsn()) -> string(). -spec plugin_priv_dir(name_vsn()) -> string().
plugin_priv_dir(NameVsn) -> plugin_priv_dir(NameVsn) ->
case read_plugin_info(NameVsn, #{fill_readme => false}) of maybe
{ok, #{<<"name">> := Name, <<"metadata_vsn">> := Vsn}} -> {ok, #{<<"name">> := Name, <<"rel_apps">> := Apps}} ?=
AppDir = make_name_vsn_string(Name, Vsn), read_plugin_info(NameVsn, #{fill_readme => false}),
wrap_to_list(filename:join([plugin_dir(NameVsn), AppDir, "priv"])); {ok, AppDir} ?= app_dir(Name, Apps),
_ -> wrap_to_list(filename:join([plugin_dir(NameVsn), AppDir, "priv"]))
wrap_to_list(filename:join([install_dir(), NameVsn, "priv"])) else
%% Otherwise assume the priv directory is under the plugin root directory
_ -> wrap_to_list(filename:join([install_dir(), NameVsn, "priv"]))
end. end.
-spec plugin_config_dir(name_vsn()) -> string() | {error, Reason :: string()}. -spec plugin_config_dir(name_vsn()) -> string() | {error, Reason :: string()}.

View File

@ -1,6 +1,6 @@
{application, emqx_postgresql, [ {application, emqx_postgresql, [
{description, "EMQX PostgreSQL Database Connector"}, {description, "EMQX PostgreSQL Database Connector"},
{vsn, "0.2.0"}, {vsn, "0.2.1"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -50,6 +50,8 @@
execute_batch/3 execute_batch/3
]). ]).
-export([disable_prepared_statements/0]).
%% for ecpool workers usage %% for ecpool workers usage
-export([do_get_status/1, prepare_sql_to_conn/2]). -export([do_get_status/1, prepare_sql_to_conn/2]).
@ -62,7 +64,7 @@
#{ #{
pool_name := binary(), pool_name := binary(),
query_templates := #{binary() => template()}, query_templates := #{binary() => template()},
prepares := #{binary() => epgsql:statement()} | {error, _} prepares := disabled | #{binary() => epgsql:statement()} | {error, _}
}. }.
%% FIXME: add `{error, sync_required}' to `epgsql:execute_batch' %% FIXME: add `{error, sync_required}' to `epgsql:execute_batch'
@ -78,7 +80,10 @@ roots() ->
[{config, #{type => hoconsc:ref(?MODULE, config)}}]. [{config, #{type => hoconsc:ref(?MODULE, config)}}].
fields(config) -> fields(config) ->
[{server, server()}] ++ [
{server, server()},
disable_prepared_statements()
] ++
adjust_fields(emqx_connector_schema_lib:relational_db_fields()) ++ adjust_fields(emqx_connector_schema_lib:relational_db_fields()) ++
emqx_connector_schema_lib:ssl_fields() ++ emqx_connector_schema_lib:ssl_fields() ++
emqx_connector_schema_lib:prepare_statement_fields(). emqx_connector_schema_lib:prepare_statement_fields().
@ -87,6 +92,17 @@ server() ->
Meta = #{desc => ?DESC("server")}, Meta = #{desc => ?DESC("server")},
emqx_schema:servers_sc(Meta, ?PGSQL_HOST_OPTIONS). emqx_schema:servers_sc(Meta, ?PGSQL_HOST_OPTIONS).
disable_prepared_statements() ->
{disable_prepared_statements,
hoconsc:mk(
boolean(),
#{
default => false,
required => false,
desc => ?DESC("disable_prepared_statements")
}
)}.
adjust_fields(Fields) -> adjust_fields(Fields) ->
lists:map( lists:map(
fun fun
@ -108,6 +124,7 @@ on_start(
InstId, InstId,
#{ #{
server := Server, server := Server,
disable_prepared_statements := DisablePreparedStatements,
database := DB, database := DB,
username := User, username := User,
pool_size := PoolSize, pool_size := PoolSize,
@ -143,11 +160,16 @@ on_start(
{auto_reconnect, ?AUTO_RECONNECT_INTERVAL}, {auto_reconnect, ?AUTO_RECONNECT_INTERVAL},
{pool_size, PoolSize} {pool_size, PoolSize}
], ],
State1 = parse_prepare_sql(Config, <<"send_message">>), State1 = parse_sql_template(Config, <<"send_message">>),
State2 = State1#{installed_channels => #{}}, State2 = State1#{installed_channels => #{}},
case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of
ok -> ok ->
{ok, init_prepare(State2#{pool_name => InstId, prepares => #{}})}; Prepares =
case DisablePreparedStatements of
true -> disabled;
false -> #{}
end,
{ok, init_prepare(State2#{pool_name => InstId, prepares => Prepares})};
{error, Reason} -> {error, Reason} ->
?tp( ?tp(
pgsql_connector_start_failed, pgsql_connector_start_failed,
@ -209,13 +231,17 @@ on_add_channel(
create_channel_state( create_channel_state(
ChannelId, ChannelId,
#{pool_name := PoolName} = _ConnectorState, #{
pool_name := PoolName,
prepares := Prepares
} = _ConnectorState,
#{parameters := Parameters} = _ChannelConfig #{parameters := Parameters} = _ChannelConfig
) -> ) ->
State1 = parse_prepare_sql(Parameters, ChannelId), State1 = parse_sql_template(Parameters, ChannelId),
{ok, {ok,
init_prepare(State1#{ init_prepare(State1#{
pool_name => PoolName, pool_name => PoolName,
prepares => Prepares,
prepare_statement => #{} prepare_statement => #{}
})}. })}.
@ -233,6 +259,8 @@ on_remove_channel(
NewState = OldState#{installed_channels => NewInstalledChannels}, NewState = OldState#{installed_channels => NewInstalledChannels},
{ok, NewState}. {ok, NewState}.
close_prepared_statement(_ChannelId, #{prepares := disabled}) ->
ok;
close_prepared_statement(ChannelId, #{pool_name := PoolName} = State) -> close_prepared_statement(ChannelId, #{pool_name := PoolName} = State) ->
WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)], WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
close_prepared_statement(WorkerPids, ChannelId, State), close_prepared_statement(WorkerPids, ChannelId, State),
@ -243,7 +271,7 @@ close_prepared_statement([WorkerPid | Rest], ChannelId, State) ->
%% prepared statement doesn't exist. %% prepared statement doesn't exist.
try ecpool_worker:client(WorkerPid) of try ecpool_worker:client(WorkerPid) of
{ok, Conn} -> {ok, Conn} ->
Statement = get_prepared_statement(ChannelId, State), Statement = get_templated_statement(ChannelId, State),
_ = epgsql:close(Conn, Statement), _ = epgsql:close(Conn, Statement),
close_prepared_statement(Rest, ChannelId, State); close_prepared_statement(Rest, ChannelId, State);
_ -> _ ->
@ -303,21 +331,23 @@ on_query(
sql => NameOrSQL, sql => NameOrSQL,
state => State state => State
}), }),
Type = pgsql_query_type(TypeOrKey), Type = pgsql_query_type(TypeOrKey, State),
{NameOrSQL2, Data} = proc_sql_params(TypeOrKey, NameOrSQL, Params, State), {NameOrSQL2, Data} = proc_sql_params(TypeOrKey, NameOrSQL, Params, State),
Res = on_sql_query(TypeOrKey, InstId, PoolName, Type, NameOrSQL2, Data), Res = on_sql_query(TypeOrKey, InstId, PoolName, Type, NameOrSQL2, Data),
?tp(postgres_bridge_connector_on_query_return, #{instance_id => InstId, result => Res}), ?tp(postgres_bridge_connector_on_query_return, #{instance_id => InstId, result => Res}),
handle_result(Res). handle_result(Res).
pgsql_query_type(sql) -> pgsql_query_type(_TypeOrTag, #{prepares := disabled}) ->
query; query;
pgsql_query_type(query) -> pgsql_query_type(sql, _ConnectorState) ->
query; query;
pgsql_query_type(prepared_query) -> pgsql_query_type(query, _ConnectorState) ->
query;
pgsql_query_type(prepared_query, _ConnectorState) ->
prepared_query; prepared_query;
%% for bridge %% for bridge
pgsql_query_type(_) -> pgsql_query_type(_, ConnectorState) ->
pgsql_query_type(prepared_query). pgsql_query_type(prepared_query, ConnectorState).
on_batch_query( on_batch_query(
InstId, InstId,
@ -336,9 +366,9 @@ on_batch_query(
?SLOG(error, Log), ?SLOG(error, Log),
{error, {unrecoverable_error, batch_prepare_not_implemented}}; {error, {unrecoverable_error, batch_prepare_not_implemented}};
{_Statement, RowTemplate} -> {_Statement, RowTemplate} ->
PrepStatement = get_prepared_statement(BinKey, State), StatementTemplate = get_templated_statement(BinKey, State),
Rows = [render_prepare_sql_row(RowTemplate, Data) || {_Key, Data} <- BatchReq], Rows = [render_prepare_sql_row(RowTemplate, Data) || {_Key, Data} <- BatchReq],
case on_sql_query(Key, InstId, PoolName, execute_batch, PrepStatement, Rows) of case on_sql_query(Key, InstId, PoolName, execute_batch, StatementTemplate, Rows) of
{error, _Error} = Result -> {error, _Error} = Result ->
handle_result(Result); handle_result(Result);
{_Column, Results} -> {_Column, Results} ->
@ -359,12 +389,19 @@ proc_sql_params(query, SQLOrKey, Params, _State) ->
proc_sql_params(prepared_query, SQLOrKey, Params, _State) -> proc_sql_params(prepared_query, SQLOrKey, Params, _State) ->
{SQLOrKey, Params}; {SQLOrKey, Params};
proc_sql_params(TypeOrKey, SQLOrData, Params, State) -> proc_sql_params(TypeOrKey, SQLOrData, Params, State) ->
DisablePreparedStatements = maps:get(prepares, State, #{}) =:= disabled,
BinKey = to_bin(TypeOrKey), BinKey = to_bin(TypeOrKey),
case get_template(BinKey, State) of case get_template(BinKey, State) of
undefined -> undefined ->
{SQLOrData, Params}; {SQLOrData, Params};
{_Statement, RowTemplate} -> {Statement, RowTemplate} ->
{BinKey, render_prepare_sql_row(RowTemplate, SQLOrData)} Rendered = render_prepare_sql_row(RowTemplate, SQLOrData),
case DisablePreparedStatements of
true ->
{Statement, Rendered};
false ->
{BinKey, Rendered}
end
end. end.
get_template(Key, #{installed_channels := Channels} = _State) when is_map_key(Key, Channels) -> get_template(Key, #{installed_channels := Channels} = _State) when is_map_key(Key, Channels) ->
@ -376,14 +413,14 @@ get_template(Key, #{query_templates := Templates}) ->
BinKey = to_bin(Key), BinKey = to_bin(Key),
maps:get(BinKey, Templates, undefined). maps:get(BinKey, Templates, undefined).
get_prepared_statement(Key, #{installed_channels := Channels} = _State) when get_templated_statement(Key, #{installed_channels := Channels} = _State) when
is_map_key(Key, Channels) is_map_key(Key, Channels)
-> ->
BinKey = to_bin(Key), BinKey = to_bin(Key),
ChannelState = maps:get(BinKey, Channels), ChannelState = maps:get(BinKey, Channels),
ChannelPreparedStatements = maps:get(prepares, ChannelState), ChannelPreparedStatements = maps:get(prepares, ChannelState),
maps:get(BinKey, ChannelPreparedStatements); maps:get(BinKey, ChannelPreparedStatements);
get_prepared_statement(Key, #{prepares := PrepStatements}) -> get_templated_statement(Key, #{prepares := PrepStatements}) ->
BinKey = to_bin(Key), BinKey = to_bin(Key),
maps:get(BinKey, PrepStatements). maps:get(BinKey, PrepStatements).
@ -480,6 +517,8 @@ do_check_prepares(
{error, Reason} -> {error, Reason} ->
{error, Reason} {error, Reason}
end; end;
do_check_prepares(#{prepares := disabled}) ->
ok;
do_check_prepares(#{prepares := Prepares}) when is_map(Prepares) -> do_check_prepares(#{prepares := Prepares}) when is_map(Prepares) ->
ok; ok;
do_check_prepares(#{prepares := {error, _}} = State) -> do_check_prepares(#{prepares := {error, _}} = State) ->
@ -579,7 +618,7 @@ conn_opts([Opt = {ssl_opts, _} | Opts], Acc) ->
conn_opts([_Opt | Opts], Acc) -> conn_opts([_Opt | Opts], Acc) ->
conn_opts(Opts, Acc). conn_opts(Opts, Acc).
parse_prepare_sql(Config, SQLID) -> parse_sql_template(Config, SQLID) ->
Queries = Queries =
case Config of case Config of
#{prepare_statement := Qs} -> #{prepare_statement := Qs} ->
@ -589,10 +628,10 @@ parse_prepare_sql(Config, SQLID) ->
#{} -> #{} ->
#{} #{}
end, end,
Templates = maps:fold(fun parse_prepare_sql/3, #{}, Queries), Templates = maps:fold(fun parse_sql_template/3, #{}, Queries),
#{query_templates => Templates}. #{query_templates => Templates}.
parse_prepare_sql(Key, Query, Acc) -> parse_sql_template(Key, Query, Acc) ->
Template = emqx_template_sql:parse_prepstmt(Query, #{parameters => '$n'}), Template = emqx_template_sql:parse_prepstmt(Query, #{parameters => '$n'}),
Acc#{Key => Template}. Acc#{Key => Template}.
@ -601,6 +640,8 @@ render_prepare_sql_row(RowTemplate, Data) ->
{Row, _Errors} = emqx_template_sql:render_prepstmt(RowTemplate, {emqx_jsonish, Data}), {Row, _Errors} = emqx_template_sql:render_prepstmt(RowTemplate, {emqx_jsonish, Data}),
Row. Row.
init_prepare(State = #{prepares := disabled}) ->
State;
init_prepare(State = #{query_templates := Templates}) when map_size(Templates) == 0 -> init_prepare(State = #{query_templates := Templates}) when map_size(Templates) == 0 ->
State; State;
init_prepare(State = #{}) -> init_prepare(State = #{}) ->

View File

@ -47,7 +47,10 @@ roots() ->
[]. [].
fields("connection_fields") -> fields("connection_fields") ->
[{server, server()}] ++ [
{server, server()},
emqx_postgresql:disable_prepared_statements()
] ++
adjust_fields(emqx_connector_schema_lib:relational_db_fields()) ++ adjust_fields(emqx_connector_schema_lib:relational_db_fields()) ++
emqx_connector_schema_lib:ssl_fields(); emqx_connector_schema_lib:ssl_fields();
fields("config_connector") -> fields("config_connector") ->

View File

@ -85,6 +85,7 @@
get_allocated_resources_list/1, get_allocated_resources_list/1,
forget_allocated_resources/1, forget_allocated_resources/1,
deallocate_resource/2, deallocate_resource/2,
clean_allocated_resources/2,
%% Get channel config from resource %% Get channel config from resource
call_get_channel_config/3, call_get_channel_config/3,
% Call the format query result function % Call the format query result function

View File

@ -63,6 +63,10 @@
%% Internal exports. %% Internal exports.
-export([worker_resource_health_check/1, worker_channel_health_check/2]). -export([worker_resource_health_check/1, worker_channel_health_check/2]).
-ifdef(TEST).
-export([stop/2]).
-endif.
% State record % State record
-record(data, { -record(data, {
id, id,
@ -254,7 +258,17 @@ remove(ResId) when is_binary(ResId) ->
-spec remove(resource_id(), boolean()) -> ok | {error, Reason :: term()}. -spec remove(resource_id(), boolean()) -> ok | {error, Reason :: term()}.
remove(ResId, ClearMetrics) when is_binary(ResId) -> remove(ResId, ClearMetrics) when is_binary(ResId) ->
try try
safe_call(ResId, {remove, ClearMetrics}, ?T_OPERATION) case safe_call(ResId, {remove, ClearMetrics}, ?T_OPERATION) of
{error, timeout} ->
?tp(error, "forcefully_stopping_resource_due_to_timeout", #{
action => remove,
resource_id => ResId
}),
force_kill(ResId),
ok;
Res ->
Res
end
after after
%% Ensure the supervisor has it removed, otherwise the immediate re-add will see a stale process %% Ensure the supervisor has it removed, otherwise the immediate re-add will see a stale process
%% If the 'remove' call above had succeeded, this is mostly a no-op but still needed to avoid race condition. %% If the 'remove' call above had succeeded, this is mostly a no-op but still needed to avoid race condition.
@ -274,7 +288,7 @@ restart(ResId, Opts) when is_binary(ResId) ->
end. end.
%% @doc Start the resource %% @doc Start the resource
-spec start(resource_id(), creation_opts()) -> ok | {error, Reason :: term()}. -spec start(resource_id(), creation_opts()) -> ok | timeout | {error, Reason :: term()}.
start(ResId, Opts) -> start(ResId, Opts) ->
StartTimeout = maps:get(start_timeout, Opts, ?T_OPERATION), StartTimeout = maps:get(start_timeout, Opts, ?T_OPERATION),
case safe_call(ResId, start, StartTimeout) of case safe_call(ResId, start, StartTimeout) of
@ -287,9 +301,20 @@ start(ResId, Opts) ->
%% @doc Stop the resource %% @doc Stop the resource
-spec stop(resource_id()) -> ok | {error, Reason :: term()}. -spec stop(resource_id()) -> ok | {error, Reason :: term()}.
stop(ResId) -> stop(ResId) ->
case safe_call(ResId, stop, ?T_OPERATION) of stop(ResId, ?T_OPERATION).
-spec stop(resource_id(), timeout()) -> ok | {error, Reason :: term()}.
stop(ResId, Timeout) ->
case safe_call(ResId, stop, Timeout) of
ok -> ok ->
ok; ok;
{error, timeout} ->
?tp(error, "forcefully_stopping_resource_due_to_timeout", #{
action => stop,
resource_id => ResId
}),
force_kill(ResId),
ok;
{error, _Reason} = Error -> {error, _Reason} = Error ->
Error Error
end. end.
@ -406,6 +431,25 @@ get_error(ResId, #{added_channels := #{} = Channels} = ResourceData) when
get_error(_ResId, #{error := Error}) -> get_error(_ResId, #{error := Error}) ->
Error. Error.
force_kill(ResId) ->
case gproc:whereis_name(?NAME(ResId)) of
undefined ->
ok;
Pid when is_pid(Pid) ->
exit(Pid, kill),
try_clean_allocated_resources(ResId),
ok
end.
try_clean_allocated_resources(ResId) ->
case try_read_cache(ResId) of
#data{mod = Mod} ->
catch emqx_resource:clean_allocated_resources(ResId, Mod),
ok;
_ ->
ok
end.
%% Server start/stop callbacks %% Server start/stop callbacks
%% @doc Function called from the supervisor to actually start the server %% @doc Function called from the supervisor to actually start the server
@ -737,7 +781,7 @@ maybe_stop_resource(#data{status = ?rm_status_stopped} = Data) ->
Data. Data.
stop_resource(#data{state = ResState, id = ResId} = Data) -> stop_resource(#data{state = ResState, id = ResId} = Data) ->
%% We don't care the return value of the Mod:on_stop/2. %% We don't care about the return value of `Mod:on_stop/2'.
%% The callback mod should make sure the resource is stopped after on_stop/2 %% The callback mod should make sure the resource is stopped after on_stop/2
%% is returned. %% is returned.
HasAllocatedResources = emqx_resource:has_allocated_resources(ResId), HasAllocatedResources = emqx_resource:has_allocated_resources(ResId),

View File

@ -71,6 +71,16 @@ set_callback_mode(Mode) ->
on_start(_InstId, #{create_error := true}) -> on_start(_InstId, #{create_error := true}) ->
?tp(connector_demo_start_error, #{}), ?tp(connector_demo_start_error, #{}),
error("some error"); error("some error");
on_start(InstId, #{create_error := {delay, Delay, Agent}} = Opts) ->
?tp(connector_demo_start_delay, #{}),
case emqx_utils_agent:get_and_update(Agent, fun(St) -> {St, called} end) of
not_called ->
emqx_resource:allocate_resource(InstId, i_should_be_deallocated, yep),
timer:sleep(Delay),
on_start(InstId, maps:remove(create_error, Opts));
called ->
on_start(InstId, maps:remove(create_error, Opts))
end;
on_start(InstId, #{name := Name} = Opts) -> on_start(InstId, #{name := Name} = Opts) ->
Register = maps:get(register, Opts, false), Register = maps:get(register, Opts, false),
StopError = maps:get(stop_error, Opts, false), StopError = maps:get(stop_error, Opts, false),
@ -81,6 +91,9 @@ on_start(InstId, #{name := Name} = Opts) ->
pid => spawn_counter_process(Name, Register) pid => spawn_counter_process(Name, Register)
}}. }}.
on_stop(_InstId, undefined) ->
?tp(connector_demo_free_resources_without_state, #{}),
ok;
on_stop(_InstId, #{stop_error := true}) -> on_stop(_InstId, #{stop_error := true}) ->
{error, stop_error}; {error, stop_error};
on_stop(InstId, #{pid := Pid}) -> on_stop(InstId, #{pid := Pid}) ->

View File

@ -3189,6 +3189,43 @@ t_non_blocking_channel_health_check(_Config) ->
), ),
ok. ok.
%% Test that `stop' forcefully stops the resource manager even if it's stuck on a sync
%% call such as `on_start', and that the claimed resources, if any, are freed.
t_force_stop(_Config) ->
?check_trace(
begin
{ok, Agent} = emqx_utils_agent:start_link(not_called),
{ok, _} =
create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{
name => test_resource,
create_error => {delay, 30_000, Agent}
},
#{
health_check_interval => 100,
start_timeout => 100
}
),
?assertEqual(ok, emqx_resource_manager:stop(?ID, _Timeout = 100)),
ok
end,
[
log_consistency_prop(),
fun(Trace) ->
?assertMatch([_ | _], ?of_kind(connector_demo_start_delay, Trace)),
?assertMatch(
[_ | _], ?of_kind("forcefully_stopping_resource_due_to_timeout", Trace)
),
?assertMatch([_ | _], ?of_kind(connector_demo_free_resources_without_state, Trace)),
ok
end
]
),
ok.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Helpers %% Helpers
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------

View File

@ -733,8 +733,8 @@ event_info() ->
event_info_schema_validation_failed() -> event_info_schema_validation_failed() ->
event_info_common( event_info_common(
'schema.validation_failed', 'schema.validation_failed',
{<<"schema validation failed">>, <<"TODO"/utf8>>}, {<<"schema validation failed">>, <<"schema 验证失败"/utf8>>},
{<<"messages that do not pass configured validations">>, <<"TODO"/utf8>>}, {<<"messages that do not pass configured validations">>, <<"未通过验证的消息"/utf8>>},
<<"SELECT * FROM \"$events/schema_validation_failed\" WHERE topic =~ 't/#'">> <<"SELECT * FROM \"$events/schema_validation_failed\" WHERE topic =~ 't/#'">>
). ).
ee_event_info() -> ee_event_info() ->

View File

@ -160,6 +160,7 @@
find/3, find/3,
join_to_string/1, join_to_string/1,
join_to_string/2, join_to_string/2,
map_to_redis_hset_args/1,
join_to_sql_values_string/1, join_to_sql_values_string/1,
jq/2, jq/2,
jq/3, jq/3,
@ -814,6 +815,38 @@ join_to_string(Str) -> emqx_variform_bif:join_to_string(Str).
join_to_string(Sep, List) -> emqx_variform_bif:join_to_string(Sep, List). join_to_string(Sep, List) -> emqx_variform_bif:join_to_string(Sep, List).
%% @doc Format map key-value pairs as redis HSET (or HMSET) command fields.
%% Notes:
%% - Non-string keys in the input map are dropped
%% - Keys are not quoted
%% - String values are always quoted
%% - No escape sequence for keys and values
%% - Float point values are formatted with fixed (6) decimal point compact-formatting
map_to_redis_hset_args(Map) when erlang:is_map(Map) ->
[map_to_redis_hset_args | maps:fold(fun redis_hset_acc/3, [], Map)].
redis_hset_acc(K, V, IoData) ->
try
[redis_field_name(K), redis_field_value(V) | IoData]
catch
_:_ ->
IoData
end.
redis_field_name(K) when erlang:is_binary(K) ->
K;
redis_field_name(K) ->
throw({bad_redis_field_name, K}).
redis_field_value(V) when erlang:is_binary(V) ->
V;
redis_field_value(V) when erlang:is_integer(V) ->
integer_to_binary(V);
redis_field_value(V) when erlang:is_float(V) ->
float2str(V, 6);
redis_field_value(V) when erlang:is_boolean(V) ->
atom_to_binary(V).
join_to_sql_values_string(List) -> join_to_sql_values_string(List) ->
QuotedList = QuotedList =
[ [

View File

@ -1376,6 +1376,27 @@ t_parse_date_errors(_) ->
ok. ok.
t_map_to_redis_hset_args(_Config) ->
Do = fun(Map) -> tl(emqx_rule_funcs:map_to_redis_hset_args(Map)) end,
?assertEqual([], Do(#{})),
?assertEqual([], Do(#{1 => 2})),
?assertEqual([<<"a">>, <<"1">>], Do(#{<<"a">> => 1, 3 => 4})),
?assertEqual([<<"a">>, <<"1.1">>], Do(#{<<"a">> => 1.1})),
?assertEqual([<<"a">>, <<"true">>], Do(#{<<"a">> => true})),
?assertEqual([<<"a">>, <<"false">>], Do(#{<<"a">> => false})),
?assertEqual([<<"a">>, <<"">>], Do(#{<<"a">> => <<"">>})),
?assertEqual([<<"a">>, <<"i j">>], Do(#{<<"a">> => <<"i j">>})),
%% no determined ordering
?assert(
case Do(#{<<"a">> => 1, <<"b">> => 2}) of
[<<"a">>, <<"1">>, <<"b">>, <<"2">>] ->
true;
[<<"b">>, <<"2">>, <<"a">>, <<"1">>] ->
true
end
),
ok.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Utility functions %% Utility functions
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------

View File

@ -37,7 +37,8 @@
proc_tmpl_deep/3, proc_tmpl_deep/3,
bin/1, bin/1,
sql_data/1 sql_data/1,
lookup_var/2
]). ]).
-export([ -export([

View File

@ -0,0 +1,66 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc Similar to Elixir's [`Agent'](https://hexdocs.pm/elixir/Agent.html).
-module(emqx_utils_agent).
%% API
-export([start_link/1, get/1, get_and_update/2]).
%% `gen_server' API
-export([init/1, handle_call/3]).
%%------------------------------------------------------------------------------
%% Type declarations
%%------------------------------------------------------------------------------
-type state() :: term().
-type get_and_update_fn() :: fun((state()) -> {term(), state()}).
-record(get_and_update, {fn :: get_and_update_fn()}).
%%------------------------------------------------------------------------------
%% API
%%------------------------------------------------------------------------------
-spec start_link(state()) -> gen_server:start_ret().
start_link(InitState) ->
gen_server:start_link(?MODULE, InitState, []).
-spec get(gen_server:server_ref()) -> term().
get(ServerRef) ->
Fn = fun(St) -> {St, St} end,
gen_server:call(ServerRef, #get_and_update{fn = Fn}).
-spec get_and_update(gen_server:server_ref(), get_and_update_fn()) -> term().
get_and_update(ServerRef, Fn) ->
gen_server:call(ServerRef, #get_and_update{fn = Fn}).
%%------------------------------------------------------------------------------
%% `gen_server' API
%%------------------------------------------------------------------------------
init(InitState) ->
{ok, InitState}.
handle_call(#get_and_update{fn = Fn}, _From, State0) ->
{Reply, State} = Fn(State0),
{reply, Reply, State}.
%%------------------------------------------------------------------------------
%% Internal fns
%%------------------------------------------------------------------------------

View File

@ -1204,7 +1204,7 @@ case "${COMMAND}" in
esac esac
case "$COMMAND" in case "$COMMAND" in
foreground) foreground)
FOREGROUNDOPTIONS="-enable-feature maybe_expr -noshell -noinput +Bd" FOREGROUNDOPTIONS="-enable-feature maybe_expr -noinput -noshell +Bd"
;; ;;
*) *)
FOREGROUNDOPTIONS='-enable-feature maybe_expr' FOREGROUNDOPTIONS='-enable-feature maybe_expr'

12
build
View File

@ -397,11 +397,11 @@ function is_ecr_and_enterprise() {
## Build the default docker image based on debian 12. ## Build the default docker image based on debian 12.
make_docker() { make_docker() {
local EMQX_BUILDER_VERSION="${EMQX_BUILDER_VERSION:-5.3-5}" local EMQX_BUILDER_VERSION="${EMQX_BUILDER_VERSION:-5.3-7}"
local EMQX_BUILDER_PLATFORM="${EMQX_BUILDER_PLATFORM:-debian12}" local EMQX_BUILDER_PLATFORM="${EMQX_BUILDER_PLATFORM:-debian12}"
local EMQX_BUILDER_OTP="${EMQX_BUILDER_OTP:-25.3.2-2}" local OTP_VSN="${OTP_VSN:-26.2.5-1}"
local EMQX_BUILDER_ELIXIR="${EMQX_BUILDER_ELIXIR:-1.15.7}" local ELIXIR_VSN="${ELIXIR_VSN:-1.15.7}"
local EMQX_BUILDER=${EMQX_BUILDER:-ghcr.io/emqx/emqx-builder/${EMQX_BUILDER_VERSION}:${EMQX_BUILDER_ELIXIR}-${EMQX_BUILDER_OTP}-${EMQX_BUILDER_PLATFORM}} local EMQX_BUILDER=${EMQX_BUILDER:-ghcr.io/emqx/emqx-builder/${EMQX_BUILDER_VERSION}:${ELIXIR_VSN}-${OTP_VSN}-${EMQX_BUILDER_PLATFORM}}
local EMQX_RUNNER="${EMQX_RUNNER:-${EMQX_DEFAULT_RUNNER}}" local EMQX_RUNNER="${EMQX_RUNNER:-${EMQX_DEFAULT_RUNNER}}"
local EMQX_DOCKERFILE="${EMQX_DOCKERFILE:-deploy/docker/Dockerfile}" local EMQX_DOCKERFILE="${EMQX_DOCKERFILE:-deploy/docker/Dockerfile}"
local EMQX_SOURCE_TYPE="${EMQX_SOURCE_TYPE:-src}" local EMQX_SOURCE_TYPE="${EMQX_SOURCE_TYPE:-src}"
@ -465,7 +465,7 @@ make_docker() {
--label org.opencontainers.image.description="${PRODUCT_DESCRIPTION}" \ --label org.opencontainers.image.description="${PRODUCT_DESCRIPTION}" \
--label org.opencontainers.image.documentation="${DOCUMENTATION_URL}" \ --label org.opencontainers.image.documentation="${DOCUMENTATION_URL}" \
--label org.opencontainers.image.licenses="${LICENSE}" \ --label org.opencontainers.image.licenses="${LICENSE}" \
--label org.opencontainers.image.otp.version="${EMQX_BUILDER_OTP}" \ --label org.opencontainers.image.otp.version="${OTP_VSN}" \
--pull --pull
) )
:> ./.emqx_docker_image_tags :> ./.emqx_docker_image_tags
@ -477,7 +477,7 @@ make_docker() {
DOCKER_BUILDX_ARGS+=(--no-cache) DOCKER_BUILDX_ARGS+=(--no-cache)
fi fi
if [ "${SUFFIX}" = '-elixir' ]; then if [ "${SUFFIX}" = '-elixir' ]; then
DOCKER_BUILDX_ARGS+=(--label org.opencontainers.image.elixir.version="${EMQX_BUILDER_ELIXIR}") DOCKER_BUILDX_ARGS+=(--label org.opencontainers.image.elixir.version="${ELIXIR_VSN}")
fi fi
if [ "${DOCKER_LATEST:-false}" = true ]; then if [ "${DOCKER_LATEST:-false}" = true ]; then
for r in "${DOCKER_REGISTRIES[@]}"; do for r in "${DOCKER_REGISTRIES[@]}"; do

View File

@ -0,0 +1,3 @@
Added the `disable_prepared_statements` option for Postgres-based connectors.
This option is to be used with endpoints that do not support the prepared statements session feature, such as PGBouncer and Supabase in Transaction mode.

View File

@ -0,0 +1 @@
Fixed an issue where a 500 HTTP status code could be returned by `/connectors/:connector-id/start` when there is a timeout waiting for the resource to be connected.

View File

@ -0,0 +1,6 @@
Fix HTTP authorization request body encoding.
Prior to this fix, the HTTP authorization request body encoding format was taken from the `accept` header.
The fix is to respect the `content-type` header instead.
Also added `access` templating variable for v4 compatibility.
The access code of SUBSCRIBE action is `1` and SUBSCRIBE action is `2`.

View File

@ -0,0 +1,3 @@
Now, when attempting to stop a connector, if such operation times out, we forcefully shut down the connector process.
Error messages when attempting to disable an action/source when its underlying connector is stuck were also improved.

View File

@ -0,0 +1,5 @@
Added a rule function `map_to_redis_hset_args` to help preparing redis HSET (or HMSET) multi-fields values.
For example, if `payload.value` is a map of multiple data fields,
this rule `SELECT map_to_redis_hset_args(payload.value) as hset_fields FROM "t/#"` can prepare `hset_fields`
for redis action to render the command template like `HMSET name1 ${hset_fields}`.

View File

@ -0,0 +1,3 @@
Improve Kafka consumer group stability.
Prior to this change, Kafka consumer group sometimes may need to rebalance twice after Kafka group coordinator restart.

View File

@ -1,4 +1,4 @@
ARG BUILD_FROM=ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-debian12 ARG BUILD_FROM=ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-debian12
ARG RUN_FROM=public.ecr.aws/debian/debian:12-slim ARG RUN_FROM=public.ecr.aws/debian/debian:12-slim
ARG SOURCE_TYPE=src # tgz ARG SOURCE_TYPE=src # tgz

View File

@ -102,8 +102,7 @@ defmodule EMQXUmbrella.MixProject do
{:uuid, github: "okeuday/uuid", tag: "v2.0.6", override: true}, {:uuid, github: "okeuday/uuid", tag: "v2.0.6", override: true},
{:quickrand, github: "okeuday/quickrand", tag: "v2.0.6", override: true}, {:quickrand, github: "okeuday/quickrand", tag: "v2.0.6", override: true},
{:ra, "2.7.3", override: true}, {:ra, "2.7.3", override: true},
{:mimerl, "1.2.0", override: true}, {:mimerl, "1.2.0", override: true}
{:supervisor3, "1.1.12", override: true}
] ++ ] ++
emqx_apps(profile_info, version) ++ emqx_apps(profile_info, version) ++
enterprise_deps(profile_info) ++ jq_dep() ++ quicer_dep() enterprise_deps(profile_info) ++ jq_dep() ++ quicer_dep()
@ -215,7 +214,7 @@ defmodule EMQXUmbrella.MixProject do
{:wolff, github: "kafka4beam/wolff", tag: "1.10.4"}, {:wolff, github: "kafka4beam/wolff", tag: "1.10.4"},
{:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true},
{:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"}, {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"},
{:brod, github: "kafka4beam/brod", tag: "3.16.8"}, {:brod, github: "kafka4beam/brod", tag: "3.18.0"},
{:snappyer, "1.2.9", override: true}, {:snappyer, "1.2.9", override: true},
{:crc32cer, "0.1.8", override: true}, {:crc32cer, "0.1.8", override: true},
{:opentsdb, github: "emqx/opentsdb-client-erl", tag: "v0.5.1", override: true}, {:opentsdb, github: "emqx/opentsdb-client-erl", tag: "v0.5.1", override: true},

View File

@ -14,4 +14,13 @@ config_connector.desc:
config_connector.label: config_connector.label:
"""PostgreSQL Connector Config""" """PostgreSQL Connector Config"""
disable_prepared_statements.label:
"""Disable Prepared Statements"""
disable_prepared_statements.desc:
"""~
Disables the usage of prepared statements in the connections.
Some endpoints, like PGBouncer or Supabase in Transaction mode, do not
support session features such as prepared statements. For such connections,
this option should be enabled.~"""
} }

View File

@ -9,7 +9,7 @@
## example: ## example:
## ./scripts/buildx.sh --profile emqx --pkgtype tgz --arch arm64 \ ## ./scripts/buildx.sh --profile emqx --pkgtype tgz --arch arm64 \
## --builder ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-debian12 ## --builder ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-debian12
set -euo pipefail set -euo pipefail
@ -24,7 +24,7 @@ help() {
echo "--arch amd64|arm64: Target arch to build the EMQX package for" echo "--arch amd64|arm64: Target arch to build the EMQX package for"
echo "--src_dir <SRC_DIR>: EMQX source code in this dir, default to PWD" echo "--src_dir <SRC_DIR>: EMQX source code in this dir, default to PWD"
echo "--builder <BUILDER>: Builder image to pull" echo "--builder <BUILDER>: Builder image to pull"
echo " E.g. ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-debian12" echo " E.g. ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-debian12"
} }
die() { die() {

View File

@ -12,11 +12,11 @@ if ! type "yq" > /dev/null; then
exit 1 exit 1
fi fi
EMQX_BUILDER_VERSION=${EMQX_BUILDER_VERSION:-5.3-5} EMQX_BUILDER_VERSION=${EMQX_BUILDER_VERSION:-5.3-7}
EMQX_BUILDER_OTP=${EMQX_BUILDER_OTP:-26.2.1-2} OTP_VSN=${OTP_VSN:-26.2.5-1}
EMQX_BUILDER_ELIXIR=${EMQX_BUILDER_ELIXIR:-1.15.7} ELIXIR_VSN=${ELIXIR_VSN:-1.15.7}
EMQX_BUILDER_PLATFORM=${EMQX_BUILDER_PLATFORM:-ubuntu22.04} EMQX_BUILDER_PLATFORM=${EMQX_BUILDER_PLATFORM:-ubuntu22.04}
EMQX_BUILDER=${EMQX_BUILDER:-ghcr.io/emqx/emqx-builder/${EMQX_BUILDER_VERSION}:${EMQX_BUILDER_ELIXIR}-${EMQX_BUILDER_OTP}-${EMQX_BUILDER_PLATFORM}} EMQX_BUILDER=${EMQX_BUILDER:-ghcr.io/emqx/emqx-builder/${EMQX_BUILDER_VERSION}:${ELIXIR_VSN}-${OTP_VSN}-${EMQX_BUILDER_PLATFORM}}
commands=$(yq ".jobs.sanity-checks.steps[].run" .github/workflows/_pr_entrypoint.yaml | grep -v null) commands=$(yq ".jobs.sanity-checks.steps[].run" .github/workflows/_pr_entrypoint.yaml | grep -v null)

View File

@ -22,7 +22,7 @@ WEBHOOK="webhook.$NET"
BENCH="bench.$NET" BENCH="bench.$NET"
COOKIE='this-is-a-secret' COOKIE='this-is-a-secret'
## Erlang image is needed to run webhook server and emqtt-bench ## Erlang image is needed to run webhook server and emqtt-bench
ERLANG_IMAGE="ghcr.io/emqx/emqx-builder/5.3-5:1.15.7-26.2.1-2-ubuntu22.04" ERLANG_IMAGE="ghcr.io/emqx/emqx-builder/5.3-7:1.15.7-26.2.5-1-ubuntu22.04"
# builder has emqtt-bench installed # builder has emqtt-bench installed
BENCH_IMAGE="$ERLANG_IMAGE" BENCH_IMAGE="$ERLANG_IMAGE"

View File

@ -49,6 +49,7 @@ NIF
OCSP OCSP
OTP OTP
PEM PEM
PGBouncer
PINGREQ PINGREQ
PSK PSK
PSK PSK
@ -65,6 +66,7 @@ Riak
SHA SHA
SMS SMS
Struct Struct
Supabase
TCP TCP
TLS TLS
TTL TTL