diff --git a/.github/workflows/run_cts_tests.yaml b/.github/workflows/run_cts_tests.yaml index cfcb7baca..b89c6e10a 100644 --- a/.github/workflows/run_cts_tests.yaml +++ b/.github/workflows/run_cts_tests.yaml @@ -24,6 +24,11 @@ jobs: steps: - uses: actions/checkout@v1 + # to avoid dirty self-hosted runners + - name: stop containers + run: | + docker rm -f $(docker ps -qa) || true + docker network rm $(docker network ls -q) || true - name: docker compose up env: LDAP_TAG: ${{ matrix.ldap_tag }} @@ -79,6 +84,10 @@ jobs: steps: - uses: actions/checkout@v1 + - name: stop containers + run: | + docker rm -f $(docker ps -qa) || true + docker network rm $(docker network ls -q) || true - name: docker-compose up run: | docker-compose \ @@ -151,6 +160,10 @@ jobs: steps: - uses: actions/checkout@v1 + - name: stop containers + run: | + docker rm -f $(docker ps -qa) || true + docker network rm $(docker network ls -q) || true - name: docker-compose up timeout-minutes: 5 run: | @@ -237,6 +250,10 @@ jobs: - tcp steps: - uses: actions/checkout@v1 + - name: stop containers + run: | + docker rm -f $(docker ps -qa) || true + docker network rm $(docker network ls -q) || true - name: docker-compose up run: | docker-compose \ @@ -318,6 +335,10 @@ jobs: steps: - uses: actions/checkout@v1 + - name: stop containers + run: | + docker rm -f $(docker ps -qa) || true + docker network rm $(docker network ls -q) || true - name: docker-compose up run: | docker-compose \ diff --git a/.github/workflows/run_test_cases.yaml b/.github/workflows/run_test_cases.yaml index 6f42730ea..5f98f6940 100644 --- a/.github/workflows/run_test_cases.yaml +++ b/.github/workflows/run_test_cases.yaml @@ -42,6 +42,11 @@ jobs: use-self-hosted: false steps: - uses: actions/checkout@v2 + # to avoid dirty self-hosted runners + - name: stop containers + run: | + docker rm -f $(docker ps -qa) || true + docker network rm $(docker network ls -q) || true - name: docker compose up if: endsWith(github.repository, 'emqx') env: diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index 6b0d0c4e9..13efa6369 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -32,8 +32,18 @@ File format: - Added a test to prevent a last will testament message to be published when a client is denied connection. [#8894](https://github.com/emqx/emqx/pull/8894) +- More rigorous checking of flapping to improve stability of the system. [#9045](https://github.com/emqx/emqx/pull/9045) + +- QoS1 and QoS2 messages in session's buffer are re-dispatched to other members in the group + when the session terminates [#9094](https://github.com/emqx/emqx/pull/9094). + Prior to this enhancement, one would have to set `broker.shared_dispatch_ack_enabled` to true + to prevent sessions from buffering messages, however this acknowledgement comes with a cost. + + ### Bug fixes +- Fix HTTP client library to handle SSL socket passive signal. [#9145](https://github.com/emqx/emqx/pull/9145) + - Fix delayed publish inaccurate caused by os time change. [#8908](https://github.com/emqx/emqx/pull/8908) - Hide redis password in error logs [#9071](https://github.com/emqx/emqx/pull/9071) @@ -48,6 +58,26 @@ File format: Same `format_status` callback is added here too for `gen_server`s which hold password in their state. +- Fix shared subscription message re-dispatches [#9094](https://github.com/emqx/emqx/pull/9094). + - When discarding QoS 2 inflight messages, there were excessive logs + - For wildcard deliveries, the re-dispatch used the wrong topic (the publishing topic, + but not the subscribing topic), caused messages to be lost when dispatching. + +- Fix shared subscription group member unsubscribe issue when 'sticky' strategy is used. + Prior to this fix, if a previously picked member unsubscribes from the group (without reconnect) + the message is still dispatched to it. + This issue only occurs when unsubscribe with the session kept. + Fixed in [#9119](https://github.com/emqx/emqx/pull/9119) + +- Fix shared subscription 'sticky' strategy when there is no local subscriptions at all. + Prior to this change, it may take a few rounds to randomly pick group members until a local subscriber + is hit (and then start sticking to it). + After this fix, it will start sticking to whichever randomly picked member even when it is a + subscriber from another node in the cluster. + Fixed in [#9122](https://github.com/emqx/emqx/pull/9122) + +- Fix cannot reset metrics for fallback actions. [#9125](https://github.com/emqx/emqx/pull/9125) + ## v4.3.20 ### Bug fixes diff --git a/CHANGES-4.4.md b/CHANGES-4.4.md index 16d57a37c..7228ad22b 100644 --- a/CHANGES-4.4.md +++ b/CHANGES-4.4.md @@ -1,8 +1,14 @@ # EMQX 4.4 Changes +## v4.4.11 + +### Bug fixes (synced from v4.3.22) + +### Enhancements (synced from v4.3.22) + ## v4.4.10 -### Bug fixes +### Bug fixes (synced from v4.3.21) - Fix the latency statistics error of the slow subscription module when `stats_type` is `internal` or `response`. [#8981](https://github.com/emqx/emqx/pull/8981) diff --git a/apps/emqx_auth_http/include/emqx_auth_http.hrl b/apps/emqx_auth_http/include/emqx_auth_http.hrl index 0eaa59daf..4e659293f 100644 --- a/apps/emqx_auth_http/include/emqx_auth_http.hrl +++ b/apps/emqx_auth_http/include/emqx_auth_http.hrl @@ -1 +1,20 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + -define(APP, emqx_auth_http). + +%% equals to the default value of ehttpc +-define(DEFAULT_RETRY_TIMES, 2). diff --git a/apps/emqx_auth_http/src/emqx_acl_http.erl b/apps/emqx_auth_http/src/emqx_acl_http.erl index d4fd96a95..51bf9c303 100644 --- a/apps/emqx_auth_http/src/emqx_acl_http.erl +++ b/apps/emqx_auth_http/src/emqx_acl_http.erl @@ -24,7 +24,7 @@ -logger_header("[ACL http]"). -import(emqx_auth_http_cli, - [ request/6 + [ request/7 , feedvar/2 ]). @@ -56,13 +56,15 @@ description() -> "ACL with HTTP API". %% Internal functions %%-------------------------------------------------------------------- -check_acl_request(#{pool_name := PoolName, +check_acl_request(ACLParams = + #{pool_name := PoolName, path := Path, method := Method, headers := Headers, params := Params, timeout := Timeout}, ClientInfo) -> - request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout). + Retry = maps:get(retry_times, ACLParams, ?DEFAULT_RETRY_TIMES), + request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout, Retry). access(subscribe) -> 1; access(publish) -> 2. diff --git a/apps/emqx_auth_http/src/emqx_auth_http.app.src b/apps/emqx_auth_http/src/emqx_auth_http.app.src index e943317f8..87d087bae 100644 --- a/apps/emqx_auth_http/src/emqx_auth_http.app.src +++ b/apps/emqx_auth_http/src/emqx_auth_http.app.src @@ -1,6 +1,6 @@ {application, emqx_auth_http, [{description, "EMQ X Authentication/ACL with HTTP API"}, - {vsn, "4.3.7"}, % strict semver, bump manually! + {vsn, "4.3.8"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_auth_http_sup]}, {applications, [kernel,stdlib,ehttpc]}, diff --git a/apps/emqx_auth_http/src/emqx_auth_http.appup.src b/apps/emqx_auth_http/src/emqx_auth_http.appup.src index f5c2bfe42..01d756b9e 100644 --- a/apps/emqx_auth_http/src/emqx_auth_http.appup.src +++ b/apps/emqx_auth_http/src/emqx_auth_http.appup.src @@ -1,17 +1,27 @@ %% -*- mode: erlang -*- +%% Unless you know what you are doing, DO NOT edit manually!! {VSN, - [{"4.3.6", - [ %% There are only changes to the schema file, so we don't need any - %% commands here - ]}, + [{"4.3.7", + [{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_auth_http,brutal_purge,soft_purge,[]}, + {load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]}, + {"4.3.6", + [{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_auth_http,brutal_purge,soft_purge,[]}, + {load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]}, {"4.3.5", - [{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]}, + [{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_acl_http,brutal_purge,soft_purge,[]}, + {load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_http,brutal_purge,soft_purge,[]}]}, {"4.3.4", - [{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]}, + [{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_acl_http,brutal_purge,soft_purge,[]}, + {load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_http,brutal_purge,soft_purge,[]}]}, {"4.3.3", - [{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]}, + [{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_http,brutal_purge,soft_purge,[]}, {load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]}, {"4.3.2", @@ -20,21 +30,29 @@ {load_module,emqx_auth_http,brutal_purge,soft_purge,[]}, {load_module,emqx_acl_http,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}]}, - {<<"4.3.[0-1]">>, - [{restart_application,emqx_auth_http}]}, + {<<"4.3.[0-1]">>,[{restart_application,emqx_auth_http}]}, {<<".*">>,[]}], - [{"4.3.6", - [ %% There are only changes to the schema file, so we don't need any - %% commands here - ]}, + [{"4.3.7", + [{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_auth_http,brutal_purge,soft_purge,[]}, + {load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]}, + {"4.3.6", + [{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_auth_http,brutal_purge,soft_purge,[]}, + {load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]}, {"4.3.5", - [{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]}, + [{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_acl_http,brutal_purge,soft_purge,[]}, + {load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_http,brutal_purge,soft_purge,[]}]}, {"4.3.4", - [{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]}, + [{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_acl_http,brutal_purge,soft_purge,[]}, + {load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_http,brutal_purge,soft_purge,[]}]}, {"4.3.3", - [{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]}, + [{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_http,brutal_purge,soft_purge,[]}, {load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]}, {"4.3.2", @@ -43,6 +61,5 @@ {load_module,emqx_auth_http,brutal_purge,soft_purge,[]}, {load_module,emqx_acl_http,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}]}, - {<<"4.3.[0-1]">>, - [{restart_application,emqx_auth_http}]}, + {<<"4.3.[0-1]">>,[{restart_application,emqx_auth_http}]}, {<<".*">>,[]}]}. diff --git a/apps/emqx_auth_http/src/emqx_auth_http.erl b/apps/emqx_auth_http/src/emqx_auth_http.erl index 98a897a8c..620750bd0 100644 --- a/apps/emqx_auth_http/src/emqx_auth_http.erl +++ b/apps/emqx_auth_http/src/emqx_auth_http.erl @@ -25,7 +25,7 @@ -logger_header("[Auth http]"). -import(emqx_auth_http_cli, - [ request/6 + [ request/7 , feedvar/2 ]). @@ -63,24 +63,28 @@ description() -> "Authentication by HTTP API". %% Requests %%-------------------------------------------------------------------- -authenticate(#{pool_name := PoolName, +authenticate(AuthParams = + #{pool_name := PoolName, path := Path, method := Method, headers := Headers, params := Params, timeout := Timeout}, ClientInfo) -> - request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout). + Retry = maps:get(retry_times, AuthParams, ?DEFAULT_RETRY_TIMES), + request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout, Retry). -spec(is_superuser(maybe(map()), emqx_types:client()) -> boolean()). is_superuser(undefined, _ClientInfo) -> false; -is_superuser(#{pool_name := PoolName, +is_superuser(SuperParams = + #{pool_name := PoolName, path := Path, method := Method, headers := Headers, params := Params, timeout := Timeout}, ClientInfo) -> - case request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout) of + Retry = maps:get(retry_times, SuperParams, ?DEFAULT_RETRY_TIMES), + case request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout, Retry) of {ok, 200, _Body} -> true; {ok, _Code, _Body} -> false; {error, Error} -> ?LOG(error, "Request superuser path ~s, error: ~p", [Path, Error]), diff --git a/apps/emqx_auth_http/src/emqx_auth_http_cli.erl b/apps/emqx_auth_http/src/emqx_auth_http_cli.erl index 3c7efd9c9..c747b778a 100644 --- a/apps/emqx_auth_http/src/emqx_auth_http_cli.erl +++ b/apps/emqx_auth_http/src/emqx_auth_http_cli.erl @@ -19,6 +19,7 @@ -include("emqx_auth_http.hrl"). -export([ request/6 + , request/7 , feedvar/2 , feedvar/3 ]). @@ -27,18 +28,21 @@ %% HTTP Request %%-------------------------------------------------------------------- -request(PoolName, get, Path, Headers, Params, Timeout) -> - NewPath = Path ++ "?" ++ binary_to_list(cow_qs:qs(bin_kw(Params))), - reply(ehttpc:request(PoolName, get, {NewPath, Headers}, Timeout)); +request(PoolName, Method, Path, Headers, Params, Timeout) -> + request(PoolName, Method, Path, Headers, Params, Timeout, ?DEFAULT_RETRY_TIMES). -request(PoolName, post, Path, Headers, Params, Timeout) -> +request(PoolName, get, Path, Headers, Params, Timeout, Retry) -> + NewPath = Path ++ "?" ++ binary_to_list(cow_qs:qs(bin_kw(Params))), + reply(ehttpc:request(PoolName, get, {NewPath, Headers}, Timeout, Retry)); + +request(PoolName, post, Path, Headers, Params, Timeout, Retry) -> Body = case proplists:get_value(<<"content-type">>, Headers) of "application/x-www-form-urlencoded" -> cow_qs:qs(bin_kw(Params)); "application/json" -> emqx_json:encode(bin_kw(Params)) end, - reply(ehttpc:request(PoolName, post, {Path, Headers, Body}, Timeout)). + reply(ehttpc:request(PoolName, post, {Path, Headers, Body}, Timeout, Retry)). reply({ok, StatusCode, _Headers}) -> {ok, StatusCode, <<>>}; diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src index 12753e62b..b02edb456 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src @@ -1,6 +1,6 @@ {application, emqx_rule_engine, [{description, "EMQ X Rule Engine"}, - {vsn, "4.4.10"}, % strict semver, bump manually! + {vsn, "4.4.11"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_rule_engine_sup, emqx_rule_registry]}, {applications, [kernel,stdlib,rulesql,getopt]}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src index 004545793..6b880df7a 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src @@ -1,19 +1,24 @@ %% -*- mode: erlang -*- %% Unless you know what you are doing, DO NOT edit manually!! {VSN, - [{"4.4.9", + [{"4.4.10", + [{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, + {"4.4.9", [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.4.8", [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {<<"4\\.4\\.[6-7]">>, - [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, @@ -21,7 +26,8 @@ {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.4.5", - [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, @@ -30,7 +36,8 @@ {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.4.4", - [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, @@ -39,7 +46,8 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}]}, {"4.4.3", - [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, @@ -97,19 +105,24 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], - [{"4.4.9", + [{"4.4.10", + [{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, + {"4.4.9", [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.4.8", [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {<<"4\\.4\\.[6-7]">>, - [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, @@ -117,7 +130,8 @@ {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.4.5", - [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, @@ -126,7 +140,8 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.4.4", - [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, @@ -135,7 +150,8 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}]}, {"4.4.3", - [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_metrics.erl b/apps/emqx_rule_engine/src/emqx_rule_metrics.erl index 4a532d00b..70049d277 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_metrics.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_metrics.erl @@ -133,16 +133,15 @@ clear_metrics(Id) -> -spec(reset_metrics(rule_id()) -> ok). reset_metrics(Id) -> reset_speeds(Id), - reset_metrics(Id, rule_metrics()), + do_reset_metrics(Id, rule_metrics()), case emqx_rule_registry:get_rule(Id) of not_found -> ok; {ok, #rule{actions = Actions}} -> - [ reset_metrics(ActionId, action_metrics()) - || #action_instance{ id = ActionId} <- Actions], + reset_action_metrics(Actions), ok end. -reset_metrics(Id, Metrics) -> +do_reset_metrics(Id, Metrics) -> case couters_ref(Id) of not_found -> ok; Ref -> [counters:put(Ref, metrics_idx(Idx), 0) @@ -150,6 +149,12 @@ reset_metrics(Id, Metrics) -> ok end. +reset_action_metrics(Actions) -> + lists:foreach(fun(#action_instance{id = ActionId, fallbacks = FallbackActions}) -> + do_reset_metrics(ActionId, action_metrics()), + reset_action_metrics(FallbackActions) + end, Actions). + reset_speeds(Id) -> gen_server:call(?MODULE, {reset_speeds, Id}). @@ -330,6 +335,8 @@ handle_call({create_rule_metrics, Id}, _From, _ -> RuleSpeeds#{Id => #rule_speed{}} end}}; +handle_call({reset_speeds, _Id}, _From, State = #state{rule_speeds = undefined}) -> + {reply, ok, State}; handle_call({reset_speeds, Id}, _From, State = #state{rule_speeds = RuleSpeedMap}) -> {reply, ok, State#state{rule_speeds = maps:put(Id, #rule_speed{}, RuleSpeedMap)}}; diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index d819aa5e3..857b9017c 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -50,6 +50,7 @@ groups() -> t_unregister_provider, t_create_rule, t_reset_metrics, + t_reset_metrics_fallbacks, t_create_resource ]}, {actions, [], @@ -381,18 +382,14 @@ t_inspect_action(_Config) -> t_reset_metrics(_Config) -> ok = emqx_rule_engine:load_providers(), - {ok, #resource{id = ResId}} = emqx_rule_engine:create_resource( - #{type => built_in, - config => #{}, - description => <<"debug resource">>}), - {ok, #rule{id = Id}} = emqx_rule_engine:create_rule( - #{rawsql => "select clientid as c, username as u " - "from \"t1\" ", - actions => [#{name => 'inspect', - args => #{'$resource' => ResId, a=>1, b=>2}}], - type => built_in, - description => <<"Inspect rule">> - }), + {ok, #rule{id = Id, actions = [#action_instance{id = ActId0}]}} = + emqx_rule_engine:create_rule( + #{rawsql => "select clientid as c, username as u " + "from \"t1\" ", + actions => [#{name => 'inspect', args => #{a=>1, b=>2}}], + type => built_in, + description => <<"Inspect rule">> + }), {ok, Client} = emqtt:start_link([{username, <<"emqx">>}]), {ok, _} = emqtt:connect(Client), [ begin @@ -400,16 +397,68 @@ t_reset_metrics(_Config) -> timer:sleep(100) end || _ <- lists:seq(1,10)], + ?assertMatch(#{exception := 0, failed := 0, + matched := 10, no_result := 0, passed := 10}, + emqx_rule_metrics:get_rule_metrics(Id)), + ?assertMatch(#{failed := 0, success := 10, taken := 10}, + emqx_rule_metrics:get_action_metrics(ActId0)), emqx_rule_metrics:reset_metrics(Id), ?assertEqual(#{exception => 0,failed => 0, matched => 0,no_result => 0,passed => 0, speed => 0.0,speed_last5m => 0.0,speed_max => 0.0}, emqx_rule_metrics:get_rule_metrics(Id)), - ?assertEqual(#{failed => 0,success => 0,taken => 0}, - emqx_rule_metrics:get_action_metrics(ResId)), + ?assertEqual(#{failed => 0, success => 0, taken => 0}, + emqx_rule_metrics:get_action_metrics(ActId0)), emqtt:stop(Client), emqx_rule_registry:remove_rule(Id), - emqx_rule_registry:remove_resource(ResId), + ok. + +t_reset_metrics_fallbacks(_Config) -> + ok = emqx_rule_engine:load_providers(), + ok = emqx_rule_registry:add_action( + #action{name = 'crash_action', app = ?APP, + module = ?MODULE, on_create = crash_action, + types=[], params_spec = #{}, + title = #{en => <<"Crash Action">>}, + description = #{en => <<"This action will always fail!">>}}), + {ok, #rule{id = Id, actions = [#action_instance{id = ActId0, fallbacks = [ + #action_instance{id = ActId1}, + #action_instance{id = ActId2} + ]}]}} = + emqx_rule_engine:create_rule( + #{rawsql => "select clientid as c, username as u " + "from \"t1\" ", + actions => [#{name => 'crash_action', args => #{a=>1, b=>2}, fallbacks => [ + #{name => 'inspect', args => #{}, fallbacks => []}, + #{name => 'inspect', args => #{}, fallbacks => []} + ]}], + type => built_in, + description => <<"Inspect rule">> + }), + {ok, Client} = emqtt:start_link([{username, <<"emqx">>}]), + {ok, _} = emqtt:connect(Client), + [ begin + emqtt:publish(Client, <<"t1">>, <<"{\"id\": 1, \"name\": \"ha\"}">>, 0), + timer:sleep(100) + end + || _ <- lists:seq(1,10)], + ?assertMatch(#{exception := 0, failed := 0, + matched := 10, no_result := 0, passed := 10}, + emqx_rule_metrics:get_rule_metrics(Id)), + [?assertMatch(#{failed := 10, success := 0, taken := 10}, + emqx_rule_metrics:get_action_metrics(AId)) || AId <- [ActId0]], + [?assertMatch(#{failed := 0, success := 10, taken := 10}, + emqx_rule_metrics:get_action_metrics(AId)) || AId <- [ ActId1, ActId2]], + emqx_rule_metrics:reset_metrics(Id), + ?assertEqual(#{exception => 0,failed => 0, + matched => 0,no_result => 0,passed => 0, + speed => 0.0,speed_last5m => 0.0,speed_max => 0.0}, + emqx_rule_metrics:get_rule_metrics(Id)), + [?assertEqual(#{failed => 0, success => 0, taken => 0}, + emqx_rule_metrics:get_action_metrics(AId)) || AId <- [ActId0, ActId1, ActId2]], + emqtt:stop(Client), + emqx_rule_registry:remove_rule(Id), + ok = emqx_rule_registry:remove_action('crash_action'), ok. t_republish_action(_Config) -> diff --git a/bin/emqx b/bin/emqx index 01f2802cf..95840a1cb 100755 --- a/bin/emqx +++ b/bin/emqx @@ -525,7 +525,7 @@ esac if [ "$IS_BOOT_COMMAND" = 'no' ]; then # for non-boot commands, inspect vm.