From 7a154e4d5837c8d082bb95c92db3be89a001258d Mon Sep 17 00:00:00 2001 From: Ivan Dyachkov Date: Wed, 1 Mar 2023 16:57:42 +0100 Subject: [PATCH 01/40] chore: docker-latest tag can be create automatically in cut.sh --- .../build_and_push_docker_images.yaml | 36 ++++++------- scripts/rel/cut.sh | 51 ++++++++++++++----- scripts/rel/sync-remotes.sh | 17 ++++++- 3 files changed, 68 insertions(+), 36 deletions(-) diff --git a/.github/workflows/build_and_push_docker_images.yaml b/.github/workflows/build_and_push_docker_images.yaml index c612d2d5f..a5f1f315c 100644 --- a/.github/workflows/build_and_push_docker_images.yaml +++ b/.github/workflows/build_and_push_docker_images.yaml @@ -43,9 +43,10 @@ jobs: - name: Get profiles to build id: get_profile + env: + INPUTS_PROFILE: ${{ github.event.inputs.profile }} run: | cd source - tag=${{ github.ref }} # tag docker-latest-ce or docker-latest-ee if git describe --tags --exact --match 'docker-latest-*' 2>/dev/null; then echo 'is_latest=true due to docker-latest-* tag' @@ -57,38 +58,33 @@ jobs: echo 'is_latest=false' is_latest=false fi - if git describe --tags --match "[v|e]*" --exact; then + # resolve profile + if git describe --tags --match "v*" --exact; then echo "This is an exact git tag, will publish images" is_exact='true' + PROFILE=emqx + elif git describe --tags --match "e*" --exact; then + echo "This is an exact git tag, will publish images" + is_exact='true' + PROFILE=emqx-enterprise else echo "This is NOT an exact git tag, will not publish images" is_exact='false' fi - case $tag in - refs/tags/v*) - PROFILE='emqx' + + case "${PROFILE:-$INPUTS_PROFILE}" in + emqx) EDITION='Opensource' ;; - refs/tags/e*) - PROFILE=emqx-enterprise + emqx-enterprise) EDITION='Enterprise' ;; *) - PROFILE=${{ github.event.inputs.profile }} - case "$PROFILE" in - emqx) - EDITION='Opensource' - ;; - emqx-enterprise) - EDITION='Enterprise' - ;; - *) - echo "ERROR: Failed to resolve build profile" - exit 1 - ;; - esac + echo "ERROR: Failed to resolve build profile" + exit 1 ;; esac + VSN="$(./pkg-vsn.sh "$PROFILE")" echo "Building emqx/$PROFILE:$VSN image (latest=$is_latest)" echo "Push = $is_exact" diff --git a/scripts/rel/cut.sh b/scripts/rel/cut.sh index 60fe2f28d..ab63d46fe 100755 --- a/scripts/rel/cut.sh +++ b/scripts/rel/cut.sh @@ -19,15 +19,23 @@ RELEASE_GIT_TAG is a 'v*' or 'e*' tag for example: e5.0.0-beta.6 options: - -h|--help: Print this usage. - -b|--base: Specify the current release base branch, can be one of - release-50 - NOTE: this option should be used when --dryrun. - --dryrun: Do not actually create the git tag. - --skip-appup: Skip checking appup - Useful when you are sure that appup is already updated' - --prev-tag: Provide the prev tag to automatically generate changelogs - If this option is absent, the tag found by git describe will be used + -h|--help: Print this usage. + + -b|--base: Specify the current release base branch, can be one of + release-50 + NOTE: this option should be used when --dryrun. + + --dryrun: Do not actually create the git tag. + + --skip-appup: Skip checking appup + Useful when you are sure that appup is already updated' + + --prev-tag : Provide the prev tag to automatically generate changelogs + If this option is absent, the tag found by git describe will be used + + --docker-latest: Set this option to assign :latest tag on the corresponding docker image + in addition to regular : one + NOTE: For 5.0 series the current working branch must be 'release-50' for opensource edition and 'release-e50' for enterprise edition. @@ -45,18 +53,21 @@ logmsg() { } TAG="${1:-}" +DOCKER_LATEST_TAG= case "$TAG" in v*) TAG_PREFIX='v' PROFILE='emqx' SKIP_APPUP='yes' + DOCKER_LATEST_TAG='docker-latest-ce' ;; e*) TAG_PREFIX='e' PROFILE='emqx-enterprise' #TODO change to no when we are ready to support hot-upgrade SKIP_APPUP='yes' + DOCKER_LATEST_TAG='docker-latest-ee' ;; -h|--help) usage @@ -72,6 +83,7 @@ esac shift 1 DRYRUN='no' +DOCKER_LATEST='no' while [ "$#" -gt 0 ]; do case $1 in -h|--help) @@ -99,6 +111,10 @@ while [ "$#" -gt 0 ]; do PREV_TAG="$1" shift ;; + --docker-latest) + DOCKER_LATEST='yes' + shift + ;; *) logerr "Unknown option $1" exit 1 @@ -180,11 +196,11 @@ assert_release_version() { assert_release_version "$TAG" ## Check if all upstream branches are merged -if [ -z "${BASE_BR:-}" ]; then - ./scripts/rel/sync-remotes.sh -else - ./scripts/rel/sync-remotes.sh --base "$BASE_BR" -fi +SYNC_REMOTES_ARGS= +[ -n "${BASE_BR:-}" ] && SYNC_REMOTES_ARGS="--base $BASE_BR $SYNC_REMOTES_ARGS" +[ "$DRYRUN" = 'yes' ] && SYNC_REMOTES_ARGS="--dryrun $SYNC_REMOTES_ARGS" +# shellcheck disable=SC2086 +./scripts/rel/sync-remotes.sh $SYNC_REMOTES_ARGS ## Check if the Chart versions are in sync ./scripts/rel/check-chart-vsn.sh "$PROFILE" @@ -231,6 +247,9 @@ generate_changelog () { if [ "$DRYRUN" = 'yes' ]; then logmsg "Release tag is ready to be created with command: git tag $TAG" + if [ "$DOCKER_LATEST" = 'yes' ]; then + logmsg "Docker latest tag is ready to be created with command: git tag --force $DOCKER_LATEST_TAG" + fi else case "$TAG" in *rc*) @@ -252,4 +271,8 @@ else esac git tag "$TAG" logmsg "$TAG is created OK." + if [ "$DOCKER_LATEST" = 'yes' ]; then + git tag --force "$DOCKER_LATEST_TAG" + logmsg "$DOCKER_LATEST_TAG is created OK." + fi fi diff --git a/scripts/rel/sync-remotes.sh b/scripts/rel/sync-remotes.sh index dda910785..7f7c2885d 100755 --- a/scripts/rel/sync-remotes.sh +++ b/scripts/rel/sync-remotes.sh @@ -33,6 +33,10 @@ options: Without this option, the script executes 'git merge' command with '--ff-only' option which conveniently pulls remote updates if there is any, and fails when fast-forward is not possible + + --dryrun: + Do not perform merge. Run the checks, fetch from remote, + and show what's going to happen. EOF } @@ -48,6 +52,7 @@ logmsg() { } INTERACTIVE='no' +DRYRUN='no' while [ "$#" -gt 0 ]; do case $1 in -h|--help) @@ -63,6 +68,10 @@ while [ "$#" -gt 0 ]; do BASE_BRANCH="$1" shift ;; + --dryrun) + shift + DRYRUN='yes' + ;; *) logerr "Unknown option $1" exit 1 @@ -151,6 +160,10 @@ upstream_branches() { } for remote_ref in $(upstream_branches "$BASE_BRANCH"); do - logmsg "Merging $remote_ref" - git merge $MERGE_OPTS "$remote_ref" + if [ "$DRYRUN" = 'yes' ]; then + logmsg "Merge with this command: git merge $MERGE_OPTS $remote_ref" + else + logmsg "Merging $remote_ref" + git merge $MERGE_OPTS "$remote_ref" + fi done From 6be9967d2dd921ad5ec348f0b1f00a4c99dea7cf Mon Sep 17 00:00:00 2001 From: Ivan Dyachkov Date: Wed, 1 Mar 2023 17:28:52 +0100 Subject: [PATCH 02/40] chore: add a reminder to push the tags --- scripts/rel/cut.sh | 11 +++++++++++ scripts/rel/sync-remotes.sh | 1 + 2 files changed, 12 insertions(+) diff --git a/scripts/rel/cut.sh b/scripts/rel/cut.sh index ab63d46fe..2a1c213fe 100755 --- a/scripts/rel/cut.sh +++ b/scripts/rel/cut.sh @@ -48,6 +48,11 @@ EOF logerr() { echo "$(tput setaf 1)ERROR: $1$(tput sgr0)" } + +logwarn() { + echo "$(tput setaf 3)WARNING: $1$(tput sgr0)" +} + logmsg() { echo "INFO: $1" } @@ -275,4 +280,10 @@ else git tag --force "$DOCKER_LATEST_TAG" logmsg "$DOCKER_LATEST_TAG is created OK." fi + logwarn "Don't forget to push the tags!" + if [ "$DOCKER_LATEST" = 'yes' ]; then + echo "git push --atomic --force origin $TAG $DOCKER_LATEST_TAG" + else + echo "git push origin $TAG" + fi fi diff --git a/scripts/rel/sync-remotes.sh b/scripts/rel/sync-remotes.sh index 7f7c2885d..eddce0cd7 100755 --- a/scripts/rel/sync-remotes.sh +++ b/scripts/rel/sync-remotes.sh @@ -43,6 +43,7 @@ EOF logerr() { echo "$(tput setaf 1)ERROR: $1$(tput sgr0)" } + logwarn() { echo "$(tput setaf 3)WARNING: $1$(tput sgr0)" } From 77dea0c77add65bed9243cfea72d369f92e77533 Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Wed, 1 Mar 2023 21:43:38 +0200 Subject: [PATCH 03/40] fix: Handle obfuscated fields in bridges_probe API --- apps/emqx_bridge/src/emqx_bridge_api.erl | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 293692ccd..3c7c30660 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -546,7 +546,8 @@ schema("/bridges_probe") -> RequestMeta = #{module => ?MODULE, method => post, path => "/bridges_probe"}, case emqx_dashboard_swagger:filter_check_request_and_translate_body(Request, RequestMeta) of {ok, #{body := #{<<"type">> := ConnType} = Params}} -> - case emqx_bridge_resource:create_dry_run(ConnType, maps:remove(<<"type">>, Params)) of + Params1 = maybe_deobfuscate_bridge_probe(Params), + case emqx_bridge_resource:create_dry_run(ConnType, maps:remove(<<"type">>, Params1)) of ok -> {204}; {error, Error} -> @@ -556,6 +557,18 @@ schema("/bridges_probe") -> BadRequest end. +maybe_deobfuscate_bridge_probe(#{<<"type">> := BridgeType, <<"name">> := BridgeName} = Params) -> + case emqx_bridge:lookup(BridgeType, BridgeName) of + {ok, _} -> + RawConf = emqx:get_raw_config([bridges, BridgeType, BridgeName], #{}), + deobfuscate(Params, RawConf); + _ -> + %% A bridge may be probed before it's created, so not finding it here is fine + Params + end; +maybe_deobfuscate_bridge_probe(Params) -> + Params. + lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) -> FormatFun = fun format_bridge_info_without_metrics/1, do_lookup_from_all_nodes(BridgeType, BridgeName, SuccCode, FormatFun). From 258329b9c7f96d474561e9087e542b0ee2972d1f Mon Sep 17 00:00:00 2001 From: Ivan Dyachkov Date: Thu, 2 Mar 2023 11:57:38 +0100 Subject: [PATCH 04/40] ci: build slim packages on push to release-50 --- .github/workflows/build_slim_packages.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/build_slim_packages.yaml b/.github/workflows/build_slim_packages.yaml index 692e4a987..64704b39c 100644 --- a/.github/workflows/build_slim_packages.yaml +++ b/.github/workflows/build_slim_packages.yaml @@ -8,6 +8,7 @@ on: push: branches: - master + - release-50 pull_request: # GitHub pull_request action is by default triggered when # opened reopened or synchronize, From baf01617cdf6e39c81c7f33757c485536e99ef14 Mon Sep 17 00:00:00 2001 From: William Yang Date: Thu, 2 Mar 2023 14:03:44 +0100 Subject: [PATCH 05/40] fix(quic): mark unsupp TLS options deprecated --- apps/emqx/src/emqx_schema.erl | 28 +++++++++++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index bb4520aa9..a673fa898 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1280,7 +1280,18 @@ fields("listener_wss_opts") -> true ); fields("listener_quic_ssl_opts") -> - server_ssl_opts_schema(#{}, false); + %% Mark unsupported TLS options deprecated. + lists:map( + fun({Name, Schema}) -> + case is_quic_ssl_opts(Name) of + true -> + {Name, Schema}; + false -> + {Name, Schema#{deprecated => {since, "5.0.20"}}} + end + end, + server_ssl_opts_schema(#{}, false) + ); fields("ssl_client_opts") -> client_ssl_opts_schema(#{}); fields("deflate_opts") -> @@ -2841,3 +2852,18 @@ quic_lowlevel_settings_uint(Low, High, Desc) -> desc => Desc } ). + +-spec is_quic_ssl_opts(string()) -> boolean(). +is_quic_ssl_opts(Name) -> + lists:member(Name, [ + "cacertfile", + "certfile", + "keyfile", + "verify" + %% Followings are planned + %% , "password" + %% , "hibernate_after" + %% , "fail_if_no_peer_cert" + %% , "handshake_timeout" + %% , "gc_after_handshake" + ]). From 6649a4f7d16dd73492db6ccf06df5d1a413e4eee Mon Sep 17 00:00:00 2001 From: William Yang Date: Thu, 2 Mar 2023 15:51:12 +0100 Subject: [PATCH 06/40] docs: add change logs --- changes/ce/fix-10058.en.md | 7 +++++++ changes/ce/fix-10058.zh.md | 8 ++++++++ 2 files changed, 15 insertions(+) create mode 100644 changes/ce/fix-10058.en.md create mode 100644 changes/ce/fix-10058.zh.md diff --git a/changes/ce/fix-10058.en.md b/changes/ce/fix-10058.en.md new file mode 100644 index 000000000..337ac5d47 --- /dev/null +++ b/changes/ce/fix-10058.en.md @@ -0,0 +1,7 @@ +Deprecate unused QUIC TLS options. +Only following TLS options are kept for the QUIC listeners: + +- cacertfile +- certfile +- keyfile +- verify diff --git a/changes/ce/fix-10058.zh.md b/changes/ce/fix-10058.zh.md new file mode 100644 index 000000000..d1dea37c3 --- /dev/null +++ b/changes/ce/fix-10058.zh.md @@ -0,0 +1,8 @@ +废弃未使用的 QUIC TLS 选项。 +QUIC 监听器只保留以下 TLS 选项: + +- cacertfile +- certfile +- keyfile +- verify + From 64b5e9585e3f21fec6cdc4621f8a4f766d5c24cf Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Thu, 2 Mar 2023 16:41:09 +0100 Subject: [PATCH 07/40] feat(emqx_rule_engine): API format errors in a human readable way --- .../src/emqx_rule_engine_api.erl | 7 +++- .../test/emqx_rule_engine_api_SUITE.erl | 36 ++++++++++++++----- changes/ce/feat-10059.en.md | 1 + changes/ce/feat-10059.zh.md | 1 + 4 files changed, 35 insertions(+), 10 deletions(-) create mode 100644 changes/ce/feat-10059.en.md create mode 100644 changes/ce/feat-10059.zh.md diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index 95c028a1e..15bec4390 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -423,7 +423,12 @@ param_path_id() -> %% Internal functions %%------------------------------------------------------------------------------ -err_msg(Msg) -> emqx_misc:readable_error_msg(Msg). +err_msg({RuleError, {_E, R, _S}}) when is_tuple(R) -> + emqx_misc:readable_error_msg(emqx_json:encode([{RuleError, element(1, R)}])); +err_msg({RuleError, {_E, R, _S}}) -> + emqx_misc:readable_error_msg(emqx_json:encode([{RuleError, R}])); +err_msg(Msg) -> + emqx_misc:readable_error_msg(Msg). format_rule_resp(Rules) when is_list(Rules) -> [format_rule_resp(R) || R <- Rules]; diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl index 82a305009..3da8ae9db 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl @@ -119,12 +119,31 @@ t_crud_rule_api(_Config) -> emqx_rule_engine_api:'/rules/:id'(get, #{bindings => #{id => RuleID}}) ), + {400, #{ + code := 'BAD_REQUEST', + message := SelectAndTransformJsonError + }} = + emqx_rule_engine_api:'/rule_test'( + post, + test_rule_params(<<"SELECT\n payload.msg\nFROM\n \"t/#\"">>, <<"{\"msg\": \"hel">>) + ), ?assertMatch( - {400, #{ - code := 'BAD_REQUEST', - message := <<"{select_and_transform_error,{error,{decode_json_failed,", _/binary>> - }}, - emqx_rule_engine_api:'/rule_test'(post, test_rule_params()) + #{<<"select_and_transform_error">> := <<"decode_json_failed">>}, + emqx_json:decode(SelectAndTransformJsonError, [return_maps]) + ), + {400, #{ + code := 'BAD_REQUEST', + message := SelectAndTransformBadArgError + }} = + emqx_rule_engine_api:'/rule_test'( + post, + test_rule_params( + <<"SELECT\n payload.msg > 1\nFROM\n \"t/#\"">>, <<"{\"msg\": \"hello\"}">> + ) + ), + ?assertMatch( + #{<<"select_and_transform_error">> := <<"badarg">>}, + emqx_json:decode(SelectAndTransformBadArgError, [return_maps]) ), ok. @@ -221,19 +240,18 @@ t_reset_metrics_on_disable(_Config) -> ?assertMatch(#{passed := 0, matched := 0}, Metrics1), ok. -test_rule_params() -> +test_rule_params(Sql, Payload) -> #{ body => #{ <<"context">> => #{ <<"clientid">> => <<"c_emqx">>, <<"event_type">> => <<"message_publish">>, - <<"payload">> => <<"{\"msg\": \"hel">>, + <<"payload">> => Payload, <<"qos">> => 1, <<"topic">> => <<"t/a">>, <<"username">> => <<"u_emqx">> }, - <<"sql">> => - <<"SELECT\n payload.msg\nFROM\n \"t/#\"">> + <<"sql">> => Sql } }. diff --git a/changes/ce/feat-10059.en.md b/changes/ce/feat-10059.en.md new file mode 100644 index 000000000..2c4de015c --- /dev/null +++ b/changes/ce/feat-10059.en.md @@ -0,0 +1 @@ +Errors returned by rule engine API are formatted in a more human readable way rather than dumping the raw error including the stacktrace. diff --git a/changes/ce/feat-10059.zh.md b/changes/ce/feat-10059.zh.md new file mode 100644 index 000000000..112b943ac --- /dev/null +++ b/changes/ce/feat-10059.zh.md @@ -0,0 +1 @@ +由规则引擎 API 返回的错误以更人性化的方式格式化,而不是转储包括堆栈跟踪的原始错误。 From 072d310507685056ea72a2c30ca8ced33dcc8a1c Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Fri, 3 Mar 2023 09:32:28 +0100 Subject: [PATCH 08/40] style: fix zn translation --- changes/ce/feat-10059.zh.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changes/ce/feat-10059.zh.md b/changes/ce/feat-10059.zh.md index 112b943ac..99f8fe8ee 100644 --- a/changes/ce/feat-10059.zh.md +++ b/changes/ce/feat-10059.zh.md @@ -1 +1 @@ -由规则引擎 API 返回的错误以更人性化的方式格式化,而不是转储包括堆栈跟踪的原始错误。 +规则引擎 API 返回用户可读的错误信息而不是原始的栈追踪信息。 From 44eca1fa720f083558c287ae74e1d05825aa9bc7 Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Fri, 3 Mar 2023 09:36:13 +0100 Subject: [PATCH 09/40] fix(emqx_rule_engine): don't crash if we can't encode json --- .../src/emqx_rule_engine_api.erl | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index 15bec4390..63bd701fe 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -423,13 +423,21 @@ param_path_id() -> %% Internal functions %%------------------------------------------------------------------------------ -err_msg({RuleError, {_E, R, _S}}) when is_tuple(R) -> - emqx_misc:readable_error_msg(emqx_json:encode([{RuleError, element(1, R)}])); -err_msg({RuleError, {_E, R, _S}}) -> - emqx_misc:readable_error_msg(emqx_json:encode([{RuleError, R}])); +err_msg({RuleError, {_E, Reason, _S}}) -> + emqx_misc:readable_error_msg(encode_nested_error(RuleError, Reason)); err_msg(Msg) -> emqx_misc:readable_error_msg(Msg). +encode_nested_error(RuleError, Reason) when is_tuple(Reason) -> + encode_nested_error(RuleError, element(1, Reason)); +encode_nested_error(RuleError, Reason) -> + case emqx_json:safe_encode([{RuleError, Reason}]) of + {ok, Json} -> + Json; + _ -> + {RuleError, Reason} + end. + format_rule_resp(Rules) when is_list(Rules) -> [format_rule_resp(R) || R <- Rules]; format_rule_resp({Id, Rule}) -> From 6ebd3dc7474661ef38e718675f6fbff030c3abc9 Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Fri, 3 Mar 2023 09:59:00 +0100 Subject: [PATCH 10/40] feat(emqx_rule_engine): decompose error tuples sqltester for instance returns sth like {"...sytnax error...", OrigSql} --- .../src/emqx_rule_engine_api.erl | 2 ++ .../test/emqx_rule_engine_api_SUITE.erl | 26 +++++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index 63bd701fe..62e1553d2 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -425,6 +425,8 @@ param_path_id() -> err_msg({RuleError, {_E, Reason, _S}}) -> emqx_misc:readable_error_msg(encode_nested_error(RuleError, Reason)); +err_msg({Reason, _Details}) -> + emqx_misc:readable_error_msg(Reason); err_msg(Msg) -> emqx_misc:readable_error_msg(Msg). diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl index 3da8ae9db..d89bc2651 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl @@ -40,6 +40,9 @@ end_per_suite(_Config) -> init_per_testcase(_, Config) -> Config. +end_per_testcase(t_crud_rule_api, Config) -> + meck:unload(emqx_json), + end_per_testcase(common, Config); end_per_testcase(_, _Config) -> {200, #{data := Rules}} = emqx_rule_engine_api:'/rules'(get, #{query_string => #{}}), @@ -145,6 +148,29 @@ t_crud_rule_api(_Config) -> #{<<"select_and_transform_error">> := <<"badarg">>}, emqx_json:decode(SelectAndTransformBadArgError, [return_maps]) ), + {400, #{ + code := 'BAD_REQUEST', + message := BadSqlMessage + }} = emqx_rule_engine_api:'/rule_test'( + post, + test_rule_params( + <<"BAD_SQL">>, <<"{\"msg\": \"hello\"}">> + ) + ), + ?assertMatch({match, _}, re:run(BadSqlMessage, "syntax error")), + meck:expect(emqx_json, safe_encode, 1, {error, foo}), + ?assertMatch( + {400, #{ + code := 'BAD_REQUEST', + message := <<"{select_and_transform_error,badarg}">> + }}, + emqx_rule_engine_api:'/rule_test'( + post, + test_rule_params( + <<"SELECT\n payload.msg > 1\nFROM\n \"t/#\"">>, <<"{\"msg\": \"hello\"}">> + ) + ) + ), ok. t_list_rule_api(_Config) -> From ea68a75725cbee63479da581d22ebb0f0675d2eb Mon Sep 17 00:00:00 2001 From: Ivan Dyachkov Date: Fri, 3 Mar 2023 13:43:32 +0100 Subject: [PATCH 11/40] chore: reenable scripts/apps-version-check.sh --- scripts/apps-version-check.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/apps-version-check.sh b/scripts/apps-version-check.sh index 473005c9c..797204cc8 100755 --- a/scripts/apps-version-check.sh +++ b/scripts/apps-version-check.sh @@ -1,6 +1,6 @@ #!/usr/bin/env bash set -euo pipefail -exit 0 + latest_release=$(git describe --abbrev=0 --tags --exclude '*rc*' --exclude '*alpha*' --exclude '*beta*' --exclude '*docker*') echo "Compare base: $latest_release" From c01f62a1c166e30bfd6e19c7cb5d00b46ecf65ee Mon Sep 17 00:00:00 2001 From: Ivan Dyachkov Date: Fri, 3 Mar 2023 13:43:54 +0100 Subject: [PATCH 12/40] chore: bump apps versions --- apps/emqx_bridge/src/emqx_bridge.app.src | 2 +- apps/emqx_connector/src/emqx_connector.app.src | 2 +- apps/emqx_rule_engine/src/emqx_rule_engine.app.src | 2 +- lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge.app.src b/apps/emqx_bridge/src/emqx_bridge.app.src index 0d4b552ee..0ec246320 100644 --- a/apps/emqx_bridge/src/emqx_bridge.app.src +++ b/apps/emqx_bridge/src/emqx_bridge.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge, [ {description, "EMQX bridges"}, - {vsn, "0.1.11"}, + {vsn, "0.1.12"}, {registered, []}, {mod, {emqx_bridge_app, []}}, {applications, [ diff --git a/apps/emqx_connector/src/emqx_connector.app.src b/apps/emqx_connector/src/emqx_connector.app.src index dfcf52902..f0d51a9ce 100644 --- a/apps/emqx_connector/src/emqx_connector.app.src +++ b/apps/emqx_connector/src/emqx_connector.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_connector, [ {description, "EMQX Data Integration Connectors"}, - {vsn, "0.1.15"}, + {vsn, "0.1.16"}, {registered, []}, {mod, {emqx_connector_app, []}}, {applications, [ diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src index 06ed059a4..4c924b824 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src @@ -2,7 +2,7 @@ {application, emqx_rule_engine, [ {description, "EMQX Rule Engine"}, % strict semver, bump manually! - {vsn, "5.0.9"}, + {vsn, "5.0.10"}, {modules, []}, {registered, [emqx_rule_engine_sup, emqx_rule_engine]}, {applications, [kernel, stdlib, rulesql, getopt, emqx_ctl]}, diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src index 5017abd21..6acbc43bd 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src @@ -1,6 +1,6 @@ {application, emqx_ee_connector, [ {description, "EMQX Enterprise connectors"}, - {vsn, "0.1.5"}, + {vsn, "0.1.6"}, {registered, []}, {applications, [ kernel, From bff087f40a853bf85810818f6db940f2803a9c8c Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Fri, 3 Mar 2023 18:03:32 +0200 Subject: [PATCH 13/40] fix: restart emqx_statsd with the updated configuration emqx_config_handler:post_config_update/5 cb is called before an updated config is saved. Thus, a process being restarted in that callback cannot get the latest config by calling emqx_conf:get/2, because that update is not saved yet. Relates to EMQX-9055 --- apps/emqx_statsd/src/emqx_statsd.erl | 10 ++++---- apps/emqx_statsd/src/emqx_statsd_config.erl | 4 ++-- apps/emqx_statsd/src/emqx_statsd_sup.erl | 13 +++++++---- apps/emqx_statsd/test/emqx_statsd_SUITE.erl | 26 +++++++++++++++++++++ 4 files changed, 42 insertions(+), 11 deletions(-) diff --git a/apps/emqx_statsd/src/emqx_statsd.erl b/apps/emqx_statsd/src/emqx_statsd.erl index 770320ddd..3f4045391 100644 --- a/apps/emqx_statsd/src/emqx_statsd.erl +++ b/apps/emqx_statsd/src/emqx_statsd.erl @@ -38,7 +38,7 @@ ]). %% Interface --export([start_link/0]). +-export([start_link/1]). %% Internal Exports -export([ @@ -68,17 +68,17 @@ do_restart() -> ok = do_start(), ok. -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). +start_link(Conf) -> + gen_server:start_link({local, ?MODULE}, ?MODULE, Conf, []). -init([]) -> +init(Conf) -> process_flag(trap_exit, true), #{ tags := TagsRaw, server := Server, sample_time_interval := SampleTimeInterval, flush_time_interval := FlushTimeInterval - } = emqx_conf:get([statsd]), + } = Conf, {Host, Port} = emqx_schema:parse_server(Server, ?SERVER_PARSE_OPTS), Tags = maps:fold(fun(K, V, Acc) -> [{to_bin(K), to_bin(V)} | Acc] end, [], TagsRaw), Opts = [{tags, Tags}, {host, Host}, {port, Port}, {prefix, <<"emqx">>}], diff --git a/apps/emqx_statsd/src/emqx_statsd_config.erl b/apps/emqx_statsd/src/emqx_statsd_config.erl index b818d2691..6bc430956 100644 --- a/apps/emqx_statsd/src/emqx_statsd_config.erl +++ b/apps/emqx_statsd/src/emqx_statsd_config.erl @@ -45,9 +45,9 @@ remove_handler() -> ok = emqx_config_handler:remove_handler(?STATSD), ok. -post_config_update(?STATSD, _Req, #{enable := true}, _Old, _AppEnvs) -> +post_config_update(?STATSD, _Req, #{enable := true} = New, _Old, _AppEnvs) -> emqx_statsd_sup:ensure_child_stopped(?APP), - emqx_statsd_sup:ensure_child_started(?APP); + emqx_statsd_sup:ensure_child_started(?APP, New); post_config_update(?STATSD, _Req, #{enable := false}, _Old, _AppEnvs) -> emqx_statsd_sup:ensure_child_stopped(?APP); post_config_update(_ConfPath, _Req, _NewConf, _OldConf, _AppEnvs) -> diff --git a/apps/emqx_statsd/src/emqx_statsd_sup.erl b/apps/emqx_statsd/src/emqx_statsd_sup.erl index 2845fb505..35c1d332c 100644 --- a/apps/emqx_statsd/src/emqx_statsd_sup.erl +++ b/apps/emqx_statsd/src/emqx_statsd_sup.erl @@ -25,6 +25,7 @@ -export([ start_link/0, ensure_child_started/1, + ensure_child_started/2, ensure_child_stopped/1 ]). @@ -45,7 +46,11 @@ start_link() -> -spec ensure_child_started(atom()) -> ok. ensure_child_started(Mod) when is_atom(Mod) -> - assert_started(supervisor:start_child(?MODULE, ?CHILD(Mod, []))). + ensure_child_started(Mod, emqx_conf:get([statsd], #{})). + +-spec ensure_child_started(atom(), map()) -> ok. +ensure_child_started(Mod, Conf) when is_atom(Mod) -> + assert_started(supervisor:start_child(?MODULE, ?CHILD(Mod, [Conf]))). %% @doc Stop the child worker process. -spec ensure_child_stopped(any()) -> ok. @@ -61,9 +66,9 @@ ensure_child_stopped(ChildId) -> init([]) -> Children = - case emqx_conf:get([statsd, enable], false) of - true -> [?CHILD(emqx_statsd, [])]; - false -> [] + case emqx_conf:get([statsd], #{}) of + #{enable := true} = Conf -> [?CHILD(emqx_statsd, [Conf])]; + _ -> [] end, {ok, {{one_for_one, 100, 3600}, Children}}. diff --git a/apps/emqx_statsd/test/emqx_statsd_SUITE.erl b/apps/emqx_statsd/test/emqx_statsd_SUITE.erl index 2f8fa5a69..a203ef7d5 100644 --- a/apps/emqx_statsd/test/emqx_statsd_SUITE.erl +++ b/apps/emqx_statsd/test/emqx_statsd_SUITE.erl @@ -113,6 +113,32 @@ t_kill_exit(_) -> ?assertNotEqual(Estatsd, Estatsd1), ok. +t_config_update(_) -> + OldRawConf = emqx_conf:get_raw([statsd]), + {ok, _} = emqx_statsd_config:update(OldRawConf#{<<"enable">> => true}), + CommonKeys = [flush_time_interval, sample_time_interval], + OldConf = emqx_conf:get([statsd]), + OldStatsDState = sys:get_state(emqx_statsd), + OldPid = erlang:whereis(emqx_statsd), + ?assertEqual(maps:with(CommonKeys, OldConf), maps:with(CommonKeys, OldStatsDState)), + NewRawConfExpect = OldRawConf#{ + <<"flush_time_interval">> := <<"42s">>, + <<"sample_time_interval">> := <<"42s">> + }, + try + {ok, _} = emqx_statsd_config:update(NewRawConfExpect), + NewRawConf = emqx_conf:get_raw([statsd]), + NewConf = emqx_conf:get([statsd]), + NewStatsDState = sys:get_state(emqx_statsd), + NewPid = erlang:whereis(emqx_statsd), + ?assertNotEqual(OldRawConf, NewRawConf), + ?assertEqual(NewRawConfExpect, NewRawConf), + ?assertEqual(maps:with(CommonKeys, NewConf), maps:with(CommonKeys, NewStatsDState)), + ?assertNotEqual(OldPid, NewPid) + after + {ok, _} = emqx_statsd_config:update(OldRawConf) + end. + request(Method) -> request(Method, []). request(Method, Body) -> From b3907128e8c199a5a35ef58ca4634ee02e0b12a8 Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Fri, 3 Mar 2023 18:58:45 +0200 Subject: [PATCH 14/40] fix: set statsd flush_time_interval = max(flush_time_interval, sample_time_interval) flush_time_interval is used to calculate statsd sampling rate: rate = sample_time_interval / flush_time_interval This means that flush_time_interval must always be greater than (or equal to) sample_time_interval, otherwise, the sampling rate will be invalid (> 1). Relates to EMQX-9055 --- apps/emqx_statsd/src/emqx_statsd.app.src | 2 +- apps/emqx_statsd/src/emqx_statsd.erl | 16 +++++++++++++++- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/apps/emqx_statsd/src/emqx_statsd.app.src b/apps/emqx_statsd/src/emqx_statsd.app.src index 27f842ce2..67825162e 100644 --- a/apps/emqx_statsd/src/emqx_statsd.app.src +++ b/apps/emqx_statsd/src/emqx_statsd.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_statsd, [ {description, "EMQX Statsd"}, - {vsn, "5.0.5"}, + {vsn, "5.0.6"}, {registered, []}, {mod, {emqx_statsd_app, []}}, {applications, [ diff --git a/apps/emqx_statsd/src/emqx_statsd.erl b/apps/emqx_statsd/src/emqx_statsd.erl index 3f4045391..75c15fa9e 100644 --- a/apps/emqx_statsd/src/emqx_statsd.erl +++ b/apps/emqx_statsd/src/emqx_statsd.erl @@ -79,6 +79,7 @@ init(Conf) -> sample_time_interval := SampleTimeInterval, flush_time_interval := FlushTimeInterval } = Conf, + FlushTimeInterval1 = flush_interval(FlushTimeInterval, SampleTimeInterval), {Host, Port} = emqx_schema:parse_server(Server, ?SERVER_PARSE_OPTS), Tags = maps:fold(fun(K, V, Acc) -> [{to_bin(K), to_bin(V)} | Acc] end, [], TagsRaw), Opts = [{tags, Tags}, {host, Host}, {port, Port}, {prefix, <<"emqx">>}], @@ -86,7 +87,7 @@ init(Conf) -> {ok, ensure_timer(#{ sample_time_interval => SampleTimeInterval, - flush_time_interval => FlushTimeInterval, + flush_time_interval => FlushTimeInterval1, estatsd_pid => Pid })}. @@ -129,6 +130,19 @@ terminate(_Reason, #{estatsd_pid := Pid}) -> %% Internal function %%------------------------------------------------------------------------------ +flush_interval(FlushInterval, SampleInterval) when FlushInterval >= SampleInterval -> + FlushInterval; +flush_interval(_FlushInterval, SampleInterval) -> + ?SLOG( + warning, + #{ + msg => + "Configured flush_time_interval is lower than sample_time_interval, " + "setting: flush_time_interval = sample_time_interval." + } + ), + SampleInterval. + ensure_timer(State = #{sample_time_interval := SampleTimeInterval}) -> State#{timer => emqx_misc:start_timer(SampleTimeInterval, ?SAMPLE_TIMEOUT)}. From 27c5389fdc914353a31ae2e3c73a4c03cf726d0e Mon Sep 17 00:00:00 2001 From: William Yang Date: Mon, 6 Mar 2023 14:05:33 +0100 Subject: [PATCH 15/40] fix(quic): inval listener option casue segfault bump quicer to 0.0.113 --- apps/emqx/rebar.config.script | 2 +- mix.exs | 2 +- rebar.config.erl | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/emqx/rebar.config.script b/apps/emqx/rebar.config.script index 2025f5ad5..0827570ff 100644 --- a/apps/emqx/rebar.config.script +++ b/apps/emqx/rebar.config.script @@ -24,7 +24,7 @@ IsQuicSupp = fun() -> end, Bcrypt = {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {tag, "0.6.0"}}}, -Quicer = {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.111"}}}. +Quicer = {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.113"}}}. Dialyzer = fun(Config) -> {dialyzer, OldDialyzerConfig} = lists:keyfind(dialyzer, 1, Config), diff --git a/mix.exs b/mix.exs index 3baa83399..b7fae37d2 100644 --- a/mix.exs +++ b/mix.exs @@ -648,7 +648,7 @@ defmodule EMQXUmbrella.MixProject do defp quicer_dep() do if enable_quicer?(), # in conflict with emqx and emqtt - do: [{:quicer, github: "emqx/quic", tag: "0.0.111", override: true}], + do: [{:quicer, github: "emqx/quic", tag: "0.0.113", override: true}], else: [] end diff --git a/rebar.config.erl b/rebar.config.erl index 9d9b0f874..349770487 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -39,7 +39,7 @@ bcrypt() -> {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {tag, "0.6.0"}}}. quicer() -> - {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.111"}}}. + {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.113"}}}. jq() -> {jq, {git, "https://github.com/emqx/jq", {tag, "v0.3.9"}}}. From e9ffabf9369117b9dd6b4cb5f815ba8bead4edaa Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 1 Mar 2023 10:48:06 -0300 Subject: [PATCH 16/40] fix(buffer_worker): add batch time automatic adjustment To avoid message loss due to misconfigurations, we adjust `batch_time` based on `request_timeout`. If `batch_time` > `request_timeout`, all requests will timeout before being sent if the message rate is low. Even worse if `pool_size` is high. We cap `batch_time` at `request_timeout div 2` as a rule of thumb. --- .../src/emqx_resource_buffer_worker.erl | 48 ++++++++++++++++++- 1 file changed, 47 insertions(+), 1 deletion(-) diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index a8ae4454d..ac22e1c48 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -196,13 +196,16 @@ init({Id, Index, Opts}) -> InflightWinSize = maps:get(async_inflight_window, Opts, ?DEFAULT_INFLIGHT), InflightTID = inflight_new(InflightWinSize, Id, Index), HealthCheckInterval = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL), + RequestTimeout = maps:get(request_timeout, Opts, ?DEFAULT_REQUEST_TIMEOUT), + BatchTime0 = maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME), + BatchTime = adjust_batch_time(Id, RequestTimeout, BatchTime0), Data = #{ id => Id, index => Index, inflight_tid => InflightTID, async_workers => #{}, batch_size => BatchSize, - batch_time => maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME), + batch_time => BatchTime, queue => Queue, resume_interval => maps:get(resume_interval, Opts, HealthCheckInterval), tref => undefined @@ -1639,3 +1642,46 @@ do_minimize(?QUERY(_ReplyTo, _Req, _Sent, _ExpireAt) = Query) -> Query. -else. do_minimize(?QUERY(ReplyTo, _Req, Sent, ExpireAt)) -> ?QUERY(ReplyTo, [], Sent, ExpireAt). -endif. + +%% To avoid message loss due to misconfigurations, we adjust +%% `batch_time' based on `request_timeout'. If `batch_time' > +%% `request_timeout', all requests will timeout before being sent if +%% the message rate is low. Even worse if `pool_size' is high. +%% We cap `batch_time' at `request_timeout div 2' as a rule of thumb. +adjust_batch_time(Id, RequestTimeout, BatchTime0) -> + BatchTime = max(0, min(BatchTime0, RequestTimeout div 2)), + case BatchTime =:= BatchTime0 of + false -> + ?SLOG(info, #{ + id => Id, + msg => adjusting_buffer_worker_batch_time, + new_batch_time => BatchTime + }); + true -> + ok + end, + BatchTime. + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). +adjust_batch_time_test_() -> + %% just for logging + Id = some_id, + [ + {"batch time smaller than request_time/2", + ?_assertEqual( + 100, + adjust_batch_time(Id, 500, 100) + )}, + {"batch time equal to request_time/2", + ?_assertEqual( + 100, + adjust_batch_time(Id, 200, 100) + )}, + {"batch time greater than request_time/2", + ?_assertEqual( + 50, + adjust_batch_time(Id, 100, 100) + )} + ]. +-endif. From 167b7a212f76eb248448b35d41fd1620b8ac7b2b Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 1 Mar 2023 13:39:59 -0300 Subject: [PATCH 17/40] refactor(buffer_worker): avoid starting 0-time timers --- apps/emqx_resource/src/emqx_resource_buffer_worker.erl | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index ac22e1c48..0f65b21f4 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -1540,6 +1540,12 @@ clear_disk_queue_dir(Id, Index) -> ensure_flush_timer(Data = #{batch_time := T}) -> ensure_flush_timer(Data, T). +ensure_flush_timer(Data = #{tref := undefined}, 0) -> + %% if the batch_time is 0, we don't need to start a timer, which + %% can be costly at high rates. + Ref = make_ref(), + self() ! {flush, {Ref, Ref}}, + Data#{tref => {Ref, Ref}}; ensure_flush_timer(Data = #{tref := undefined}, T) -> Ref = make_ref(), TRef = erlang:send_after(T, self(), {flush, Ref}), From 9b087a21f51dc504172b3adcfeb8906ff3d842ac Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 1 Mar 2023 11:49:41 -0300 Subject: [PATCH 18/40] fix(gcp_pubsub): remove conflicting `request_timeout` option Since the buffer worker schema already contains that configuration, having it two places can lead to quite confusing behavior. --- lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl | 9 --------- .../test/emqx_ee_bridge_gcp_pubsub_SUITE.erl | 1 - .../src/emqx_ee_connector_gcp_pubsub.erl | 4 ++-- 3 files changed, 2 insertions(+), 12 deletions(-) diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl index e00483839..ea9cedf4d 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl @@ -79,15 +79,6 @@ fields(bridge_config) -> desc => ?DESC("max_retries") } )}, - {request_timeout, - sc( - emqx_schema:duration_ms(), - #{ - required => false, - default => <<"15s">>, - desc => ?DESC("request_timeout") - } - )}, {payload_template, sc( binary(), diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl index 222acb77b..452b7a4d2 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl @@ -282,7 +282,6 @@ gcp_pubsub_config(Config) -> "bridges.gcp_pubsub.~s {\n" " enable = true\n" " connect_timeout = 1s\n" - " request_timeout = 1s\n" " service_account_json = ~s\n" " payload_template = ~p\n" " pubsub_topic = ~s\n" diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl index 898c36fe0..f07cbceab 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl @@ -33,7 +33,7 @@ connect_timeout := emqx_schema:duration_ms(), max_retries := non_neg_integer(), pubsub_topic := binary(), - request_timeout := emqx_schema:duration_ms(), + resource_opts := #{request_timeout := emqx_schema:duration_ms(), any() => term()}, service_account_json := service_account_json(), any() => term() }. @@ -71,7 +71,7 @@ on_start( payload_template := PayloadTemplate, pool_size := PoolSize, pubsub_topic := PubSubTopic, - request_timeout := RequestTimeout + resource_opts := #{request_timeout := RequestTimeout} } = Config ) -> ?SLOG(info, #{ From f95a30ae897bceeb4eafe5725f9f502695b244f6 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 1 Mar 2023 15:01:57 -0300 Subject: [PATCH 19/40] fix(webhook): convert `request_timeout`s in root and resource_opts --- .../src/schema/emqx_bridge_schema.erl | 59 +++++++++++++++++-- .../test/emqx_bridge_api_SUITE.erl | 29 +++++++++ .../emqx_bridge_compatible_config_tests.erl | 58 +++++++++++++++++- .../src/emqx_resource_buffer_worker.erl | 2 +- 4 files changed, 141 insertions(+), 7 deletions(-) diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl index ed2baec8f..6b96e5150 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl @@ -17,6 +17,7 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). +-include_lib("emqx/include/logger.hrl"). -import(hoconsc, [mk/2, ref/2]). @@ -140,11 +141,7 @@ fields(bridges) -> #{ desc => ?DESC("bridges_webhook"), required => false, - converter => fun(X, _HoconOpts) -> - emqx_bridge_compatible_config:upgrade_pre_ee( - X, fun emqx_bridge_compatible_config:webhook_maybe_upgrade/1 - ) - end + converter => fun webhook_bridge_converter/2 } )}, {mqtt, @@ -212,3 +209,55 @@ status() -> node_name() -> {"node", mk(binary(), #{desc => ?DESC("desc_node_name"), example => "emqx@127.0.0.1"})}. + +webhook_bridge_converter(Conf0, _HoconOpts) -> + Conf1 = emqx_bridge_compatible_config:upgrade_pre_ee( + Conf0, fun emqx_bridge_compatible_config:webhook_maybe_upgrade/1 + ), + case Conf1 of + undefined -> + undefined; + _ -> + do_convert_webhook_config(Conf1) + end. + +do_convert_webhook_config( + #{<<"request_timeout">> := ReqT, <<"resource_opts">> := #{<<"request_timeout">> := ReqT}} = Conf +) -> + %% ok: same values + Conf; +do_convert_webhook_config( + #{ + <<"request_timeout">> := ReqTRootRaw, + <<"resource_opts">> := #{<<"request_timeout">> := ReqTResourceRaw} + } = Conf0 +) -> + %% different values; we set them to the same, if they are valid + %% durations + MReqTRoot = emqx_schema:to_duration_ms(ReqTRootRaw), + MReqTResource = emqx_schema:to_duration_ms(ReqTResourceRaw), + case {MReqTRoot, MReqTResource} of + {{ok, ReqTRoot}, {ok, ReqTResource}} -> + {_Parsed, ReqTRaw} = max({ReqTRoot, ReqTRootRaw}, {ReqTResource, ReqTResourceRaw}), + ?SLOG( + debug, + #{ + msg => adjusting_webhook_bridge_request_time, + new_value => ReqTRaw + } + ), + Conf1 = emqx_map_lib:deep_merge( + Conf0, + #{ + <<"request_timeout">> => ReqTRaw, + <<"resource_opts">> => #{<<"request_timeout">> => ReqTRaw} + } + ), + Conf1; + _ -> + %% invalid values; let the type checker complain about + %% that. + Conf0 + end; +do_convert_webhook_config(Conf) -> + Conf. diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 5f863ed63..d242111dc 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -818,6 +818,35 @@ t_metrics(Config) -> ), ok. +%% request_timeout in bridge root should match request_timeout in +%% resource_opts. +t_inconsistent_webhook_request_timeouts(Config) -> + Port = ?config(port, Config), + URL1 = ?URL(Port, "path1"), + Name = ?BRIDGE_NAME, + BadBridgeParams = + emqx_map_lib:deep_merge( + ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name), + #{ + <<"request_timeout">> => <<"1s">>, + <<"resource_opts">> => #{<<"request_timeout">> => <<"2s">>} + } + ), + {ok, 201, RawResponse} = request( + post, + uri(["bridges"]), + BadBridgeParams + ), + %% note: same value on both fields + ?assertMatch( + #{ + <<"request_timeout">> := <<"2s">>, + <<"resource_opts">> := #{<<"request_timeout">> := <<"2s">>} + }, + emqx_json:decode(RawResponse, [return_maps]) + ), + ok. + operation_path(node, Oper, BridgeID) -> uri(["nodes", node(), "bridges", BridgeID, Oper]); operation_path(cluster, Oper, BridgeID) -> diff --git a/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl b/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl index 5e0b4912f..acafb84ca 100644 --- a/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl +++ b/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl @@ -28,6 +28,7 @@ empty_config_test() -> webhook_config_test() -> Conf1 = parse(webhook_v5011_hocon()), Conf2 = parse(full_webhook_v5011_hocon()), + Conf3 = parse(full_webhook_v5019_hocon()), ?assertMatch( #{ @@ -59,6 +60,26 @@ webhook_config_test() -> check(Conf2) ), + %% the converter should pick the greater of the two + %% request_timeouts and place them in the root and inside + %% resource_opts. + ?assertMatch( + #{ + <<"bridges">> := #{ + <<"webhook">> := #{ + <<"the_name">> := + #{ + <<"method">> := get, + <<"request_timeout">> := 60_000, + <<"resource_opts">> := #{<<"request_timeout">> := 60_000}, + <<"body">> := <<"${payload}">> + } + } + } + }, + check(Conf3) + ), + ok. up(#{<<"bridges">> := Bridges0} = Conf0) -> @@ -124,7 +145,7 @@ bridges{ max_retries = 3 method = \"get\" pool_size = 4 - request_timeout = \"5s\" + request_timeout = \"15s\" ssl {enable = false, verify = \"verify_peer\"} url = \"http://localhost:8080\" } @@ -164,6 +185,41 @@ full_webhook_v5011_hocon() -> "}\n" "". +%% does not contain direction +full_webhook_v5019_hocon() -> + "" + "\n" + "bridges{\n" + " webhook {\n" + " the_name{\n" + " body = \"${payload}\"\n" + " connect_timeout = \"5s\"\n" + " enable_pipelining = 100\n" + " headers {\"content-type\" = \"application/json\"}\n" + " max_retries = 3\n" + " method = \"get\"\n" + " pool_size = 4\n" + " pool_type = \"random\"\n" + " request_timeout = \"1m\"\n" + " resource_opts = {\n" + " request_timeout = \"7s\"\n" + " }\n" + " ssl {\n" + " ciphers = \"\"\n" + " depth = 10\n" + " enable = false\n" + " reuse_sessions = true\n" + " secure_renegotiate = true\n" + " user_lookup_fun = \"emqx_tls_psk:lookup\"\n" + " verify = \"verify_peer\"\n" + " versions = [\"tlsv1.3\", \"tlsv1.2\", \"tlsv1.1\", \"tlsv1\"]\n" + " }\n" + " url = \"http://localhost:8080\"\n" + " }\n" + " }\n" + "}\n" + "". + %% erlfmt-ignore %% this is a generated from v5.0.11 mqtt_v5011_hocon() -> diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 0f65b21f4..58a97b5fe 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -1544,7 +1544,7 @@ ensure_flush_timer(Data = #{tref := undefined}, 0) -> %% if the batch_time is 0, we don't need to start a timer, which %% can be costly at high rates. Ref = make_ref(), - self() ! {flush, {Ref, Ref}}, + self() ! {flush, Ref}, Data#{tref => {Ref, Ref}}; ensure_flush_timer(Data = #{tref := undefined}, T) -> Ref = make_ref(), From 9825998207069c50ddf3d3aaa615249d01eb9e6c Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 6 Mar 2023 09:56:19 -0300 Subject: [PATCH 20/40] chore(gcp_pubsub): just deprecate `request_timeout` instead of removing --- .../emqx_ee_bridge/i18n/emqx_ee_bridge_gcp_pubsub.conf | 4 ++-- .../emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl | 10 ++++++++++ 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_gcp_pubsub.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_gcp_pubsub.conf index af2a93f82..b8fa3b43a 100644 --- a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_gcp_pubsub.conf +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_gcp_pubsub.conf @@ -77,8 +77,8 @@ emqx_ee_bridge_gcp_pubsub { request_timeout { desc { - en: "HTTP request timeout." - zh: "HTTP 请求超时。" + en: "Deprecated: Configure the request timeout in the buffer settings." + zh: "废弃的。在缓冲区设置中配置请求超时。" } label: { en: "Request Timeout" diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl index ea9cedf4d..352a7163a 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl @@ -79,6 +79,16 @@ fields(bridge_config) -> desc => ?DESC("max_retries") } )}, + {request_timeout, + sc( + emqx_schema:duration_ms(), + #{ + required => false, + deprecated => {since, "e5.0.1"}, + default => <<"15s">>, + desc => ?DESC("request_timeout") + } + )}, {payload_template, sc( binary(), From abb527117749d103254b57c13e246dfb12bdd7aa Mon Sep 17 00:00:00 2001 From: William Yang Date: Mon, 6 Mar 2023 14:58:35 +0100 Subject: [PATCH 21/40] test(quic): start listener with invalid parameter --- apps/emqx/test/emqx_common_test_helpers.erl | 8 +++++--- .../test/emqx_quic_multistreams_SUITE.erl | 19 ++++++++++++++----- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/apps/emqx/test/emqx_common_test_helpers.erl b/apps/emqx/test/emqx_common_test_helpers.erl index dd88b013d..ce3a39dcf 100644 --- a/apps/emqx/test/emqx_common_test_helpers.erl +++ b/apps/emqx/test/emqx_common_test_helpers.erl @@ -537,10 +537,12 @@ ensure_quic_listener(Name, UdpPort, ExtraSettings) -> mountpoint => <<>>, zone => default }, - emqx_config:put([listeners, quic, Name], maps:merge(Conf, ExtraSettings)), - case emqx_listeners:start_listener(quic, Name, Conf) of + Conf2 = maps:merge(Conf, ExtraSettings), + emqx_config:put([listeners, quic, Name], Conf2), + case emqx_listeners:start_listener(emqx_listeners:listener_id(quic, Name)) of ok -> ok; - {error, {already_started, _Pid}} -> ok + {error, {already_started, _Pid}} -> ok; + Other -> throw(Other) end. %% diff --git a/apps/emqx/test/emqx_quic_multistreams_SUITE.erl b/apps/emqx/test/emqx_quic_multistreams_SUITE.erl index b0121314c..b0eefba0d 100644 --- a/apps/emqx/test/emqx_quic_multistreams_SUITE.erl +++ b/apps/emqx/test/emqx_quic_multistreams_SUITE.erl @@ -33,7 +33,8 @@ all() -> {group, mstream}, {group, shutdown}, {group, misc}, - t_listener_with_lowlevel_settings + t_listener_with_lowlevel_settings, + t_listener_inval_settings ]. groups() -> @@ -1885,8 +1886,17 @@ t_multi_streams_sub_0_rtt_stream_data_cont(Config) -> ok = emqtt:disconnect(C), ok = emqtt:disconnect(C0). +t_listener_inval_settings(_Config) -> + LPort = select_port(), + %% too small + LowLevelTunings = #{stream_recv_buffer_default => 1024}, + ?assertThrow( + {error, {failed_to_start, _}}, + emqx_common_test_helpers:ensure_quic_listener(?FUNCTION_NAME, LPort, LowLevelTunings) + ). + t_listener_with_lowlevel_settings(_Config) -> - LPort = 24567, + LPort = select_port(), LowLevelTunings = #{ max_bytes_per_key => 274877906, %% In conf schema we use handshake_idle_timeout @@ -1897,7 +1907,7 @@ t_listener_with_lowlevel_settings(_Config) -> %% tls_client_max_send_buffer, tls_server_max_send_buffer => 10240, stream_recv_window_default => 1024, - stream_recv_buffer_default => 1024, + stream_recv_buffer_default => 10240, conn_flow_control_window => 1024, max_stateless_operations => 16, initial_window_packets => 1300, @@ -1936,8 +1946,7 @@ t_listener_with_lowlevel_settings(_Config) -> {ok, _, [_SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ {<<"test/1/3">>, [{qos, 2}]} ]), - ok = emqtt:disconnect(C), - emqx_listeners:stop_listener(emqx_listeners:listener_id(quic, ?FUNCTION_NAME)). + ok = emqtt:disconnect(C). %%-------------------------------------------------------------------- %% Helper functions From c283902a623909a04b54eb877424573a90ca1525 Mon Sep 17 00:00:00 2001 From: William Yang Date: Mon, 6 Mar 2023 16:16:45 +0100 Subject: [PATCH 22/40] docs(quic): changelogs. --- changes/ce/fix-10078.en.md | 2 ++ changes/ce/fix-10078.zh.md | 2 ++ 2 files changed, 4 insertions(+) create mode 100644 changes/ce/fix-10078.en.md create mode 100644 changes/ce/fix-10078.zh.md diff --git a/changes/ce/fix-10078.en.md b/changes/ce/fix-10078.en.md new file mode 100644 index 000000000..afb7bcbe0 --- /dev/null +++ b/changes/ce/fix-10078.en.md @@ -0,0 +1,2 @@ +Fix an issue that invalid QUIC listener setting could casue segfault. + diff --git a/changes/ce/fix-10078.zh.md b/changes/ce/fix-10078.zh.md new file mode 100644 index 000000000..47a774d1e --- /dev/null +++ b/changes/ce/fix-10078.zh.md @@ -0,0 +1,2 @@ +修复了无效的 QUIC 监听器设置可能导致 segfault 的问题。 + From e9d3fc511fdfc4d37fb14ca94bb8cc3c0e32e902 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 6 Mar 2023 09:56:44 -0300 Subject: [PATCH 23/40] chore(buffer_worker): change default `batch_time` to 0 and improve docs --- apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf | 8 ++++---- apps/emqx_resource/include/emqx_resource.hrl | 4 ++-- lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl | 4 +++- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf index f3ac2fd97..35e2905df 100644 --- a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf +++ b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf @@ -159,12 +159,12 @@ When disabled the messages are buffered in RAM only.""" batch_time { desc { - en: """Maximum batch waiting interval.""" - zh: """最大批量请求等待时间。""" + en: """Maximum waiting interval when accumulating a batch at a low message rates for more efficient resource usage.""" + zh: """在较低消息率情况下尝试累积批量输出时的最大等待间隔,以提高资源的利用率。""" } label { - en: """Batch time""" - zh: """批量等待间隔""" + en: """Max Batch Wait Time""" + zh: """批量等待最大间隔""" } } diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index fa7f2eb38..be570e694 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -97,8 +97,8 @@ -define(DEFAULT_BATCH_SIZE, 1). %% milliseconds --define(DEFAULT_BATCH_TIME, 20). --define(DEFAULT_BATCH_TIME_RAW, <<"20ms">>). +-define(DEFAULT_BATCH_TIME, 0). +-define(DEFAULT_BATCH_TIME_RAW, <<"0ms">>). %% count -define(DEFAULT_INFLIGHT, 100). diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl index 67a9b4a05..4eeebfaf8 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl @@ -540,7 +540,9 @@ resource_configs() -> <<"query_mode">> => <<"sync">>, <<"worker_pool_size">> => <<"1">>, <<"batch_size">> => integer_to_binary(?BATCH_SIZE), - <<"start_timeout">> => <<"15s">> + <<"start_timeout">> => <<"15s">>, + <<"batch_time">> => <<"4s">>, + <<"request_timeout">> => <<"30s">> } }. From e17ad320eeacefbabf75d059390915bb14ac5936 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 6 Mar 2023 09:57:10 -0300 Subject: [PATCH 24/40] fix(bridge): do not log in converter --- apps/emqx_bridge/src/schema/emqx_bridge_schema.erl | 7 ------- 1 file changed, 7 deletions(-) diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl index 6b96e5150..74d2a5ca1 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl @@ -239,13 +239,6 @@ do_convert_webhook_config( case {MReqTRoot, MReqTResource} of {{ok, ReqTRoot}, {ok, ReqTResource}} -> {_Parsed, ReqTRaw} = max({ReqTRoot, ReqTRootRaw}, {ReqTResource, ReqTResourceRaw}), - ?SLOG( - debug, - #{ - msg => adjusting_webhook_bridge_request_time, - new_value => ReqTRaw - } - ), Conf1 = emqx_map_lib:deep_merge( Conf0, #{ From 0e707e837f747dda095ce776ebe094bcd2f8e725 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 6 Mar 2023 10:14:13 -0300 Subject: [PATCH 25/40] docs(buffer_worker): improve description of `request_timeout` --- apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf index 35e2905df..fb6b2eb06 100644 --- a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf +++ b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf @@ -102,12 +102,12 @@ For bridges only have ingress direction data flow, it can be set to 0 otherwise request_timeout { desc { - en: """Timeout for requests. If query_mode is sync, calls to the resource will be blocked for this amount of time before timing out.""" - zh: """请求的超时。 如果query_modesync,对资源的调用将在超时前被阻断这一时间。""" + en: """Starting from the moment when the request enters the buffer, if the request remains in the buffer for the specified time or is sent but does not receive a response or acknowledgement in time, the request is considered expired.""" + zh: """从请求进入缓冲区开始计时,如果请求在规定的时间内仍停留在缓冲区内或者已发送但未能及时收到响应或确认,该请求将被视为过期。""" } label { - en: """Request timeout""" - zh: """请求超时""" + en: """Request Expiry""" + zh: """请求超期""" } } From 18ab7ed19792252f2159ee8b866555fa86f235d1 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 6 Mar 2023 10:36:00 -0300 Subject: [PATCH 26/40] chore: bump app vsns --- apps/emqx_resource/src/emqx_resource.app.src | 2 +- lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource.app.src b/apps/emqx_resource/src/emqx_resource.app.src index cb26c7f09..0cc013099 100644 --- a/apps/emqx_resource/src/emqx_resource.app.src +++ b/apps/emqx_resource/src/emqx_resource.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_resource, [ {description, "Manager for all external resources"}, - {vsn, "0.1.8"}, + {vsn, "0.1.9"}, {registered, []}, {mod, {emqx_resource_app, []}}, {applications, [ diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src index c30c927f2..05d893a79 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src @@ -1,6 +1,6 @@ {application, emqx_ee_bridge, [ {description, "EMQX Enterprise data bridges"}, - {vsn, "0.1.5"}, + {vsn, "0.1.6"}, {registered, []}, {applications, [ kernel, From eef65fba606d4d2a916e41cc00419d2782b87449 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 6 Mar 2023 13:19:20 -0300 Subject: [PATCH 27/40] fix(buffer_worker): handle `request_timeout = infinity` case The current schema allows `infinity` for `request_timeout`, so we have to take that into account. It's not currently possible to set `batch_time = infinity`, so there's no need to treat that case. --- apps/emqx_resource/src/emqx_resource_buffer_worker.erl | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 58a97b5fe..d5a50f351 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -1654,6 +1654,8 @@ do_minimize(?QUERY(ReplyTo, _Req, Sent, ExpireAt)) -> ?QUERY(ReplyTo, [], Sent, %% `request_timeout', all requests will timeout before being sent if %% the message rate is low. Even worse if `pool_size' is high. %% We cap `batch_time' at `request_timeout div 2' as a rule of thumb. +adjust_batch_time(_Id, _RequestTimeout = infinity, BatchTime0) -> + BatchTime0; adjust_batch_time(Id, RequestTimeout, BatchTime0) -> BatchTime = max(0, min(BatchTime0, RequestTimeout div 2)), case BatchTime =:= BatchTime0 of @@ -1688,6 +1690,11 @@ adjust_batch_time_test_() -> ?_assertEqual( 50, adjust_batch_time(Id, 100, 100) + )}, + {"batch time smaller than request_time/2 (request_time = infinity)", + ?_assertEqual( + 100, + adjust_batch_time(Id, infinity, 100) )} ]. -endif. From 133734b345c49da57da6a1a4895748be674c28fb Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Tue, 7 Mar 2023 08:39:59 +0100 Subject: [PATCH 28/40] chore: bump version number to e5.0.1-rc.1 and v5.0.20 --- apps/emqx/include/emqx_release.hrl | 4 ++-- apps/emqx/src/emqx.app.src | 2 +- deploy/charts/emqx/Chart.yaml | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/apps/emqx/include/emqx_release.hrl b/apps/emqx/include/emqx_release.hrl index ffaa4aa36..c2326ac95 100644 --- a/apps/emqx/include/emqx_release.hrl +++ b/apps/emqx/include/emqx_release.hrl @@ -32,10 +32,10 @@ %% `apps/emqx/src/bpapi/README.md' %% Community edition --define(EMQX_RELEASE_CE, "5.0.19"). +-define(EMQX_RELEASE_CE, "5.0.20"). %% Enterprise edition --define(EMQX_RELEASE_EE, "5.0.1-beta.1"). +-define(EMQX_RELEASE_EE, "5.0.1-rc.1"). %% the HTTP API version -define(EMQX_API_VERSION, "5.0"). diff --git a/apps/emqx/src/emqx.app.src b/apps/emqx/src/emqx.app.src index 3030ccb06..e195107ed 100644 --- a/apps/emqx/src/emqx.app.src +++ b/apps/emqx/src/emqx.app.src @@ -3,7 +3,7 @@ {id, "emqx"}, {description, "EMQX Core"}, % strict semver, bump manually! - {vsn, "5.0.18"}, + {vsn, "5.0.19"}, {modules, []}, {registered, []}, {applications, [ diff --git a/deploy/charts/emqx/Chart.yaml b/deploy/charts/emqx/Chart.yaml index 71c2122b1..bccccb0c0 100644 --- a/deploy/charts/emqx/Chart.yaml +++ b/deploy/charts/emqx/Chart.yaml @@ -14,8 +14,8 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. -version: 5.0.19 +version: 5.0.20 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. -appVersion: 5.0.19 +appVersion: 5.0.20 From 8ce8f05a77e030f5e6dcc2baf37d4796918055c6 Mon Sep 17 00:00:00 2001 From: Kinplemelon Date: Tue, 7 Mar 2023 16:02:07 +0800 Subject: [PATCH 29/40] chore: upgrade dashboard to e1.0.4 for ee --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 3d582fafd..2f7ab5244 100644 --- a/Makefile +++ b/Makefile @@ -7,7 +7,7 @@ export EMQX_DEFAULT_RUNNER = debian:11-slim export OTP_VSN ?= $(shell $(CURDIR)/scripts/get-otp-vsn.sh) export ELIXIR_VSN ?= $(shell $(CURDIR)/scripts/get-elixir-vsn.sh) export EMQX_DASHBOARD_VERSION ?= v1.1.8 -export EMQX_EE_DASHBOARD_VERSION ?= e1.0.4-beta.3 +export EMQX_EE_DASHBOARD_VERSION ?= e1.0.4 export EMQX_REL_FORM ?= tgz export QUICER_DOWNLOAD_FROM_RELEASE = 1 ifeq ($(OS),Windows_NT) From 6f88cb7d9ecd060fe4d082ee191c5eb0bf5837e6 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Tue, 7 Mar 2023 12:01:33 +0100 Subject: [PATCH 30/40] chore(ekka): Bump version to 0.14.3 --- apps/emqx/rebar.config | 2 +- changes/ce/fix-10084.en.md | 3 +++ changes/ce/fix-10084.zh.md | 3 +++ mix.exs | 2 +- rebar.config | 2 +- 5 files changed, 9 insertions(+), 3 deletions(-) create mode 100644 changes/ce/fix-10084.en.md create mode 100644 changes/ce/fix-10084.zh.md diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index 798153a57..0ecbbfc1a 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -27,7 +27,7 @@ {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}}, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.4"}}}, - {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.14.2"}}}, + {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.14.3"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}}, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.36.0"}}}, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}, diff --git a/changes/ce/fix-10084.en.md b/changes/ce/fix-10084.en.md new file mode 100644 index 000000000..90da7d660 --- /dev/null +++ b/changes/ce/fix-10084.en.md @@ -0,0 +1,3 @@ +Fix problem when joining core nodes running different EMQX versions into a cluster. + +[Mria PR](https://github.com/emqx/mria/pull/127) diff --git a/changes/ce/fix-10084.zh.md b/changes/ce/fix-10084.zh.md new file mode 100644 index 000000000..52bfdd06a --- /dev/null +++ b/changes/ce/fix-10084.zh.md @@ -0,0 +1,3 @@ +修正将运行不同EMQX版本的核心节点加入集群的问题。 + +[Mria PR](https://github.com/emqx/mria/pull/127) diff --git a/mix.exs b/mix.exs index b7fae37d2..6a7f9fb50 100644 --- a/mix.exs +++ b/mix.exs @@ -54,7 +54,7 @@ defmodule EMQXUmbrella.MixProject do {:cowboy, github: "emqx/cowboy", tag: "2.9.0", override: true}, {:esockd, github: "emqx/esockd", tag: "5.9.4", override: true}, {:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.7.2-emqx-7", override: true}, - {:ekka, github: "emqx/ekka", tag: "0.14.2", override: true}, + {:ekka, github: "emqx/ekka", tag: "0.14.3", override: true}, {:gen_rpc, github: "emqx/gen_rpc", tag: "2.8.1", override: true}, {:grpc, github: "emqx/grpc-erl", tag: "0.6.7", override: true}, {:minirest, github: "emqx/minirest", tag: "1.3.8", override: true}, diff --git a/rebar.config b/rebar.config index f781d4356..2e478869d 100644 --- a/rebar.config +++ b/rebar.config @@ -56,7 +56,7 @@ , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}} , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.4"}}} , {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.7.2-emqx-7"}}} - , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.14.2"}}} + , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.14.3"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}} , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.7"}}} , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.8"}}} From 0e9665fd9959ff863b90d82ec2344ce34da04afd Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Tue, 7 Mar 2023 12:22:57 +0100 Subject: [PATCH 31/40] fix(changelog): Apply suggestions from code review Co-authored-by: William Yang --- changes/ce/fix-10084.zh.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changes/ce/fix-10084.zh.md b/changes/ce/fix-10084.zh.md index 52bfdd06a..dd44533cf 100644 --- a/changes/ce/fix-10084.zh.md +++ b/changes/ce/fix-10084.zh.md @@ -1,3 +1,3 @@ -修正将运行不同EMQX版本的核心节点加入集群的问题。 +修正将运行不同 EMQX 版本的核心节点加入集群的问题。 [Mria PR](https://github.com/emqx/mria/pull/127) From ca947e3e702ec57d62b95dcc7441910bdc6679a7 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Mon, 6 Mar 2023 14:17:23 +0100 Subject: [PATCH 32/40] fix: lost messages when HTTP connection times out When using async mode with the webhook bridge, queued messages that are not fully processed when the connection times out could be lost. This commit fixes this by letting the bridge return a recoverable_error when this happen. The message send will then be retried in sync mode by the emqx_resource_buffer_worker. Fixes: https://emqx.atlassian.net/browse/EMQX-8974 --- .../test/emqx_bridge_webhook_SUITE.erl | 252 ++++++++++++++++++ .../src/emqx_connector_http.erl | 4 +- 2 files changed, 255 insertions(+), 1 deletion(-) create mode 100644 apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl diff --git a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl new file mode 100644 index 000000000..9446b0ffe --- /dev/null +++ b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl @@ -0,0 +1,252 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_webhook_SUITE). + +%% This suite should contains testcases that are specific for the webhook +%% bridge. There are also some test cases that implicitly tests the webhook +%% bridge in emqx_bridge_api_SUITE + +-compile(nowarn_export_all). +-compile(export_all). + +-import(emqx_mgmt_api_test_util, [request/3, uri/1]). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +all() -> + emqx_common_test_helpers:all(?MODULE). + +groups() -> + []. + +init_per_suite(_Config) -> + emqx_common_test_helpers:render_and_load_app_config(emqx_conf), + ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]), + ok = emqx_connector_test_helpers:start_apps([emqx_resource]), + {ok, _} = application:ensure_all_started(emqx_connector), + {ok, _} = application:ensure_all_started(emqx_ee_connector), + {ok, _} = application:ensure_all_started(emqx_ee_bridge), + snabbkaffe:fix_ct_logging(), + []. + +end_per_suite(_Config) -> + ok = emqx_common_test_helpers:stop_apps([emqx_conf]), + ok = emqx_connector_test_helpers:stop_apps([emqx_resource]), + _ = application:stop(emqx_connector), + _ = application:stop(emqx_ee_connector), + _ = application:stop(emqx_bridge), + ok. + +suite() -> + [{timetrap, {seconds, 60}}]. + +%%------------------------------------------------------------------------------ +%% HTTP server for testing +%% (Orginally copied from emqx_bridge_api_SUITE) +%%------------------------------------------------------------------------------ +start_http_server(HTTPServerConfig) -> + ct:pal("Start server\n"), + process_flag(trap_exit, true), + Parent = self(), + {Port, Sock} = listen_on_random_port(), + Acceptor = spawn(fun() -> + accept_loop(Sock, Parent, HTTPServerConfig) + end), + timer:sleep(100), + #{port => Port, sock => Sock, acceptor => Acceptor}. + +stop_http_server(#{sock := Sock, acceptor := Acceptor}) -> + ct:pal("Stop server\n"), + exit(Acceptor, kill), + gen_tcp:close(Sock). + +listen_on_random_port() -> + SockOpts = [binary, {active, false}, {packet, raw}, {reuseaddr, true}, {backlog, 1000}], + case gen_tcp:listen(0, SockOpts) of + {ok, Sock} -> + {ok, Port} = inet:port(Sock), + {Port, Sock}; + {error, Reason} when Reason /= eaddrinuse -> + {error, Reason} + end. + +accept_loop(Sock, Parent, HTTPServerConfig) -> + process_flag(trap_exit, true), + {ok, Conn} = gen_tcp:accept(Sock), + spawn(fun() -> handle_fun_200_ok(Conn, Parent, HTTPServerConfig) end), + %%gen_tcp:controlling_process(Conn, Handler), + accept_loop(Sock, Parent, HTTPServerConfig). + +make_response(CodeStr, Str) -> + B = iolist_to_binary(Str), + iolist_to_binary( + io_lib:fwrite( + "HTTP/1.0 ~s\r\nContent-Type: text/html\r\nContent-Length: ~p\r\n\r\n~s", + [CodeStr, size(B), B] + ) + ). + +handle_fun_200_ok(Conn, Parent, HTTPServerConfig) -> + ResponseDelayMS = maps:get(response_delay_ms, HTTPServerConfig, 0), + ct:pal("Waiting for request~n"), + case gen_tcp:recv(Conn, 0) of + {ok, ReqStr} -> + ct:pal("The http handler got request: ~p", [ReqStr]), + Req = parse_http_request(ReqStr), + timer:sleep(ResponseDelayMS), + Parent ! {http_server, received, Req}, + gen_tcp:send(Conn, make_response("200 OK", "Request OK")), + handle_fun_200_ok(Conn, Parent, HTTPServerConfig); + {error, closed} -> + ct:pal("http connection closed"); + {error, Reason} -> + ct:pal("the http handler recv error: ~p", [Reason]), + timer:sleep(100), + gen_tcp:close(Conn) + end. + +parse_http_request(ReqStr0) -> + [Method, ReqStr1] = string:split(ReqStr0, " ", leading), + [Path, ReqStr2] = string:split(ReqStr1, " ", leading), + [_ProtoVsn, ReqStr3] = string:split(ReqStr2, "\r\n", leading), + [_HeaderStr, Body] = string:split(ReqStr3, "\r\n\r\n", leading), + #{method => Method, path => Path, body => Body}. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% Helper functions +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +bridge_async_config(#{port := Port} = Config) -> + Type = maps:get(type, Config, <<"webhook">>), + Name = maps:get(name, Config, atom_to_binary(?MODULE)), + PoolSize = maps:get(pool_size, Config, 1), + QueryMode = maps:get(query_mode, Config, "async"), + ConnectTimeout = maps:get(connect_timeout, Config, 1), + RequestTimeout = maps:get(request_timeout, Config, 10000), + ResourceRequestTimeout = maps:get(resouce_request_timeout, Config, "infinity"), + ConfigString = io_lib:format( + "bridges.~s.~s {\n" + " url = \"http://localhost:~p\"\n" + " connect_timeout = \"~ps\"\n" + " enable = true\n" + " enable_pipelining = 100\n" + " max_retries = 2\n" + " method = \"post\"\n" + " pool_size = ~p\n" + " pool_type = \"random\"\n" + " request_timeout = \"~ps\"\n" + " body = \"${id}\"" + " resource_opts {\n" + " async_inflight_window = 100\n" + " auto_restart_interval = \"60s\"\n" + " health_check_interval = \"15s\"\n" + " max_queue_bytes = \"1GB\"\n" + " query_mode = \"~s\"\n" + " request_timeout = \"~s\"\n" + " start_after_created = \"true\"\n" + " start_timeout = \"5s\"\n" + " worker_pool_size = \"1\"\n" + " }\n" + " ssl {\n" + " enable = false\n" + " }\n" + "}\n", + [ + Type, + Name, + Port, + ConnectTimeout, + PoolSize, + RequestTimeout, + QueryMode, + ResourceRequestTimeout + ] + ), + ct:pal(ConfigString), + parse_and_check(ConfigString, Type, Name). + +parse_and_check(ConfigString, BridgeType, Name) -> + {ok, RawConf} = hocon:binary(ConfigString, #{format => map}), + hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}), + #{<<"bridges">> := #{BridgeType := #{Name := RetConfig}}} = RawConf, + RetConfig. + +make_bridge(Config) -> + Type = <<"webhook">>, + Name = atom_to_binary(?MODULE), + BridgeConfig = bridge_async_config(Config#{ + name => Name, + type => Type + }), + {ok, _} = emqx_bridge:create( + Type, + Name, + BridgeConfig + ), + emqx_bridge_resource:bridge_id(Type, Name). + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + +%% This test ensures that https://emqx.atlassian.net/browse/CI-62 is fixed. +%% When the connection time out all the queued requests where dropped in +t_send_async_connection_timeout(_Config) -> + ResponseDelayMS = 90, + #{port := Port} = Server = start_http_server(#{response_delay_ms => 900}), + % Port = 9000, + BridgeID = make_bridge(#{ + port => Port, + pool_size => 1, + query_mode => "async", + connect_timeout => ResponseDelayMS * 2, + request_timeout => 10000, + resouce_request_timeout => "infinity" + }), + NumberOfMessagesToSend = 10, + [ + emqx_bridge:send_message(BridgeID, #{<<"id">> => Id}) + || Id <- lists:seq(1, NumberOfMessagesToSend) + ], + %% Make sure server recive all messages + ct:pal("Sent messages\n"), + MessageIDs = maps:from_keys(lists:seq(1, NumberOfMessagesToSend), void), + receive_request_notifications(MessageIDs, ResponseDelayMS), + stop_http_server(Server), + ok. + +receive_request_notifications(MessageIDs, _ResponseDelay) when map_size(MessageIDs) =:= 0 -> + ok; +receive_request_notifications(MessageIDs, ResponseDelay) -> + receive + {http_server, received, Req} -> + RemainingMessageIDs = remove_message_id(MessageIDs, Req), + receive_request_notifications(RemainingMessageIDs, ResponseDelay) + after (30 * 1000) -> + ct:pal("Waited to long time but did not get any message\n"), + ct:fail("All requests did not reach server at least once") + end. + +remove_message_id(MessageIDs, #{body := IDBin}) -> + try + ID = erlang:binary_to_integer(IDBin), + maps:remove(ID, MessageIDs) + catch + _:_ -> + %% It is acceptable to get the same message more than once + MessageIDs + end. diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index 7d91e18b9..09fa988d3 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -564,7 +564,9 @@ bin(Atom) when is_atom(Atom) -> reply_delegator(ReplyFunAndArgs, Result) -> case Result of - {error, Reason} when Reason =:= econnrefused; Reason =:= timeout -> + %% The normal reason happens when the HTTP connection times out before + %% the request has been fully processed + {error, Reason} when Reason =:= econnrefused; Reason =:= timeout; Reason =:= normal -> Result1 = {error, {recoverable_error, Reason}}, emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result1); _ -> From 163b33ab287b3b5463a0064251a18384fdf0c767 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Mon, 6 Mar 2023 15:20:34 +0100 Subject: [PATCH 33/40] test: remove unnecessary dependencies of ee apps --- .../test/emqx_bridge_webhook_SUITE.erl | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl index 9446b0ffe..8c688a0ec 100644 --- a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl @@ -38,16 +38,13 @@ init_per_suite(_Config) -> ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]), ok = emqx_connector_test_helpers:start_apps([emqx_resource]), {ok, _} = application:ensure_all_started(emqx_connector), - {ok, _} = application:ensure_all_started(emqx_ee_connector), - {ok, _} = application:ensure_all_started(emqx_ee_bridge), snabbkaffe:fix_ct_logging(), []. end_per_suite(_Config) -> - ok = emqx_common_test_helpers:stop_apps([emqx_conf]), + ok = emqx_common_test_helpers:stop_apps([emqx_conf, emqx_bridge]), ok = emqx_connector_test_helpers:stop_apps([emqx_resource]), _ = application:stop(emqx_connector), - _ = application:stop(emqx_ee_connector), _ = application:stop(emqx_bridge), ok. @@ -242,11 +239,6 @@ receive_request_notifications(MessageIDs, ResponseDelay) -> end. remove_message_id(MessageIDs, #{body := IDBin}) -> - try - ID = erlang:binary_to_integer(IDBin), - maps:remove(ID, MessageIDs) - catch - _:_ -> - %% It is acceptable to get the same message more than once - MessageIDs - end. + ID = erlang:binary_to_integer(IDBin), + %% It is acceptable to get the same message more than once + maps:without([ID], MessageIDs). From 93ebd59fb24b78e28e8567532001cb6490202a9b Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Tue, 7 Mar 2023 20:53:15 +0100 Subject: [PATCH 34/40] docs: add changelogs for PR 10076 --- changes/ce/fix-10076.en.md | 2 ++ changes/ce/fix-10076.zh.md | 2 ++ 2 files changed, 4 insertions(+) create mode 100644 changes/ce/fix-10076.en.md create mode 100644 changes/ce/fix-10076.zh.md diff --git a/changes/ce/fix-10076.en.md b/changes/ce/fix-10076.en.md new file mode 100644 index 000000000..e01df765b --- /dev/null +++ b/changes/ce/fix-10076.en.md @@ -0,0 +1,2 @@ +Fix webhook bridge error handling: connection timeout should be a retriable error. +Prior to this fix, connection timeout was clasified as unrecoverable error hence lead to request being dropped. diff --git a/changes/ce/fix-10076.zh.md b/changes/ce/fix-10076.zh.md new file mode 100644 index 000000000..516345f92 --- /dev/null +++ b/changes/ce/fix-10076.zh.md @@ -0,0 +1,2 @@ +修复 HTTP 桥接的一个异常处理:连接超时错误发生后,发生错误的请求可以被重试。 +在此修复前,连接超时后,被当作不可重试类型的错误处理,导致请求被丢弃。 From 26b29185b2ce9b7d049bd692a0c6f28e11027d88 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Tue, 7 Mar 2023 20:53:37 +0100 Subject: [PATCH 35/40] test(emqx_bridge_webhook_SUITE): fix flakyness in test web server --- .../test/emqx_bridge_webhook_SUITE.erl | 55 +++++++++++++------ 1 file changed, 39 insertions(+), 16 deletions(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl index 8c688a0ec..4c349c7a0 100644 --- a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl @@ -59,7 +59,7 @@ start_http_server(HTTPServerConfig) -> ct:pal("Start server\n"), process_flag(trap_exit, true), Parent = self(), - {Port, Sock} = listen_on_random_port(), + {ok, {Port, Sock}} = listen_on_random_port(), Acceptor = spawn(fun() -> accept_loop(Sock, Parent, HTTPServerConfig) end), @@ -76,17 +76,22 @@ listen_on_random_port() -> case gen_tcp:listen(0, SockOpts) of {ok, Sock} -> {ok, Port} = inet:port(Sock), - {Port, Sock}; - {error, Reason} when Reason /= eaddrinuse -> + {ok, {Port, Sock}}; + {error, Reason} when Reason =/= eaddrinuse -> {error, Reason} end. accept_loop(Sock, Parent, HTTPServerConfig) -> process_flag(trap_exit, true), - {ok, Conn} = gen_tcp:accept(Sock), - spawn(fun() -> handle_fun_200_ok(Conn, Parent, HTTPServerConfig) end), - %%gen_tcp:controlling_process(Conn, Handler), - accept_loop(Sock, Parent, HTTPServerConfig). + case gen_tcp:accept(Sock) of + {ok, Conn} -> + spawn(fun() -> handle_fun_200_ok(Conn, Parent, HTTPServerConfig, <<>>) end), + %%gen_tcp:controlling_process(Conn, Handler), + accept_loop(Sock, Parent, HTTPServerConfig); + {error, closed} -> + %% socket owner died + ok + end. make_response(CodeStr, Str) -> B = iolist_to_binary(Str), @@ -97,17 +102,21 @@ make_response(CodeStr, Str) -> ) ). -handle_fun_200_ok(Conn, Parent, HTTPServerConfig) -> +handle_fun_200_ok(Conn, Parent, HTTPServerConfig, Acc) -> ResponseDelayMS = maps:get(response_delay_ms, HTTPServerConfig, 0), ct:pal("Waiting for request~n"), case gen_tcp:recv(Conn, 0) of {ok, ReqStr} -> ct:pal("The http handler got request: ~p", [ReqStr]), - Req = parse_http_request(ReqStr), - timer:sleep(ResponseDelayMS), - Parent ! {http_server, received, Req}, - gen_tcp:send(Conn, make_response("200 OK", "Request OK")), - handle_fun_200_ok(Conn, Parent, HTTPServerConfig); + case parse_http_request(<>) of + {ok, incomplete, NewAcc} -> + handle_fun_200_ok(Conn, Parent, HTTPServerConfig, NewAcc); + {ok, Req, NewAcc} -> + timer:sleep(ResponseDelayMS), + Parent ! {http_server, received, Req}, + gen_tcp:send(Conn, make_response("200 OK", "Request OK")), + handle_fun_200_ok(Conn, Parent, HTTPServerConfig, NewAcc) + end; {error, closed} -> ct:pal("http connection closed"); {error, Reason} -> @@ -116,12 +125,26 @@ handle_fun_200_ok(Conn, Parent, HTTPServerConfig) -> gen_tcp:close(Conn) end. -parse_http_request(ReqStr0) -> +parse_http_request(ReqStr) -> + try + parse_http_request_assertive(ReqStr) + catch + _:_ -> + {ok, incomplete, ReqStr} + end. + +parse_http_request_assertive(ReqStr0) -> + %% find body length + [_, LengthStr0] = string:split(ReqStr0, "content-length:"), + [LengthStr, _] = string:split(LengthStr0, "\r\n"), + Length = binary_to_integer(string:trim(LengthStr, both)), + %% split between multiple requests [Method, ReqStr1] = string:split(ReqStr0, " ", leading), [Path, ReqStr2] = string:split(ReqStr1, " ", leading), [_ProtoVsn, ReqStr3] = string:split(ReqStr2, "\r\n", leading), - [_HeaderStr, Body] = string:split(ReqStr3, "\r\n\r\n", leading), - #{method => Method, path => Path, body => Body}. + [_HeaderStr, Rest] = string:split(ReqStr3, "\r\n\r\n", leading), + <> = Rest, + {ok, #{method => Method, path => Path, body => Body}, Remain}. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% Helper functions From 116137a447d8be7993794108d41ca33b5a5da1ee Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Tue, 7 Mar 2023 21:58:05 +0100 Subject: [PATCH 36/40] docs: fix typos in change log Co-authored-by: Ivan Dyachkov --- changes/ce/fix-10076.en.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changes/ce/fix-10076.en.md b/changes/ce/fix-10076.en.md index e01df765b..5bbbffa32 100644 --- a/changes/ce/fix-10076.en.md +++ b/changes/ce/fix-10076.en.md @@ -1,2 +1,2 @@ Fix webhook bridge error handling: connection timeout should be a retriable error. -Prior to this fix, connection timeout was clasified as unrecoverable error hence lead to request being dropped. +Prior to this fix, connection timeout was classified as unrecoverable error and led to request being dropped. From 97e71c54d44e3afbbc6bfa4a53780eb5e34e187a Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Tue, 7 Mar 2023 19:13:44 +0200 Subject: [PATCH 37/40] fix: use default template if timestamp is empty (undefined) in InfluxDB bridge Closes EMQX-8926 --- .../test/emqx_ee_bridge_influxdb_SUITE.erl | 12 ++-- .../src/emqx_ee_connector_influxdb.erl | 68 +++++++++++++------ .../test/emqx_ee_connector_influxdb_SUITE.erl | 3 +- 3 files changed, 55 insertions(+), 28 deletions(-) diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl index 6e5220ae0..8014dbdcc 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl @@ -527,7 +527,8 @@ t_start_ok(Config) -> SentData = #{ <<"clientid">> => ClientId, <<"topic">> => atom_to_binary(?FUNCTION_NAME), - <<"payload">> => Payload + <<"payload">> => Payload, + <<"timestamp">> => erlang:system_time(millisecond) }, ?check_trace( begin @@ -685,7 +686,8 @@ t_const_timestamp(Config) -> SentData = #{ <<"clientid">> => ClientId, <<"topic">> => atom_to_binary(?FUNCTION_NAME), - <<"payload">> => Payload + <<"payload">> => Payload, + <<"timestamp">> => erlang:system_time(millisecond) }, ?assertEqual(ok, send_message(Config, SentData)), case QueryMode of @@ -740,7 +742,7 @@ t_boolean_variants(Config) -> SentData = #{ <<"clientid">> => ClientId, <<"topic">> => atom_to_binary(?FUNCTION_NAME), - <<"timestamp">> => erlang:system_time(nanosecond), + <<"timestamp">> => erlang:system_time(millisecond), <<"payload">> => Payload }, ?assertEqual(ok, send_message(Config, SentData)), @@ -805,7 +807,7 @@ t_bad_timestamp(Config) -> SentData = #{ <<"clientid">> => ClientId, <<"topic">> => atom_to_binary(?FUNCTION_NAME), - <<"timestamp">> => erlang:system_time(nanosecond), + <<"timestamp">> => erlang:system_time(millisecond), <<"payload">> => Payload }, ?check_trace( @@ -949,7 +951,7 @@ t_write_failure(Config) -> SentData = #{ <<"clientid">> => ClientId, <<"topic">> => atom_to_binary(?FUNCTION_NAME), - <<"timestamp">> => erlang:system_time(nanosecond), + <<"timestamp">> => erlang:system_time(millisecond), <<"payload">> => Payload }, ?check_trace( diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl index a361e7035..7f5b56181 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl @@ -35,11 +35,15 @@ desc/1 ]). +-type ts_precision() :: ns | us | ms | s. + %% influxdb servers don't need parse -define(INFLUXDB_HOST_OPTIONS, #{ default_port => ?INFLUXDB_DEFAULT_PORT }). +-define(DEFAULT_TIMESTAMP_TMPL, "${timestamp}"). + %% ------------------------------------------------------------------------------------------------- %% resource callback callback_mode() -> async_if_possible. @@ -232,15 +236,14 @@ do_start_client( ClientConfig, Config = #{write_syntax := Lines} ) -> + Precision = maps:get(precision, Config, ms), case influxdb:start_client(ClientConfig) of {ok, Client} -> case influxdb:is_alive(Client) of true -> State = #{ client => Client, - write_syntax => to_config( - Lines, proplists:get_value(precision, ClientConfig) - ) + write_syntax => to_config(Lines, Precision) }, ?SLOG(info, #{ msg => "starting influxdb connector success", @@ -407,27 +410,36 @@ to_config(Lines, Precision) -> to_config([], Acc, _Precision) -> lists:reverse(Acc); to_config([Item0 | Rest], Acc, Precision) -> - Ts = maps:get(timestamp, Item0, undefined), + Ts0 = maps:get(timestamp, Item0, undefined), + {Ts, FromPrecision, ToPrecision} = preproc_tmpl_timestamp(Ts0, Precision), Item = #{ measurement => emqx_plugin_libs_rule:preproc_tmpl(maps:get(measurement, Item0)), - timestamp => preproc_tmpl_timestamp(Ts, Precision), + timestamp => Ts, + precision => {FromPrecision, ToPrecision}, tags => to_kv_config(maps:get(tags, Item0)), fields => to_kv_config(maps:get(fields, Item0)) }, to_config(Rest, [Item | Acc], Precision). -preproc_tmpl_timestamp(undefined, <<"ns">>) -> - erlang:system_time(nanosecond); -preproc_tmpl_timestamp(undefined, <<"us">>) -> - erlang:system_time(microsecond); -preproc_tmpl_timestamp(undefined, <<"ms">>) -> - erlang:system_time(millisecond); -preproc_tmpl_timestamp(undefined, <<"s">>) -> - erlang:system_time(second); -preproc_tmpl_timestamp(Ts, _) when is_integer(Ts) -> - Ts; -preproc_tmpl_timestamp(Ts, _) when is_binary(Ts); is_list(Ts) -> - emqx_plugin_libs_rule:preproc_tmpl(Ts). +%% pre-process the timestamp template +%% returns a tuple of three elements: +%% 1. The timestamp template itself. +%% 2. The source timestamp precision (ms if the template ${timestamp} is used). +%% 3. The target timestamp precision (configured for the client). +preproc_tmpl_timestamp(undefined, Precision) -> + %% not configured, we default it to the message timestamp + preproc_tmpl_timestamp(?DEFAULT_TIMESTAMP_TMPL, Precision); +preproc_tmpl_timestamp(Ts, Precision) when is_integer(Ts) -> + %% a const value is used which is very much unusual, but we have to add a special handling + {Ts, Precision, Precision}; +preproc_tmpl_timestamp(Ts, Precision) when is_list(Ts) -> + preproc_tmpl_timestamp(iolist_to_binary(Ts), Precision); +preproc_tmpl_timestamp(<> = Ts, Precision) -> + {emqx_plugin_libs_rule:preproc_tmpl(Ts), ms, Precision}; +preproc_tmpl_timestamp(Ts, Precision) when is_binary(Ts) -> + %% a placehold is in use. e.g. ${payload.my_timestamp} + %% we can only hope it the value will be of the same precision in the configs + {emqx_plugin_libs_rule:preproc_tmpl(Ts), Precision, Precision}. to_kv_config(KVfields) -> maps:fold(fun to_maps_config/3, #{}, proplists:to_map(KVfields)). @@ -470,7 +482,8 @@ parse_batch_data(InstId, BatchData, SyntaxLines) -> fields := [{binary(), binary()}], measurement := binary(), tags := [{binary(), binary()}], - timestamp := emqx_plugin_libs_rule:tmpl_token() | integer() + timestamp := emqx_plugin_libs_rule:tmpl_token() | integer(), + precision := {From :: ts_precision(), To :: ts_precision()} } ]) -> {ok, [map()]} | {error, term()}. data_to_points(Data, SyntaxLines) -> @@ -529,16 +542,27 @@ line_to_point( #{ measurement := Measurement, tags := Tags, - fields := Fields + fields := Fields, + timestamp := Ts, + precision := Precision } = Item ) -> {_, EncodedTags} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Tags), {_, EncodedFields} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Fields), - Item#{ + maps:without([precision], Item#{ measurement => emqx_plugin_libs_rule:proc_tmpl(Measurement, Data), tags => EncodedTags, - fields => EncodedFields - }. + fields => EncodedFields, + timestamp => maybe_convert_time_unit(Ts, Precision) + }). + +maybe_convert_time_unit(Ts, {FromPrecision, ToPrecision}) -> + erlang:convert_time_unit(Ts, time_unit(FromPrecision), time_unit(ToPrecision)). + +time_unit(s) -> second; +time_unit(ms) -> millisecond; +time_unit(us) -> microsecond; +time_unit(ns) -> nanosecond. maps_config_to_data(K, V, {Data, Res}) -> KTransOptions = #{return => rawlist, var_trans => fun key_filter/1}, diff --git a/lib-ee/emqx_ee_connector/test/emqx_ee_connector_influxdb_SUITE.erl b/lib-ee/emqx_ee_connector/test/emqx_ee_connector_influxdb_SUITE.erl index f5e43c0bb..72fc11a67 100644 --- a/lib-ee/emqx_ee_connector/test/emqx_ee_connector_influxdb_SUITE.erl +++ b/lib-ee/emqx_ee_connector/test/emqx_ee_connector_influxdb_SUITE.erl @@ -227,5 +227,6 @@ test_query() -> {send_message, #{ <<"clientid">> => <<"something">>, <<"payload">> => #{bool => true}, - <<"topic">> => <<"connector_test">> + <<"topic">> => <<"connector_test">>, + <<"timestamp">> => 1678220316257 }}. From 19a7c3aeb93aac46f725f00b7b9402c3b7dcd496 Mon Sep 17 00:00:00 2001 From: Ivan Dyachkov Date: Wed, 8 Mar 2023 11:57:56 +0100 Subject: [PATCH 38/40] chore: add changelog --- changes/ee/fix-10087.en.md | 2 ++ changes/ee/fix-10087.zh.md | 2 ++ 2 files changed, 4 insertions(+) create mode 100644 changes/ee/fix-10087.en.md create mode 100644 changes/ee/fix-10087.zh.md diff --git a/changes/ee/fix-10087.en.md b/changes/ee/fix-10087.en.md new file mode 100644 index 000000000..fd6e10b7b --- /dev/null +++ b/changes/ee/fix-10087.en.md @@ -0,0 +1,2 @@ +Use default template `${timestamp}` if the `timestamp` config is empty (undefined) when inserting data in InfluxDB. +Prior to this change, InfluxDB bridge inserted a wrong timestamp when template is not provided. diff --git a/changes/ee/fix-10087.zh.md b/changes/ee/fix-10087.zh.md new file mode 100644 index 000000000..e08e61f37 --- /dev/null +++ b/changes/ee/fix-10087.zh.md @@ -0,0 +1,2 @@ +在 InfluxDB 中插入数据时,如果时间戳为空(未定义),则使用默认的占位符 `${timestamp}`。 +在此修复前,如果时间戳字段没有设置,InfluxDB 桥接使用了一个错误的时间戳。 From a952e191097d092b665e65ede7b6ccd1067ebead Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Tue, 7 Mar 2023 17:46:48 +0100 Subject: [PATCH 39/40] chore: upgrade to ehttpc 0.4.7 --- mix.exs | 2 +- rebar.config | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/mix.exs b/mix.exs index 6a7f9fb50..3d3657ebb 100644 --- a/mix.exs +++ b/mix.exs @@ -48,7 +48,7 @@ defmodule EMQXUmbrella.MixProject do {:redbug, "2.0.8"}, {:covertool, github: "zmstone/covertool", tag: "2.0.4.1", override: true}, {:typerefl, github: "ieQu1/typerefl", tag: "0.9.1", override: true}, - {:ehttpc, github: "emqx/ehttpc", tag: "0.4.6", override: true}, + {:ehttpc, github: "emqx/ehttpc", tag: "0.4.7", override: true}, {:gproc, github: "uwiger/gproc", tag: "0.8.0", override: true}, {:jiffy, github: "emqx/jiffy", tag: "1.0.5", override: true}, {:cowboy, github: "emqx/cowboy", tag: "2.9.0", override: true}, diff --git a/rebar.config b/rebar.config index 2e478869d..89cc7923d 100644 --- a/rebar.config +++ b/rebar.config @@ -50,7 +50,7 @@ , {gpb, "4.19.5"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps , {typerefl, {git, "https://github.com/ieQu1/typerefl", {tag, "0.9.1"}}} , {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.9"}}} - , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.4.6"}}} + , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.4.7"}}} , {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}} From ac6325c82382f9fb36dd16ce2e505360d3e4d739 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Wed, 8 Mar 2023 12:56:12 +0100 Subject: [PATCH 40/40] docs: add change log for PR 10086 --- changes/ce/fix-10086.en.md | 4 ++++ changes/ce/fix-10086.zh.md | 3 +++ 2 files changed, 7 insertions(+) create mode 100644 changes/ce/fix-10086.en.md create mode 100644 changes/ce/fix-10086.zh.md diff --git a/changes/ce/fix-10086.en.md b/changes/ce/fix-10086.en.md new file mode 100644 index 000000000..31e8b6453 --- /dev/null +++ b/changes/ce/fix-10086.en.md @@ -0,0 +1,4 @@ +Upgrade HTTP client ehttpc to `0.4.7`. +Prior to this upgrade, HTTP clients for authentication, authorization and webhook may crash +if `body` is empty but content-type HTTP header is set. +For more details see [ehttpc PR#44](https://github.com/emqx/ehttpc/pull/44). diff --git a/changes/ce/fix-10086.zh.md b/changes/ce/fix-10086.zh.md new file mode 100644 index 000000000..b7c110ea4 --- /dev/null +++ b/changes/ce/fix-10086.zh.md @@ -0,0 +1,3 @@ +HTTP 客户端库 `ehttpc` 升级到 0.4.7。 +在升级前,如果 HTTP 客户端,例如 认证,授权,webhook 等配置中使用了content-type HTTP 头,但是没有配置 body,则可能会发生异常。 +详情见 [ehttpc PR#44](https://github.com/emqx/ehttpc/pull/44)。