diff --git a/.github/workflows/run_cts_tests.yaml b/.github/workflows/run_cts_tests.yaml index 6b05a014e..269b8bc65 100644 --- a/.github/workflows/run_cts_tests.yaml +++ b/.github/workflows/run_cts_tests.yaml @@ -24,6 +24,11 @@ jobs: steps: - uses: actions/checkout@v1 + # to avoid dirty self-hosted runners + - name: stop containers + run: | + docker rm -f $(docker ps -qa) || true + docker network rm $(docker network ls -q) || true - name: docker compose up env: LDAP_TAG: ${{ matrix.ldap_tag }} @@ -79,6 +84,10 @@ jobs: steps: - uses: actions/checkout@v1 + - name: stop containers + run: | + docker rm -f $(docker ps -qa) || true + docker network rm $(docker network ls -q) || true - name: docker-compose up run: | docker-compose \ @@ -150,6 +159,10 @@ jobs: steps: - uses: actions/checkout@v1 + - name: stop containers + run: | + docker rm -f $(docker ps -qa) || true + docker network rm $(docker network ls -q) || true - name: docker-compose up timeout-minutes: 5 run: | @@ -236,6 +249,10 @@ jobs: - tcp steps: - uses: actions/checkout@v1 + - name: stop containers + run: | + docker rm -f $(docker ps -qa) || true + docker network rm $(docker network ls -q) || true - name: docker-compose up run: | docker-compose \ @@ -317,6 +334,10 @@ jobs: steps: - uses: actions/checkout@v1 + - name: stop containers + run: | + docker rm -f $(docker ps -qa) || true + docker network rm $(docker network ls -q) || true - name: docker-compose up run: | docker-compose \ diff --git a/.github/workflows/run_test_cases.yaml b/.github/workflows/run_test_cases.yaml index 2fdad18d4..5f83049e5 100644 --- a/.github/workflows/run_test_cases.yaml +++ b/.github/workflows/run_test_cases.yaml @@ -42,6 +42,11 @@ jobs: use-self-hosted: false steps: - uses: actions/checkout@v2 + # to avoid dirty self-hosted runners + - name: stop containers + run: | + docker rm -f $(docker ps -qa) || true + docker network rm $(docker network ls -q) || true - name: docker compose up if: endsWith(github.repository, 'emqx') env: diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index 6b0d0c4e9..733c51012 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -32,6 +32,14 @@ File format: - Added a test to prevent a last will testament message to be published when a client is denied connection. [#8894](https://github.com/emqx/emqx/pull/8894) +- More rigorous checking of flapping to improve stability of the system. [#9045](https://github.com/emqx/emqx/pull/9045) + +- QoS1 and QoS2 messages in session's buffer are re-dispatched to other members in the group + when the session terminates [#9094](https://github.com/emqx/emqx/pull/9094). + Prior to this enhancement, one would have to set `broker.shared_dispatch_ack_enabled` to true + to prevent sessions from buffering messages, however this acknowledgement comes with a cost. + + ### Bug fixes - Fix delayed publish inaccurate caused by os time change. [#8908](https://github.com/emqx/emqx/pull/8908) @@ -48,6 +56,26 @@ File format: Same `format_status` callback is added here too for `gen_server`s which hold password in their state. +- Fix shared subscription message re-dispatches [#9094](https://github.com/emqx/emqx/pull/9094). + - When discarding QoS 2 inflight messages, there were excessive logs + - For wildcard deliveries, the re-dispatch used the wrong topic (the publishing topic, + but not the subscribing topic), caused messages to be lost when dispatching. + +- Fix shared subscription group member unsubscribe issue when 'sticky' strategy is used. + Prior to this fix, if a previously picked member unsubscribes from the group (without reconnect) + the message is still dispatched to it. + This issue only occurs when unsubscribe with the session kept. + Fixed in [#9119](https://github.com/emqx/emqx/pull/9119) + +- Fix shared subscription 'sticky' strategy when there is no local subscriptions at all. + Prior to this change, it may take a few rounds to randomly pick group members until a local subscriber + is hit (and then start sticking to it). + After this fix, it will start sticking to whichever randomly picked member even when it is a + subscriber from another node in the cluster. + Fixed in [#9122](https://github.com/emqx/emqx/pull/9122) + +- Fix cannot reset metrics for fallback actions. [#9125](https://github.com/emqx/emqx/pull/9125) + ## v4.3.20 ### Bug fixes diff --git a/apps/emqx_auth_http/include/emqx_auth_http.hrl b/apps/emqx_auth_http/include/emqx_auth_http.hrl index 0eaa59daf..4e659293f 100644 --- a/apps/emqx_auth_http/include/emqx_auth_http.hrl +++ b/apps/emqx_auth_http/include/emqx_auth_http.hrl @@ -1 +1,20 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + -define(APP, emqx_auth_http). + +%% equals to the default value of ehttpc +-define(DEFAULT_RETRY_TIMES, 2). diff --git a/apps/emqx_auth_http/src/emqx_acl_http.erl b/apps/emqx_auth_http/src/emqx_acl_http.erl index d4fd96a95..51bf9c303 100644 --- a/apps/emqx_auth_http/src/emqx_acl_http.erl +++ b/apps/emqx_auth_http/src/emqx_acl_http.erl @@ -24,7 +24,7 @@ -logger_header("[ACL http]"). -import(emqx_auth_http_cli, - [ request/6 + [ request/7 , feedvar/2 ]). @@ -56,13 +56,15 @@ description() -> "ACL with HTTP API". %% Internal functions %%-------------------------------------------------------------------- -check_acl_request(#{pool_name := PoolName, +check_acl_request(ACLParams = + #{pool_name := PoolName, path := Path, method := Method, headers := Headers, params := Params, timeout := Timeout}, ClientInfo) -> - request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout). + Retry = maps:get(retry_times, ACLParams, ?DEFAULT_RETRY_TIMES), + request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout, Retry). access(subscribe) -> 1; access(publish) -> 2. diff --git a/apps/emqx_auth_http/src/emqx_auth_http.app.src b/apps/emqx_auth_http/src/emqx_auth_http.app.src index e943317f8..87d087bae 100644 --- a/apps/emqx_auth_http/src/emqx_auth_http.app.src +++ b/apps/emqx_auth_http/src/emqx_auth_http.app.src @@ -1,6 +1,6 @@ {application, emqx_auth_http, [{description, "EMQ X Authentication/ACL with HTTP API"}, - {vsn, "4.3.7"}, % strict semver, bump manually! + {vsn, "4.3.8"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_auth_http_sup]}, {applications, [kernel,stdlib,ehttpc]}, diff --git a/apps/emqx_auth_http/src/emqx_auth_http.appup.src b/apps/emqx_auth_http/src/emqx_auth_http.appup.src index f5c2bfe42..01d756b9e 100644 --- a/apps/emqx_auth_http/src/emqx_auth_http.appup.src +++ b/apps/emqx_auth_http/src/emqx_auth_http.appup.src @@ -1,17 +1,27 @@ %% -*- mode: erlang -*- +%% Unless you know what you are doing, DO NOT edit manually!! {VSN, - [{"4.3.6", - [ %% There are only changes to the schema file, so we don't need any - %% commands here - ]}, + [{"4.3.7", + [{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_auth_http,brutal_purge,soft_purge,[]}, + {load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]}, + {"4.3.6", + [{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_auth_http,brutal_purge,soft_purge,[]}, + {load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]}, {"4.3.5", - [{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]}, + [{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_acl_http,brutal_purge,soft_purge,[]}, + {load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_http,brutal_purge,soft_purge,[]}]}, {"4.3.4", - [{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]}, + [{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_acl_http,brutal_purge,soft_purge,[]}, + {load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_http,brutal_purge,soft_purge,[]}]}, {"4.3.3", - [{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]}, + [{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_http,brutal_purge,soft_purge,[]}, {load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]}, {"4.3.2", @@ -20,21 +30,29 @@ {load_module,emqx_auth_http,brutal_purge,soft_purge,[]}, {load_module,emqx_acl_http,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}]}, - {<<"4.3.[0-1]">>, - [{restart_application,emqx_auth_http}]}, + {<<"4.3.[0-1]">>,[{restart_application,emqx_auth_http}]}, {<<".*">>,[]}], - [{"4.3.6", - [ %% There are only changes to the schema file, so we don't need any - %% commands here - ]}, + [{"4.3.7", + [{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_auth_http,brutal_purge,soft_purge,[]}, + {load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]}, + {"4.3.6", + [{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_auth_http,brutal_purge,soft_purge,[]}, + {load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]}, {"4.3.5", - [{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]}, + [{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_acl_http,brutal_purge,soft_purge,[]}, + {load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_http,brutal_purge,soft_purge,[]}]}, {"4.3.4", - [{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]}, + [{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_acl_http,brutal_purge,soft_purge,[]}, + {load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_http,brutal_purge,soft_purge,[]}]}, {"4.3.3", - [{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]}, + [{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}, + {load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_http,brutal_purge,soft_purge,[]}, {load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]}, {"4.3.2", @@ -43,6 +61,5 @@ {load_module,emqx_auth_http,brutal_purge,soft_purge,[]}, {load_module,emqx_acl_http,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}]}, - {<<"4.3.[0-1]">>, - [{restart_application,emqx_auth_http}]}, + {<<"4.3.[0-1]">>,[{restart_application,emqx_auth_http}]}, {<<".*">>,[]}]}. diff --git a/apps/emqx_auth_http/src/emqx_auth_http.erl b/apps/emqx_auth_http/src/emqx_auth_http.erl index 98a897a8c..620750bd0 100644 --- a/apps/emqx_auth_http/src/emqx_auth_http.erl +++ b/apps/emqx_auth_http/src/emqx_auth_http.erl @@ -25,7 +25,7 @@ -logger_header("[Auth http]"). -import(emqx_auth_http_cli, - [ request/6 + [ request/7 , feedvar/2 ]). @@ -63,24 +63,28 @@ description() -> "Authentication by HTTP API". %% Requests %%-------------------------------------------------------------------- -authenticate(#{pool_name := PoolName, +authenticate(AuthParams = + #{pool_name := PoolName, path := Path, method := Method, headers := Headers, params := Params, timeout := Timeout}, ClientInfo) -> - request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout). + Retry = maps:get(retry_times, AuthParams, ?DEFAULT_RETRY_TIMES), + request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout, Retry). -spec(is_superuser(maybe(map()), emqx_types:client()) -> boolean()). is_superuser(undefined, _ClientInfo) -> false; -is_superuser(#{pool_name := PoolName, +is_superuser(SuperParams = + #{pool_name := PoolName, path := Path, method := Method, headers := Headers, params := Params, timeout := Timeout}, ClientInfo) -> - case request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout) of + Retry = maps:get(retry_times, SuperParams, ?DEFAULT_RETRY_TIMES), + case request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout, Retry) of {ok, 200, _Body} -> true; {ok, _Code, _Body} -> false; {error, Error} -> ?LOG(error, "Request superuser path ~s, error: ~p", [Path, Error]), diff --git a/apps/emqx_auth_http/src/emqx_auth_http_cli.erl b/apps/emqx_auth_http/src/emqx_auth_http_cli.erl index 3c7efd9c9..c747b778a 100644 --- a/apps/emqx_auth_http/src/emqx_auth_http_cli.erl +++ b/apps/emqx_auth_http/src/emqx_auth_http_cli.erl @@ -19,6 +19,7 @@ -include("emqx_auth_http.hrl"). -export([ request/6 + , request/7 , feedvar/2 , feedvar/3 ]). @@ -27,18 +28,21 @@ %% HTTP Request %%-------------------------------------------------------------------- -request(PoolName, get, Path, Headers, Params, Timeout) -> - NewPath = Path ++ "?" ++ binary_to_list(cow_qs:qs(bin_kw(Params))), - reply(ehttpc:request(PoolName, get, {NewPath, Headers}, Timeout)); +request(PoolName, Method, Path, Headers, Params, Timeout) -> + request(PoolName, Method, Path, Headers, Params, Timeout, ?DEFAULT_RETRY_TIMES). -request(PoolName, post, Path, Headers, Params, Timeout) -> +request(PoolName, get, Path, Headers, Params, Timeout, Retry) -> + NewPath = Path ++ "?" ++ binary_to_list(cow_qs:qs(bin_kw(Params))), + reply(ehttpc:request(PoolName, get, {NewPath, Headers}, Timeout, Retry)); + +request(PoolName, post, Path, Headers, Params, Timeout, Retry) -> Body = case proplists:get_value(<<"content-type">>, Headers) of "application/x-www-form-urlencoded" -> cow_qs:qs(bin_kw(Params)); "application/json" -> emqx_json:encode(bin_kw(Params)) end, - reply(ehttpc:request(PoolName, post, {Path, Headers, Body}, Timeout)). + reply(ehttpc:request(PoolName, post, {Path, Headers, Body}, Timeout, Retry)). reply({ok, StatusCode, _Headers}) -> {ok, StatusCode, <<>>}; diff --git a/apps/emqx_auth_mnesia/test/emqx_auth_mnesia_SUITE.erl b/apps/emqx_auth_mnesia/test/emqx_auth_mnesia_SUITE.erl index f7071bc17..8529fb143 100644 --- a/apps/emqx_auth_mnesia/test/emqx_auth_mnesia_SUITE.erl +++ b/apps/emqx_auth_mnesia/test/emqx_auth_mnesia_SUITE.erl @@ -408,7 +408,7 @@ t_password_hash(_) -> ok = application:start(emqx_auth_mnesia). t_will_message_connection_denied(Config) when is_list(Config) -> - ClientId = Username = <<"subscriber">>, + ClientId = <<"subscriber">>, Password = <<"p">>, application:stop(emqx_auth_mnesia), ok = emqx_ct_helpers:start_apps([emqx_auth_mnesia]), diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src index 004545793..5c359a199 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src @@ -5,15 +5,18 @@ [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.4.8", [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {<<"4\\.4\\.[6-7]">>, - [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, @@ -21,7 +24,8 @@ {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.4.5", - [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, @@ -30,7 +34,8 @@ {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.4.4", - [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, @@ -39,7 +44,8 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}]}, {"4.4.3", - [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, @@ -101,15 +107,18 @@ [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.4.8", [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {<<"4\\.4\\.[6-7]">>, - [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, @@ -117,7 +126,8 @@ {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.4.5", - [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, @@ -126,7 +136,8 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.4.4", - [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, @@ -135,7 +146,8 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}]}, {"4.4.3", - [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_metrics.erl b/apps/emqx_rule_engine/src/emqx_rule_metrics.erl index 4a532d00b..e32730b1d 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_metrics.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_metrics.erl @@ -133,16 +133,15 @@ clear_metrics(Id) -> -spec(reset_metrics(rule_id()) -> ok). reset_metrics(Id) -> reset_speeds(Id), - reset_metrics(Id, rule_metrics()), + do_reset_metrics(Id, rule_metrics()), case emqx_rule_registry:get_rule(Id) of not_found -> ok; {ok, #rule{actions = Actions}} -> - [ reset_metrics(ActionId, action_metrics()) - || #action_instance{ id = ActionId} <- Actions], + reset_action_metrics(Actions), ok end. -reset_metrics(Id, Metrics) -> +do_reset_metrics(Id, Metrics) -> case couters_ref(Id) of not_found -> ok; Ref -> [counters:put(Ref, metrics_idx(Idx), 0) @@ -150,6 +149,12 @@ reset_metrics(Id, Metrics) -> ok end. +reset_action_metrics(Actions) -> + lists:foreach(fun(#action_instance{id = ActionId, fallbacks = FallbackActions}) -> + do_reset_metrics(ActionId, action_metrics()), + reset_action_metrics(FallbackActions) + end, Actions). + reset_speeds(Id) -> gen_server:call(?MODULE, {reset_speeds, Id}). diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index d819aa5e3..857b9017c 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -50,6 +50,7 @@ groups() -> t_unregister_provider, t_create_rule, t_reset_metrics, + t_reset_metrics_fallbacks, t_create_resource ]}, {actions, [], @@ -381,18 +382,14 @@ t_inspect_action(_Config) -> t_reset_metrics(_Config) -> ok = emqx_rule_engine:load_providers(), - {ok, #resource{id = ResId}} = emqx_rule_engine:create_resource( - #{type => built_in, - config => #{}, - description => <<"debug resource">>}), - {ok, #rule{id = Id}} = emqx_rule_engine:create_rule( - #{rawsql => "select clientid as c, username as u " - "from \"t1\" ", - actions => [#{name => 'inspect', - args => #{'$resource' => ResId, a=>1, b=>2}}], - type => built_in, - description => <<"Inspect rule">> - }), + {ok, #rule{id = Id, actions = [#action_instance{id = ActId0}]}} = + emqx_rule_engine:create_rule( + #{rawsql => "select clientid as c, username as u " + "from \"t1\" ", + actions => [#{name => 'inspect', args => #{a=>1, b=>2}}], + type => built_in, + description => <<"Inspect rule">> + }), {ok, Client} = emqtt:start_link([{username, <<"emqx">>}]), {ok, _} = emqtt:connect(Client), [ begin @@ -400,16 +397,68 @@ t_reset_metrics(_Config) -> timer:sleep(100) end || _ <- lists:seq(1,10)], + ?assertMatch(#{exception := 0, failed := 0, + matched := 10, no_result := 0, passed := 10}, + emqx_rule_metrics:get_rule_metrics(Id)), + ?assertMatch(#{failed := 0, success := 10, taken := 10}, + emqx_rule_metrics:get_action_metrics(ActId0)), emqx_rule_metrics:reset_metrics(Id), ?assertEqual(#{exception => 0,failed => 0, matched => 0,no_result => 0,passed => 0, speed => 0.0,speed_last5m => 0.0,speed_max => 0.0}, emqx_rule_metrics:get_rule_metrics(Id)), - ?assertEqual(#{failed => 0,success => 0,taken => 0}, - emqx_rule_metrics:get_action_metrics(ResId)), + ?assertEqual(#{failed => 0, success => 0, taken => 0}, + emqx_rule_metrics:get_action_metrics(ActId0)), emqtt:stop(Client), emqx_rule_registry:remove_rule(Id), - emqx_rule_registry:remove_resource(ResId), + ok. + +t_reset_metrics_fallbacks(_Config) -> + ok = emqx_rule_engine:load_providers(), + ok = emqx_rule_registry:add_action( + #action{name = 'crash_action', app = ?APP, + module = ?MODULE, on_create = crash_action, + types=[], params_spec = #{}, + title = #{en => <<"Crash Action">>}, + description = #{en => <<"This action will always fail!">>}}), + {ok, #rule{id = Id, actions = [#action_instance{id = ActId0, fallbacks = [ + #action_instance{id = ActId1}, + #action_instance{id = ActId2} + ]}]}} = + emqx_rule_engine:create_rule( + #{rawsql => "select clientid as c, username as u " + "from \"t1\" ", + actions => [#{name => 'crash_action', args => #{a=>1, b=>2}, fallbacks => [ + #{name => 'inspect', args => #{}, fallbacks => []}, + #{name => 'inspect', args => #{}, fallbacks => []} + ]}], + type => built_in, + description => <<"Inspect rule">> + }), + {ok, Client} = emqtt:start_link([{username, <<"emqx">>}]), + {ok, _} = emqtt:connect(Client), + [ begin + emqtt:publish(Client, <<"t1">>, <<"{\"id\": 1, \"name\": \"ha\"}">>, 0), + timer:sleep(100) + end + || _ <- lists:seq(1,10)], + ?assertMatch(#{exception := 0, failed := 0, + matched := 10, no_result := 0, passed := 10}, + emqx_rule_metrics:get_rule_metrics(Id)), + [?assertMatch(#{failed := 10, success := 0, taken := 10}, + emqx_rule_metrics:get_action_metrics(AId)) || AId <- [ActId0]], + [?assertMatch(#{failed := 0, success := 10, taken := 10}, + emqx_rule_metrics:get_action_metrics(AId)) || AId <- [ ActId1, ActId2]], + emqx_rule_metrics:reset_metrics(Id), + ?assertEqual(#{exception => 0,failed => 0, + matched => 0,no_result => 0,passed => 0, + speed => 0.0,speed_last5m => 0.0,speed_max => 0.0}, + emqx_rule_metrics:get_rule_metrics(Id)), + [?assertEqual(#{failed => 0, success => 0, taken => 0}, + emqx_rule_metrics:get_action_metrics(AId)) || AId <- [ActId0, ActId1, ActId2]], + emqtt:stop(Client), + emqx_rule_registry:remove_rule(Id), + ok = emqx_rule_registry:remove_action('crash_action'), ok. t_republish_action(_Config) -> diff --git a/src/emqx.appup.src b/src/emqx.appup.src index 62b0b9042..cbe386ba0 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -2,7 +2,8 @@ %% Unless you know what you are doing, DO NOT edit manually!! {VSN, [{"4.4.9", - [{load_module,emqx_misc,brutal_purge,soft_purge,[]}, + [{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, + {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx,brutal_purge,soft_purge,[]}, {add_module,emqx_secret}, {load_module,emqx_session,brutal_purge,soft_purge,[]}, @@ -271,7 +272,8 @@ {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], [{"4.4.9", - [{load_module,emqx_misc,brutal_purge,soft_purge,[]}, + [{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, + {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 51674263c..0d03c39cf 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -312,6 +312,7 @@ handle_in(?CONNECT_PACKET(ConnPkt) = Packet, Channel) -> fun enrich_client/2, fun set_log_meta/2, fun check_banned/2, + fun count_flapping_event/2, fun auth_connect/2 ], ConnPkt, Channel#channel{conn_state = connecting}) of {ok, NConnPkt, NChannel = #channel{clientinfo = ClientInfo}} -> @@ -1060,11 +1061,7 @@ handle_info({sock_closed, Reason}, Channel = #channel{conn_state = idle}) -> handle_info({sock_closed, Reason}, Channel = #channel{conn_state = connecting}) -> shutdown(Reason, Channel); -handle_info({sock_closed, Reason}, Channel = - #channel{conn_state = connected, - clientinfo = ClientInfo = #{zone := Zone}}) -> - emqx_zone:enable_flapping_detect(Zone) - andalso emqx_flapping:detect(ClientInfo), +handle_info({sock_closed, Reason}, Channel = #channel{conn_state = connected}) -> Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(Channel)), case maybe_shutdown(Reason, Channel1) of {ok, Channel2} -> {ok, {event, disconnected}, Channel2}; @@ -1373,6 +1370,13 @@ auth_connect(#mqtt_packet_connect{password = Password}, {error, emqx_reason_codes:connack_error(Reason)} end. +%%-------------------------------------------------------------------- +%% Flapping + +count_flapping_event(_ConnPkt, Channel = #channel{clientinfo = ClientInfo = #{zone := Zone}}) -> + _ = emqx_zone:enable_flapping_detect(Zone) andalso emqx_flapping:detect(ClientInfo), + {ok, Channel}. + %%-------------------------------------------------------------------- %% Enhanced Authentication diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 217e26bb0..7765f7d7a 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -683,26 +683,35 @@ run_terminate_hooks(ClientInfo, takeovered, Session) -> run_terminate_hooks(ClientInfo, Reason, Session) -> run_hook('session.terminated', [ClientInfo, Reason, info(Session)]). -redispatch_shared_messages(#session{inflight = Inflight}) -> - InflightList = emqx_inflight:to_list(Inflight), - lists:foreach(fun - %% Only QoS1 messages get redispatched, because QoS2 messages - %% must be sent to the same client, once they're in flight - ({_, {#message{qos = ?QOS_2} = Msg, _}}) -> - ?LOG(warning, "Not redispatching qos2 msg: ~s", [emqx_message:format(Msg)]); - ({_, {#message{topic = Topic, qos = ?QOS_1} = Msg, _}}) -> - case emqx_shared_sub:get_group(Msg) of - {ok, Group} -> - %% Note that dispatch is called with self() in failed subs - %% This is done to avoid dispatching back to caller - Delivery = #delivery{sender = self(), message = Msg}, - emqx_shared_sub:dispatch_to_non_self(Group, Topic, Delivery); - _ -> - false - end; - (_) -> - ok - end, InflightList). +redispatch_shared_messages(#session{inflight = Inflight, mqueue = Q}) -> + AllInflights = emqx_inflight:to_list(sort_fun(), Inflight), + F = fun({_, {Msg, _Ts}}) -> + case Msg of + #message{qos = ?QOS_1} -> + %% For QoS 2, here is what the spec says: + %% If the Client's Session terminates before the Client reconnects, + %% the Server MUST NOT send the Application Message to any other + %% subscribed Client [MQTT-4.8.2-5]. + {true, Msg}; + _ -> + %% QoS 2, after pubrec is received + %% the inflight record is updated to an atom + false + end + end, + InflightList = lists:filtermap(F, AllInflights), + MqList = mqueue_to_list(Q, []), + emqx_shared_sub:redispatch(InflightList ++ MqList). + +%% convert mqueue to a list +%% the messages at the head of the list is to be dispatched before the tail +mqueue_to_list(Q, Acc) -> + case emqx_mqueue:out(Q) of + {empty, _Q} -> + lists:reverse(Acc); + {{value, Msg}, Q1} -> + mqueue_to_list(Q1, [Msg | Acc]) + end. -compile({inline, [run_hook/2]}). run_hook(Name, Args) -> diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index cc57e001f..9c051c62a 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -39,7 +39,7 @@ ]). -export([ dispatch/3 - , dispatch_to_non_self/3 + , redispatch/1 ]). -export([ maybe_ack/1 @@ -47,7 +47,6 @@ , nack_no_connection/1 , is_ack_required/1 , is_retry_dispatch/1 - , get_group/1 ]). %% for testing @@ -84,6 +83,9 @@ -define(ACK, shared_sub_ack). -define(NACK(Reason), {shared_sub_nack, Reason}). -define(NO_ACK, no_ack). +-define(REDISPATCH_TO(GROUP, TOPIC), {GROUP, TOPIC}). + +-type redispatch_to() :: ?REDISPATCH_TO(emqx_topic:group(), emqx_topic:topic()). -record(state, {pmon}). @@ -134,11 +136,12 @@ dispatch(Group, Topic, Delivery) -> Strategy = strategy(Group), dispatch(Strategy, Group, Topic, Delivery, _FailedSubs = #{}). -dispatch(Strategy, Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) -> - #message{from = ClientId, topic = SourceTopic} = Msg, +dispatch(Strategy, Group, Topic, Delivery = #delivery{message = Msg0}, FailedSubs) -> + #message{from = ClientId, topic = SourceTopic} = Msg0, case pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) of false -> {error, no_subscribers}; {Type, SubPid} -> + Msg = with_redispatch_to(Msg0, Group, Topic), case do_dispatch(SubPid, Group, Topic, Msg, Type) of ok -> {ok, 1}; {error, Reason} -> @@ -162,7 +165,7 @@ ack_enabled() -> emqx:get_env(shared_dispatch_ack_enabled, false). do_dispatch(SubPid, _Group, Topic, Msg, _Type) when SubPid =:= self() -> - %% Deadlock otherwise + %% dispatch without ack, deadlock otherwise send(SubPid, Topic, {deliver, Topic, Msg}); %% return either 'ok' (when everything is fine) or 'error' do_dispatch(SubPid, _Group, Topic, #message{qos = ?QOS_0} = Msg, _Type) -> @@ -176,6 +179,10 @@ do_dispatch(SubPid, Group, Topic, Msg, Type) -> send(SubPid, Topic, {deliver, Topic, Msg}) end. +with_redispatch_to(#message{qos = ?QOS_0} = Msg, _Group, _Topic) -> Msg; +with_redispatch_to(Msg, Group, Topic) -> + emqx_message:set_headers(#{redispatch_to => ?REDISPATCH_TO(Group, Topic)}, Msg). + dispatch_with_ack(SubPid, Group, Topic, Msg, Type) -> %% For QoS 1/2 message, expect an ack Ref = erlang:monitor(process, SubPid), @@ -228,13 +235,22 @@ without_group_ack(Msg) -> get_group_ack(Msg) -> emqx_message:get_header(shared_dispatch_ack, Msg, ?NO_ACK). --spec(get_group(emqx_types:message()) -> {ok, any()} | error). -get_group(Msg) -> - case get_group_ack(Msg) of - {_Sender, {_Type, Group, _Ref}} -> {ok, Group}; - _ -> error +%% @hidden Redispatch is neede only for the messages with redispatch_to header added. +is_redispatch_needed(#message{} = Msg) -> + case get_redispatch_to(Msg) of + ?REDISPATCH_TO(_, _) -> + true; + _ -> + false end. +%% @hidden Return the `redispatch_to` group-topic in the message header. +%% `false` is returned if the message is not a shared dispatch. +%% or when it's a QoS 0 message. +-spec(get_redispatch_to(emqx_types:message()) -> redispatch_to() | false). +get_redispatch_to(Msg) -> + emqx_message:get_header(redispatch_to, Msg, false). + -spec(is_ack_required(emqx_types:message()) -> boolean()). is_ack_required(Msg) -> ?NO_ACK =/= get_group_ack(Msg). @@ -245,6 +261,26 @@ is_retry_dispatch(Msg) -> _ -> false end. +%% @doc Redispatch shared deliveries to other members in the group. +redispatch(Messages0) -> + Messages = lists:filter(fun is_redispatch_needed/1, Messages0), + case length(Messages) of + L when L > 0 -> + ?LOG(info, "Redispatching ~p shared subscription message(s)", [L]), + lists:foreach(fun redispatch_shared_message/1, Messages); + _ -> + ok + end. + +redispatch_shared_message(#message{} = Msg) -> + %% As long as it's still a #message{} record in inflight, + %% we should try to re-dispatch + ?REDISPATCH_TO(Group, Topic) = get_redispatch_to(Msg), + %% Note that dispatch is called with self() in failed subs + %% This is done to avoid dispatching back to caller + Delivery = #delivery{sender = self(), message = Msg}, + dispatch_to_non_self(Group, Topic, Delivery). + %% @doc Negative ack dropped message due to inflight window or message queue being full. -spec(maybe_nack_dropped(emqx_types:message()) -> store | drop). maybe_nack_dropped(Msg) -> @@ -301,7 +337,8 @@ fetch_sender_ref({Sender, Ref}) -> {Sender, Ref}. pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) -> Sub0 = erlang:get({shared_sub_sticky, Group, Topic}), - case is_active_sub(Sub0, FailedSubs) of + All = subscribers(Group, Topic), + case is_active_sub(Sub0, FailedSubs, All) of true -> %% the old subscriber is still alive %% keep using it for sticky strategy @@ -419,6 +456,7 @@ handle_cast(Msg, State) -> handle_info({mnesia_table_event, {write, NewRecord, _}}, State = #state{pmon = PMon}) -> #emqx_shared_subscription{subpid = SubPid} = NewRecord, + ok = maybe_insert_alive_tab(SubPid), {noreply, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})}; %% The subscriber may have subscribed multiple topics, so we need to keep monitoring the PID until @@ -471,8 +509,10 @@ update_stats(State) -> State. %% Return 'true' if the subscriber process is alive AND not in the failed list -is_active_sub(Pid, FailedSubs) -> - not maps:is_key(Pid, FailedSubs) andalso is_alive_sub(Pid). +is_active_sub(Pid, FailedSubs, All) -> + lists:member(Pid, All) andalso + (not maps:is_key(Pid, FailedSubs)) andalso + is_alive_sub(Pid). %% erlang:is_process_alive/1 does not work with remote pid. is_alive_sub(Pid) when ?IS_LOCAL_PID(Pid) -> diff --git a/test/emqx_channel_SUITE.erl b/test/emqx_channel_SUITE.erl index 06e5e4eaa..e887d35f4 100644 --- a/test/emqx_channel_SUITE.erl +++ b/test/emqx_channel_SUITE.erl @@ -35,11 +35,7 @@ init_per_suite(Config) -> ok = meck:new(emqx_cm, [passthrough, no_history, no_link]), ok = meck:expect(emqx_cm, mark_channel_connected, fun(_) -> ok end), ok = meck:expect(emqx_cm, mark_channel_disconnected, fun(_) -> ok end), - %% Access Control Meck - ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]), - ok = meck:expect(emqx_access_control, authenticate, - fun(_) -> {ok, #{auth_result => success}} end), - ok = meck:expect(emqx_access_control, check_acl, fun(_, _, _) -> allow end), + %% Broker Meck ok = meck:new(emqx_broker, [passthrough, no_history, no_link]), %% Hooks Meck @@ -55,8 +51,7 @@ init_per_suite(Config) -> Config. end_per_suite(_Config) -> - meck:unload([emqx_access_control, - emqx_metrics, + meck:unload([emqx_metrics, emqx_session, emqx_broker, emqx_hooks, @@ -65,10 +60,16 @@ end_per_suite(_Config) -> init_per_testcase(_TestCase, Config) -> meck:new(emqx_zone, [passthrough, no_history, no_link]), + %% Access Control Meck + ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]), + ok = meck:expect(emqx_access_control, authenticate, + fun(_) -> {ok, #{auth_result => success}} end), + ok = meck:expect(emqx_access_control, check_acl, fun(_, _, _) -> allow end), Config. end_per_testcase(_TestCase, Config) -> meck:unload([emqx_zone]), + meck:unload([emqx_access_control]), Config. %%-------------------------------------------------------------------- @@ -855,6 +856,30 @@ t_ws_cookie_init(_) -> Channel = emqx_channel:init(ConnInfo, [{zone, zone}]), ?assertMatch(#{ws_cookie := WsCookie}, emqx_channel:info(clientinfo, Channel)). +%%-------------------------------------------------------------------- +%% Test cases for other mechnisms +%%-------------------------------------------------------------------- + +t_flapping_detect(_) -> + Parent = self(), + ok = meck:expect(emqx_cm, open_session, + fun(true, _ClientInfo, _ConnInfo) -> + {ok, #{session => session(), present => false}} + end), + ok = meck:expect(emqx_access_control, authenticate, fun(_) -> {error, not_authorized} end), + ok = meck:new(emqx_flapping, [passthrough, no_history, no_link]), + ok = meck:expect(emqx_flapping, detect, fun(_) -> Parent ! flapping_detect end), + ok = meck:expect(emqx_zone, enable_flapping_detect, fun(_) -> true end), + IdleChannel = channel(#{conn_state => idle}), + {shutdown, not_authorized, _ConnAck, _Channel} = + emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), IdleChannel), + receive + flapping_detect -> ok + after 2000 -> + ?assert(false, "Flapping detect should be exected in connecting progress") + end, + meck:unload([emqx_flapping]). + %%-------------------------------------------------------------------- %% Helper functions %%-------------------------------------------------------------------- diff --git a/test/emqx_flapping_SUITE.erl b/test/emqx_flapping_SUITE.erl index eadd89192..8074a8607 100644 --- a/test/emqx_flapping_SUITE.erl +++ b/test/emqx_flapping_SUITE.erl @@ -72,4 +72,4 @@ t_expired_detecting(_) -> (_) -> false end, ets:tab2list(emqx_flapping))), timer:sleep(200), ?assertEqual(true, lists:all(fun({flapping, <<"clientid">>, _, _, _}) -> false; - (_) -> true end, ets:tab2list(emqx_flapping))). \ No newline at end of file + (_) -> true end, ets:tab2list(emqx_flapping))). diff --git a/test/emqx_shared_sub_SUITE.erl b/test/emqx_shared_sub_SUITE.erl index 9ffce523d..2c4ecf265 100644 --- a/test/emqx_shared_sub_SUITE.erl +++ b/test/emqx_shared_sub_SUITE.erl @@ -25,13 +25,24 @@ -define(SUITE, ?MODULE). --define(wait(For, Timeout), - emqx_ct_helpers:wait_for( - ?FUNCTION_NAME, ?LINE, fun() -> For end, Timeout)). - -define(ack, shared_sub_ack). -define(no_ack, no_ack). +-define(WAIT(TIMEOUT, PATTERN, Res), + (fun() -> + receive + PATTERN -> + Res; + Other -> + ct:fail(#{expected => ??PATTERN, + got => Other + }) + after + TIMEOUT -> + ct:fail({timeout, ??PATTERN}) + end + end)()). + all() -> emqx_ct:all(?SUITE). init_per_suite(Config) -> @@ -151,40 +162,7 @@ t_no_connection_nack(Config) when is_list(Config) -> SendF(1), ct:sleep(200), %% This is the connection which was picked by broker to dispatch (sticky) for 1st message - ?assertMatch([#{packet_id := 1}], recv_msgs(1)), - %% Now kill the connection, expect all following messages to be delivered to the other - %% subscriber. - %emqx_mock_client:stop(ConnPid), - %% sleep then make synced calls to session processes to ensure that - %% the connection pid's 'EXIT' message is propagated to the session process - %% also to be sure sessions are still alive - % timer:sleep(2), - % _ = emqx_session:info(SPid1), - % _ = emqx_session:info(SPid2), - % %% Now we know what is the other still alive connection - % [TheOtherConnPid] = [SubConnPid1, SubConnPid2] -- [ConnPid], - % %% Send some more messages - % PacketIdList = lists:seq(2, 10), - % lists:foreach(fun(Id) -> - % SendF(Id), - % ?wait(Received(Id, TheOtherConnPid), 1000) - % end, PacketIdList), - % %% Now close the 2nd (last connection) - % emqx_mock_client:stop(TheOtherConnPid), - % timer:sleep(2), - % %% both sessions should have conn_pid = undefined - % ?assertEqual({conn_pid, undefined}, lists:keyfind(conn_pid, 1, emqx_session:info(SPid1))), - % ?assertEqual({conn_pid, undefined}, lists:keyfind(conn_pid, 1, emqx_session:info(SPid2))), - % %% send more messages, but all should be queued in session state - % lists:foreach(fun(Id) -> SendF(Id) end, PacketIdList), - % {_, L1} = lists:keyfind(mqueue_len, 1, emqx_session:info(SPid1)), - % {_, L2} = lists:keyfind(mqueue_len, 1, emqx_session:info(SPid2)), - % ?assertEqual(length(PacketIdList), L1 + L2), - % %% clean up - % emqx_mock_client:close_session(PubConnPid), - % emqx_sm:close_session(SPid1), - % emqx_sm:close_session(SPid2), ok. t_random(Config) when is_list(Config) -> @@ -443,8 +421,14 @@ t_local_fallback(Config) when is_list(Config) -> %% This one tests that broker tries to select another shared subscriber %% If the first one doesn't return an ACK -t_redispatch(Config) when is_list(Config) -> - ok = ensure_config(sticky, true), +t_redispatch_with_ack(Config) when is_list(Config) -> + test_redispatch(Config, true). + +t_redispatch_no_ack(Config) when is_list(Config) -> + test_redispatch(Config, false). + +test_redispatch(_Config, AckEnabled) -> + ok = ensure_config(sticky, AckEnabled), application:set_env(emqx, shared_dispatch_ack_enabled, true), Group = <<"group1">>, @@ -474,15 +458,59 @@ t_redispatch(Config) when is_list(Config) -> emqtt:stop(UsedSubPid2), ok. +t_redispatch_wildcard_with_ack(Config) when is_list(Config)-> + redispatch_wildcard(Config, true). + +t_redispatch_wildcard_no_ack(Config) when is_list(Config) -> + redispatch_wildcard(Config, false). + +%% This one tests that broker tries to redispatch to another member in the group +%% if the first one disconnected before acking (auto_ack set to false) +redispatch_wildcard(_Config, AckEnabled) -> + ok = ensure_config(sticky, AckEnabled), + + Group = <<"group1">>, + + Topic = <<"foo/bar/1">>, + ClientId1 = <<"ClientId1">>, + ClientId2 = <<"ClientId2">>, + {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]), + {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, false}]), + {ok, _} = emqtt:connect(ConnPid1), + {ok, _} = emqtt:connect(ConnPid2), + + emqtt:subscribe(ConnPid1, {<<"$share/", Group/binary, "/foo/bar/#">>, 1}), + emqtt:subscribe(ConnPid2, {<<"$share/", Group/binary, "/foo/bar/#">>, 1}), + + Message = emqx_message:make(ClientId1, 1, Topic, <<"hello1">>), + + emqx:publish(Message), + + {true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1, ConnPid2]), + ok = emqtt:stop(UsedSubPid1), + + Res = last_message(<<"hello1">>, [ConnPid1, ConnPid2], 6000), + ?assertMatch({true, Pid} when Pid =/= UsedSubPid1, Res), + + {true, UsedSubPid2} = Res, + emqtt:stop(UsedSubPid2), + ok. + +t_dispatch_when_inflights_are_full({init, Config}) -> + %% make sure broker does not push more than one inflight + meck:new(emqx_zone, [passthrough, no_history]), + meck:expect(emqx_zone, max_inflight, fun(_Zone) -> 1 end), + Config; +t_dispatch_when_inflights_are_full({'end', _Config}) -> + meck:unload(emqx_zone); t_dispatch_when_inflights_are_full(Config) when is_list(Config) -> - ok = ensure_config(round_robin, true), + ok = ensure_config(round_robin, _AckEnabled = true), Topic = <<"foo/bar">>, ClientId1 = <<"ClientId1">>, ClientId2 = <<"ClientId2">>, - %% Note that max_inflight is 1 - {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {max_inflight, 1}]), - {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {max_inflight, 1}]), + {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]), + {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}]), {ok, _} = emqtt:connect(ConnPid1), {ok, _} = emqtt:connect(ConnPid2), @@ -505,8 +533,7 @@ t_dispatch_when_inflights_are_full(Config) when is_list(Config) -> ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message2)), %% Now kill any client - erlang:exit(ConnPid1, normal), - ct:sleep(100), + ok = kill_process(ConnPid1), %% And try to send the message ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message3)), @@ -521,10 +548,137 @@ t_dispatch_when_inflights_are_full(Config) when is_list(Config) -> emqtt:stop(ConnPid2), ok. +%% No ack, QoS 2 subscriptions, +%% client1 receives one message, send pubrec, then suspend +%% client2 acts normal (auto_ack=true) +%% Expected behaviour: +%% the messages sent to client1's inflight and mq are re-dispatched after client1 is down +t_dispatch_qos2({init, Config}) when is_list(Config) -> + meck:new(emqx_zone, [passthrough, no_history]), + meck:expect(emqx_zone, max_inflight, fun(_Zone) -> 1 end), + Config; +t_dispatch_qos2({'end', Config}) when is_list(Config) -> + meck:unload(emqx_zone); +t_dispatch_qos2(Config) when is_list(Config) -> + ok = ensure_config(round_robin, _AckEnabled = false), + Topic = <<"foo/bar/1">>, + ClientId1 = <<"ClientId1">>, + ClientId2 = <<"ClientId2">>, + + {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]), + {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, true}]), + {ok, _} = emqtt:connect(ConnPid1), + {ok, _} = emqtt:connect(ConnPid2), + + emqtt:subscribe(ConnPid1, {<<"$share/group/foo/bar/#">>, 2}), + emqtt:subscribe(ConnPid2, {<<"$share/group/foo/bar/#">>, 2}), + + Message1 = emqx_message:make(ClientId1, 2, Topic, <<"hello1">>), + Message2 = emqx_message:make(ClientId1, 2, Topic, <<"hello2">>), + Message3 = emqx_message:make(ClientId1, 2, Topic, <<"hello3">>), + Message4 = emqx_message:make(ClientId1, 2, Topic, <<"hello4">>), + ct:sleep(100), + + ok = sys:suspend(ConnPid1), + + %% One message is inflight + ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message1)), + ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message2)), + ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message3)), + ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message4)), + + MsgRec1 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P1}}, P1), + MsgRec2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P2}}, P2), + %% assert hello2 > hello1 or hello4 > hello3 + ?assert(MsgRec2 > MsgRec1), + + sys:resume(ConnPid1), + %% emqtt subscriber automatically sends PUBREC, but since auto_ack is set to false + %% so it will never send PUBCOMP, hence EMQX should not attempt to send + %% the 4th message yet since max_inflight is 1. + MsgRec3 = ?WAIT(2000, {publish, #{client_pid := ConnPid1, payload := P3}}, P3), + ct:sleep(100), + %% no message expected + ?assertEqual([], collect_msgs(0)), + %% now kill client 1 + kill_process(ConnPid1), + %% client 2 should receive the message + MsgRec4 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P4}}, P4), + %% assert hello2 > hello1 or hello4 > hello3 + ?assert(MsgRec4 > MsgRec3), + emqtt:stop(ConnPid2), + ok. + +t_dispatch_qos0({init, Config}) when is_list(Config) -> + Config; +t_dispatch_qos0({'end', Config}) when is_list(Config) -> + ok; +t_dispatch_qos0(Config) when is_list(Config) -> + ok = ensure_config(round_robin, _AckEnabled = false), + Topic = <<"foo/bar/1">>, + ClientId1 = <<"ClientId1">>, + ClientId2 = <<"ClientId2">>, + + {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]), + {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, true}]), + {ok, _} = emqtt:connect(ConnPid1), + {ok, _} = emqtt:connect(ConnPid2), + + %% subscribe with QoS 0 + emqtt:subscribe(ConnPid1, {<<"$share/group/foo/bar/#">>, 0}), + emqtt:subscribe(ConnPid2, {<<"$share/group/foo/bar/#">>, 0}), + + %% publish with QoS 2, but should be downgraded to 0 as the subscribers + %% subscribe with QoS 0 + Message1 = emqx_message:make(ClientId1, 2, Topic, <<"hello1">>), + Message2 = emqx_message:make(ClientId1, 2, Topic, <<"hello2">>), + Message3 = emqx_message:make(ClientId1, 2, Topic, <<"hello3">>), + Message4 = emqx_message:make(ClientId1, 2, Topic, <<"hello4">>), + ct:sleep(100), + + ok = sys:suspend(ConnPid1), + + ?assertMatch([_], emqx:publish(Message1)), + ?assertMatch([_], emqx:publish(Message2)), + ?assertMatch([_], emqx:publish(Message3)), + ?assertMatch([_], emqx:publish(Message4)), + + MsgRec1 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P1}}, P1), + MsgRec2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P2}}, P2), + %% assert hello2 > hello1 or hello4 > hello3 + ?assert(MsgRec2 > MsgRec1), + + kill_process(ConnPid1), + %% expect no redispatch + ?assertEqual([], collect_msgs(timer:seconds(2))), + emqtt:stop(ConnPid2), + ok. + %%-------------------------------------------------------------------- %% help functions %%-------------------------------------------------------------------- +kill_process(Pid) -> + _ = unlink(Pid), + _ = monitor(process, Pid), + erlang:exit(Pid, kill), + receive + {'DOWN', _, process, Pid, _} -> + ok + end. + +collect_msgs(Timeout) -> + collect_msgs([], Timeout). + +collect_msgs(Acc, Timeout) -> + receive + Msg -> + collect_msgs([Msg | Acc], Timeout) + after + Timeout -> + lists:reverse(Acc) + end. + ensure_config(Strategy) -> ensure_config(Strategy, _AckEnabled = true).