Merge pull request #9130 from HJianBo/merge-rel-v43-into-v44

Merge release-v43 into release-v44
This commit is contained in:
JianBo He 2022-10-11 11:05:49 +08:00 committed by GitHub
commit 58dc0a8913
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 553 additions and 153 deletions

View File

@ -24,6 +24,11 @@ jobs:
steps:
- uses: actions/checkout@v1
# to avoid dirty self-hosted runners
- name: stop containers
run: |
docker rm -f $(docker ps -qa) || true
docker network rm $(docker network ls -q) || true
- name: docker compose up
env:
LDAP_TAG: ${{ matrix.ldap_tag }}
@ -79,6 +84,10 @@ jobs:
steps:
- uses: actions/checkout@v1
- name: stop containers
run: |
docker rm -f $(docker ps -qa) || true
docker network rm $(docker network ls -q) || true
- name: docker-compose up
run: |
docker-compose \
@ -150,6 +159,10 @@ jobs:
steps:
- uses: actions/checkout@v1
- name: stop containers
run: |
docker rm -f $(docker ps -qa) || true
docker network rm $(docker network ls -q) || true
- name: docker-compose up
timeout-minutes: 5
run: |
@ -236,6 +249,10 @@ jobs:
- tcp
steps:
- uses: actions/checkout@v1
- name: stop containers
run: |
docker rm -f $(docker ps -qa) || true
docker network rm $(docker network ls -q) || true
- name: docker-compose up
run: |
docker-compose \
@ -317,6 +334,10 @@ jobs:
steps:
- uses: actions/checkout@v1
- name: stop containers
run: |
docker rm -f $(docker ps -qa) || true
docker network rm $(docker network ls -q) || true
- name: docker-compose up
run: |
docker-compose \

View File

@ -42,6 +42,11 @@ jobs:
use-self-hosted: false
steps:
- uses: actions/checkout@v2
# to avoid dirty self-hosted runners
- name: stop containers
run: |
docker rm -f $(docker ps -qa) || true
docker network rm $(docker network ls -q) || true
- name: docker compose up
if: endsWith(github.repository, 'emqx')
env:

View File

@ -32,6 +32,14 @@ File format:
- Added a test to prevent a last will testament message to be
published when a client is denied connection. [#8894](https://github.com/emqx/emqx/pull/8894)
- More rigorous checking of flapping to improve stability of the system. [#9045](https://github.com/emqx/emqx/pull/9045)
- QoS1 and QoS2 messages in session's buffer are re-dispatched to other members in the group
when the session terminates [#9094](https://github.com/emqx/emqx/pull/9094).
Prior to this enhancement, one would have to set `broker.shared_dispatch_ack_enabled` to true
to prevent sessions from buffering messages, however this acknowledgement comes with a cost.
### Bug fixes
- Fix delayed publish inaccurate caused by os time change. [#8908](https://github.com/emqx/emqx/pull/8908)
@ -48,6 +56,26 @@ File format:
Same `format_status` callback is added here too for `gen_server`s which hold password in
their state.
- Fix shared subscription message re-dispatches [#9094](https://github.com/emqx/emqx/pull/9094).
- When discarding QoS 2 inflight messages, there were excessive logs
- For wildcard deliveries, the re-dispatch used the wrong topic (the publishing topic,
but not the subscribing topic), caused messages to be lost when dispatching.
- Fix shared subscription group member unsubscribe issue when 'sticky' strategy is used.
Prior to this fix, if a previously picked member unsubscribes from the group (without reconnect)
the message is still dispatched to it.
This issue only occurs when unsubscribe with the session kept.
Fixed in [#9119](https://github.com/emqx/emqx/pull/9119)
- Fix shared subscription 'sticky' strategy when there is no local subscriptions at all.
Prior to this change, it may take a few rounds to randomly pick group members until a local subscriber
is hit (and then start sticking to it).
After this fix, it will start sticking to whichever randomly picked member even when it is a
subscriber from another node in the cluster.
Fixed in [#9122](https://github.com/emqx/emqx/pull/9122)
- Fix cannot reset metrics for fallback actions. [#9125](https://github.com/emqx/emqx/pull/9125)
## v4.3.20
### Bug fixes

View File

@ -1 +1,20 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-define(APP, emqx_auth_http).
%% equals to the default value of ehttpc
-define(DEFAULT_RETRY_TIMES, 2).

View File

@ -24,7 +24,7 @@
-logger_header("[ACL http]").
-import(emqx_auth_http_cli,
[ request/6
[ request/7
, feedvar/2
]).
@ -56,13 +56,15 @@ description() -> "ACL with HTTP API".
%% Internal functions
%%--------------------------------------------------------------------
check_acl_request(#{pool_name := PoolName,
check_acl_request(ACLParams =
#{pool_name := PoolName,
path := Path,
method := Method,
headers := Headers,
params := Params,
timeout := Timeout}, ClientInfo) ->
request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout).
Retry = maps:get(retry_times, ACLParams, ?DEFAULT_RETRY_TIMES),
request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout, Retry).
access(subscribe) -> 1;
access(publish) -> 2.

View File

@ -1,6 +1,6 @@
{application, emqx_auth_http,
[{description, "EMQ X Authentication/ACL with HTTP API"},
{vsn, "4.3.7"}, % strict semver, bump manually!
{vsn, "4.3.8"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_auth_http_sup]},
{applications, [kernel,stdlib,ehttpc]},

View File

@ -1,17 +1,27 @@
%% -*- mode: erlang -*-
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN,
[{"4.3.6",
[ %% There are only changes to the schema file, so we don't need any
%% commands here
]},
[{"4.3.7",
[{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]},
{"4.3.6",
[{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]},
{"4.3.5",
[{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]},
[{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]}]},
{"4.3.4",
[{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]},
[{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]}]},
{"4.3.3",
[{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]},
[{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]},
{"4.3.2",
@ -20,21 +30,29 @@
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}]},
{<<"4.3.[0-1]">>,
[{restart_application,emqx_auth_http}]},
{<<"4.3.[0-1]">>,[{restart_application,emqx_auth_http}]},
{<<".*">>,[]}],
[{"4.3.6",
[ %% There are only changes to the schema file, so we don't need any
%% commands here
]},
[{"4.3.7",
[{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]},
{"4.3.6",
[{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]},
{"4.3.5",
[{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]},
[{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]}]},
{"4.3.4",
[{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]},
[{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]}]},
{"4.3.3",
[{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]},
[{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]},
{"4.3.2",
@ -43,6 +61,5 @@
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}]},
{<<"4.3.[0-1]">>,
[{restart_application,emqx_auth_http}]},
{<<"4.3.[0-1]">>,[{restart_application,emqx_auth_http}]},
{<<".*">>,[]}]}.

View File

@ -25,7 +25,7 @@
-logger_header("[Auth http]").
-import(emqx_auth_http_cli,
[ request/6
[ request/7
, feedvar/2
]).
@ -63,24 +63,28 @@ description() -> "Authentication by HTTP API".
%% Requests
%%--------------------------------------------------------------------
authenticate(#{pool_name := PoolName,
authenticate(AuthParams =
#{pool_name := PoolName,
path := Path,
method := Method,
headers := Headers,
params := Params,
timeout := Timeout}, ClientInfo) ->
request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout).
Retry = maps:get(retry_times, AuthParams, ?DEFAULT_RETRY_TIMES),
request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout, Retry).
-spec(is_superuser(maybe(map()), emqx_types:client()) -> boolean()).
is_superuser(undefined, _ClientInfo) ->
false;
is_superuser(#{pool_name := PoolName,
is_superuser(SuperParams =
#{pool_name := PoolName,
path := Path,
method := Method,
headers := Headers,
params := Params,
timeout := Timeout}, ClientInfo) ->
case request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout) of
Retry = maps:get(retry_times, SuperParams, ?DEFAULT_RETRY_TIMES),
case request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout, Retry) of
{ok, 200, _Body} -> true;
{ok, _Code, _Body} -> false;
{error, Error} -> ?LOG(error, "Request superuser path ~s, error: ~p", [Path, Error]),

View File

@ -19,6 +19,7 @@
-include("emqx_auth_http.hrl").
-export([ request/6
, request/7
, feedvar/2
, feedvar/3
]).
@ -27,18 +28,21 @@
%% HTTP Request
%%--------------------------------------------------------------------
request(PoolName, get, Path, Headers, Params, Timeout) ->
NewPath = Path ++ "?" ++ binary_to_list(cow_qs:qs(bin_kw(Params))),
reply(ehttpc:request(PoolName, get, {NewPath, Headers}, Timeout));
request(PoolName, Method, Path, Headers, Params, Timeout) ->
request(PoolName, Method, Path, Headers, Params, Timeout, ?DEFAULT_RETRY_TIMES).
request(PoolName, post, Path, Headers, Params, Timeout) ->
request(PoolName, get, Path, Headers, Params, Timeout, Retry) ->
NewPath = Path ++ "?" ++ binary_to_list(cow_qs:qs(bin_kw(Params))),
reply(ehttpc:request(PoolName, get, {NewPath, Headers}, Timeout, Retry));
request(PoolName, post, Path, Headers, Params, Timeout, Retry) ->
Body = case proplists:get_value(<<"content-type">>, Headers) of
"application/x-www-form-urlencoded" ->
cow_qs:qs(bin_kw(Params));
"application/json" ->
emqx_json:encode(bin_kw(Params))
end,
reply(ehttpc:request(PoolName, post, {Path, Headers, Body}, Timeout)).
reply(ehttpc:request(PoolName, post, {Path, Headers, Body}, Timeout, Retry)).
reply({ok, StatusCode, _Headers}) ->
{ok, StatusCode, <<>>};

View File

@ -408,7 +408,7 @@ t_password_hash(_) ->
ok = application:start(emqx_auth_mnesia).
t_will_message_connection_denied(Config) when is_list(Config) ->
ClientId = Username = <<"subscriber">>,
ClientId = <<"subscriber">>,
Password = <<"p">>,
application:stop(emqx_auth_mnesia),
ok = emqx_ct_helpers:start_apps([emqx_auth_mnesia]),

View File

@ -5,15 +5,18 @@
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
{"4.4.8",
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
{<<"4\\.4\\.[6-7]">>,
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
[{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
@ -21,7 +24,8 @@
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
{"4.4.5",
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
[{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
@ -30,7 +34,8 @@
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{"4.4.4",
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
[{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
@ -39,7 +44,8 @@
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}]},
{"4.4.3",
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
[{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
@ -101,15 +107,18 @@
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
{"4.4.8",
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
{<<"4\\.4\\.[6-7]">>,
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
[{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
@ -117,7 +126,8 @@
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
{"4.4.5",
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
[{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
@ -126,7 +136,8 @@
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{"4.4.4",
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
[{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
@ -135,7 +146,8 @@
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}]},
{"4.4.3",
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
[{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},

View File

@ -133,16 +133,15 @@ clear_metrics(Id) ->
-spec(reset_metrics(rule_id()) -> ok).
reset_metrics(Id) ->
reset_speeds(Id),
reset_metrics(Id, rule_metrics()),
do_reset_metrics(Id, rule_metrics()),
case emqx_rule_registry:get_rule(Id) of
not_found -> ok;
{ok, #rule{actions = Actions}} ->
[ reset_metrics(ActionId, action_metrics())
|| #action_instance{ id = ActionId} <- Actions],
reset_action_metrics(Actions),
ok
end.
reset_metrics(Id, Metrics) ->
do_reset_metrics(Id, Metrics) ->
case couters_ref(Id) of
not_found -> ok;
Ref -> [counters:put(Ref, metrics_idx(Idx), 0)
@ -150,6 +149,12 @@ reset_metrics(Id, Metrics) ->
ok
end.
reset_action_metrics(Actions) ->
lists:foreach(fun(#action_instance{id = ActionId, fallbacks = FallbackActions}) ->
do_reset_metrics(ActionId, action_metrics()),
reset_action_metrics(FallbackActions)
end, Actions).
reset_speeds(Id) ->
gen_server:call(?MODULE, {reset_speeds, Id}).

View File

@ -50,6 +50,7 @@ groups() ->
t_unregister_provider,
t_create_rule,
t_reset_metrics,
t_reset_metrics_fallbacks,
t_create_resource
]},
{actions, [],
@ -381,15 +382,11 @@ t_inspect_action(_Config) ->
t_reset_metrics(_Config) ->
ok = emqx_rule_engine:load_providers(),
{ok, #resource{id = ResId}} = emqx_rule_engine:create_resource(
#{type => built_in,
config => #{},
description => <<"debug resource">>}),
{ok, #rule{id = Id}} = emqx_rule_engine:create_rule(
{ok, #rule{id = Id, actions = [#action_instance{id = ActId0}]}} =
emqx_rule_engine:create_rule(
#{rawsql => "select clientid as c, username as u "
"from \"t1\" ",
actions => [#{name => 'inspect',
args => #{'$resource' => ResId, a=>1, b=>2}}],
actions => [#{name => 'inspect', args => #{a=>1, b=>2}}],
type => built_in,
description => <<"Inspect rule">>
}),
@ -400,16 +397,68 @@ t_reset_metrics(_Config) ->
timer:sleep(100)
end
|| _ <- lists:seq(1,10)],
?assertMatch(#{exception := 0, failed := 0,
matched := 10, no_result := 0, passed := 10},
emqx_rule_metrics:get_rule_metrics(Id)),
?assertMatch(#{failed := 0, success := 10, taken := 10},
emqx_rule_metrics:get_action_metrics(ActId0)),
emqx_rule_metrics:reset_metrics(Id),
?assertEqual(#{exception => 0,failed => 0,
matched => 0,no_result => 0,passed => 0,
speed => 0.0,speed_last5m => 0.0,speed_max => 0.0},
emqx_rule_metrics:get_rule_metrics(Id)),
?assertEqual(#{failed => 0,success => 0,taken => 0},
emqx_rule_metrics:get_action_metrics(ResId)),
?assertEqual(#{failed => 0, success => 0, taken => 0},
emqx_rule_metrics:get_action_metrics(ActId0)),
emqtt:stop(Client),
emqx_rule_registry:remove_rule(Id),
emqx_rule_registry:remove_resource(ResId),
ok.
t_reset_metrics_fallbacks(_Config) ->
ok = emqx_rule_engine:load_providers(),
ok = emqx_rule_registry:add_action(
#action{name = 'crash_action', app = ?APP,
module = ?MODULE, on_create = crash_action,
types=[], params_spec = #{},
title = #{en => <<"Crash Action">>},
description = #{en => <<"This action will always fail!">>}}),
{ok, #rule{id = Id, actions = [#action_instance{id = ActId0, fallbacks = [
#action_instance{id = ActId1},
#action_instance{id = ActId2}
]}]}} =
emqx_rule_engine:create_rule(
#{rawsql => "select clientid as c, username as u "
"from \"t1\" ",
actions => [#{name => 'crash_action', args => #{a=>1, b=>2}, fallbacks => [
#{name => 'inspect', args => #{}, fallbacks => []},
#{name => 'inspect', args => #{}, fallbacks => []}
]}],
type => built_in,
description => <<"Inspect rule">>
}),
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
{ok, _} = emqtt:connect(Client),
[ begin
emqtt:publish(Client, <<"t1">>, <<"{\"id\": 1, \"name\": \"ha\"}">>, 0),
timer:sleep(100)
end
|| _ <- lists:seq(1,10)],
?assertMatch(#{exception := 0, failed := 0,
matched := 10, no_result := 0, passed := 10},
emqx_rule_metrics:get_rule_metrics(Id)),
[?assertMatch(#{failed := 10, success := 0, taken := 10},
emqx_rule_metrics:get_action_metrics(AId)) || AId <- [ActId0]],
[?assertMatch(#{failed := 0, success := 10, taken := 10},
emqx_rule_metrics:get_action_metrics(AId)) || AId <- [ ActId1, ActId2]],
emqx_rule_metrics:reset_metrics(Id),
?assertEqual(#{exception => 0,failed => 0,
matched => 0,no_result => 0,passed => 0,
speed => 0.0,speed_last5m => 0.0,speed_max => 0.0},
emqx_rule_metrics:get_rule_metrics(Id)),
[?assertEqual(#{failed => 0, success => 0, taken => 0},
emqx_rule_metrics:get_action_metrics(AId)) || AId <- [ActId0, ActId1, ActId2]],
emqtt:stop(Client),
emqx_rule_registry:remove_rule(Id),
ok = emqx_rule_registry:remove_action('crash_action'),
ok.
t_republish_action(_Config) ->

View File

@ -2,7 +2,8 @@
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN,
[{"4.4.9",
[{load_module,emqx_misc,brutal_purge,soft_purge,[]},
[{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx,brutal_purge,soft_purge,[]},
{add_module,emqx_secret},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
@ -271,7 +272,8 @@
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}],
[{"4.4.9",
[{load_module,emqx_misc,brutal_purge,soft_purge,[]},
[{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},

View File

@ -312,6 +312,7 @@ handle_in(?CONNECT_PACKET(ConnPkt) = Packet, Channel) ->
fun enrich_client/2,
fun set_log_meta/2,
fun check_banned/2,
fun count_flapping_event/2,
fun auth_connect/2
], ConnPkt, Channel#channel{conn_state = connecting}) of
{ok, NConnPkt, NChannel = #channel{clientinfo = ClientInfo}} ->
@ -1060,11 +1061,7 @@ handle_info({sock_closed, Reason}, Channel = #channel{conn_state = idle}) ->
handle_info({sock_closed, Reason}, Channel = #channel{conn_state = connecting}) ->
shutdown(Reason, Channel);
handle_info({sock_closed, Reason}, Channel =
#channel{conn_state = connected,
clientinfo = ClientInfo = #{zone := Zone}}) ->
emqx_zone:enable_flapping_detect(Zone)
andalso emqx_flapping:detect(ClientInfo),
handle_info({sock_closed, Reason}, Channel = #channel{conn_state = connected}) ->
Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(Channel)),
case maybe_shutdown(Reason, Channel1) of
{ok, Channel2} -> {ok, {event, disconnected}, Channel2};
@ -1373,6 +1370,13 @@ auth_connect(#mqtt_packet_connect{password = Password},
{error, emqx_reason_codes:connack_error(Reason)}
end.
%%--------------------------------------------------------------------
%% Flapping
count_flapping_event(_ConnPkt, Channel = #channel{clientinfo = ClientInfo = #{zone := Zone}}) ->
_ = emqx_zone:enable_flapping_detect(Zone) andalso emqx_flapping:detect(ClientInfo),
{ok, Channel}.
%%--------------------------------------------------------------------
%% Enhanced Authentication

View File

@ -683,26 +683,35 @@ run_terminate_hooks(ClientInfo, takeovered, Session) ->
run_terminate_hooks(ClientInfo, Reason, Session) ->
run_hook('session.terminated', [ClientInfo, Reason, info(Session)]).
redispatch_shared_messages(#session{inflight = Inflight}) ->
InflightList = emqx_inflight:to_list(Inflight),
lists:foreach(fun
%% Only QoS1 messages get redispatched, because QoS2 messages
%% must be sent to the same client, once they're in flight
({_, {#message{qos = ?QOS_2} = Msg, _}}) ->
?LOG(warning, "Not redispatching qos2 msg: ~s", [emqx_message:format(Msg)]);
({_, {#message{topic = Topic, qos = ?QOS_1} = Msg, _}}) ->
case emqx_shared_sub:get_group(Msg) of
{ok, Group} ->
%% Note that dispatch is called with self() in failed subs
%% This is done to avoid dispatching back to caller
Delivery = #delivery{sender = self(), message = Msg},
emqx_shared_sub:dispatch_to_non_self(Group, Topic, Delivery);
redispatch_shared_messages(#session{inflight = Inflight, mqueue = Q}) ->
AllInflights = emqx_inflight:to_list(sort_fun(), Inflight),
F = fun({_, {Msg, _Ts}}) ->
case Msg of
#message{qos = ?QOS_1} ->
%% For QoS 2, here is what the spec says:
%% If the Client's Session terminates before the Client reconnects,
%% the Server MUST NOT send the Application Message to any other
%% subscribed Client [MQTT-4.8.2-5].
{true, Msg};
_ ->
%% QoS 2, after pubrec is received
%% the inflight record is updated to an atom
false
end;
(_) ->
ok
end, InflightList).
end
end,
InflightList = lists:filtermap(F, AllInflights),
MqList = mqueue_to_list(Q, []),
emqx_shared_sub:redispatch(InflightList ++ MqList).
%% convert mqueue to a list
%% the messages at the head of the list is to be dispatched before the tail
mqueue_to_list(Q, Acc) ->
case emqx_mqueue:out(Q) of
{empty, _Q} ->
lists:reverse(Acc);
{{value, Msg}, Q1} ->
mqueue_to_list(Q1, [Msg | Acc])
end.
-compile({inline, [run_hook/2]}).
run_hook(Name, Args) ->

View File

@ -39,7 +39,7 @@
]).
-export([ dispatch/3
, dispatch_to_non_self/3
, redispatch/1
]).
-export([ maybe_ack/1
@ -47,7 +47,6 @@
, nack_no_connection/1
, is_ack_required/1
, is_retry_dispatch/1
, get_group/1
]).
%% for testing
@ -84,6 +83,9 @@
-define(ACK, shared_sub_ack).
-define(NACK(Reason), {shared_sub_nack, Reason}).
-define(NO_ACK, no_ack).
-define(REDISPATCH_TO(GROUP, TOPIC), {GROUP, TOPIC}).
-type redispatch_to() :: ?REDISPATCH_TO(emqx_topic:group(), emqx_topic:topic()).
-record(state, {pmon}).
@ -134,11 +136,12 @@ dispatch(Group, Topic, Delivery) ->
Strategy = strategy(Group),
dispatch(Strategy, Group, Topic, Delivery, _FailedSubs = #{}).
dispatch(Strategy, Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) ->
#message{from = ClientId, topic = SourceTopic} = Msg,
dispatch(Strategy, Group, Topic, Delivery = #delivery{message = Msg0}, FailedSubs) ->
#message{from = ClientId, topic = SourceTopic} = Msg0,
case pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) of
false -> {error, no_subscribers};
{Type, SubPid} ->
Msg = with_redispatch_to(Msg0, Group, Topic),
case do_dispatch(SubPid, Group, Topic, Msg, Type) of
ok -> {ok, 1};
{error, Reason} ->
@ -162,7 +165,7 @@ ack_enabled() ->
emqx:get_env(shared_dispatch_ack_enabled, false).
do_dispatch(SubPid, _Group, Topic, Msg, _Type) when SubPid =:= self() ->
%% Deadlock otherwise
%% dispatch without ack, deadlock otherwise
send(SubPid, Topic, {deliver, Topic, Msg});
%% return either 'ok' (when everything is fine) or 'error'
do_dispatch(SubPid, _Group, Topic, #message{qos = ?QOS_0} = Msg, _Type) ->
@ -176,6 +179,10 @@ do_dispatch(SubPid, Group, Topic, Msg, Type) ->
send(SubPid, Topic, {deliver, Topic, Msg})
end.
with_redispatch_to(#message{qos = ?QOS_0} = Msg, _Group, _Topic) -> Msg;
with_redispatch_to(Msg, Group, Topic) ->
emqx_message:set_headers(#{redispatch_to => ?REDISPATCH_TO(Group, Topic)}, Msg).
dispatch_with_ack(SubPid, Group, Topic, Msg, Type) ->
%% For QoS 1/2 message, expect an ack
Ref = erlang:monitor(process, SubPid),
@ -228,13 +235,22 @@ without_group_ack(Msg) ->
get_group_ack(Msg) ->
emqx_message:get_header(shared_dispatch_ack, Msg, ?NO_ACK).
-spec(get_group(emqx_types:message()) -> {ok, any()} | error).
get_group(Msg) ->
case get_group_ack(Msg) of
{_Sender, {_Type, Group, _Ref}} -> {ok, Group};
_ -> error
%% @hidden Redispatch is neede only for the messages with redispatch_to header added.
is_redispatch_needed(#message{} = Msg) ->
case get_redispatch_to(Msg) of
?REDISPATCH_TO(_, _) ->
true;
_ ->
false
end.
%% @hidden Return the `redispatch_to` group-topic in the message header.
%% `false` is returned if the message is not a shared dispatch.
%% or when it's a QoS 0 message.
-spec(get_redispatch_to(emqx_types:message()) -> redispatch_to() | false).
get_redispatch_to(Msg) ->
emqx_message:get_header(redispatch_to, Msg, false).
-spec(is_ack_required(emqx_types:message()) -> boolean()).
is_ack_required(Msg) -> ?NO_ACK =/= get_group_ack(Msg).
@ -245,6 +261,26 @@ is_retry_dispatch(Msg) ->
_ -> false
end.
%% @doc Redispatch shared deliveries to other members in the group.
redispatch(Messages0) ->
Messages = lists:filter(fun is_redispatch_needed/1, Messages0),
case length(Messages) of
L when L > 0 ->
?LOG(info, "Redispatching ~p shared subscription message(s)", [L]),
lists:foreach(fun redispatch_shared_message/1, Messages);
_ ->
ok
end.
redispatch_shared_message(#message{} = Msg) ->
%% As long as it's still a #message{} record in inflight,
%% we should try to re-dispatch
?REDISPATCH_TO(Group, Topic) = get_redispatch_to(Msg),
%% Note that dispatch is called with self() in failed subs
%% This is done to avoid dispatching back to caller
Delivery = #delivery{sender = self(), message = Msg},
dispatch_to_non_self(Group, Topic, Delivery).
%% @doc Negative ack dropped message due to inflight window or message queue being full.
-spec(maybe_nack_dropped(emqx_types:message()) -> store | drop).
maybe_nack_dropped(Msg) ->
@ -301,7 +337,8 @@ fetch_sender_ref({Sender, Ref}) -> {Sender, Ref}.
pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) ->
Sub0 = erlang:get({shared_sub_sticky, Group, Topic}),
case is_active_sub(Sub0, FailedSubs) of
All = subscribers(Group, Topic),
case is_active_sub(Sub0, FailedSubs, All) of
true ->
%% the old subscriber is still alive
%% keep using it for sticky strategy
@ -419,6 +456,7 @@ handle_cast(Msg, State) ->
handle_info({mnesia_table_event, {write, NewRecord, _}}, State = #state{pmon = PMon}) ->
#emqx_shared_subscription{subpid = SubPid} = NewRecord,
ok = maybe_insert_alive_tab(SubPid),
{noreply, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})};
%% The subscriber may have subscribed multiple topics, so we need to keep monitoring the PID until
@ -471,8 +509,10 @@ update_stats(State) ->
State.
%% Return 'true' if the subscriber process is alive AND not in the failed list
is_active_sub(Pid, FailedSubs) ->
not maps:is_key(Pid, FailedSubs) andalso is_alive_sub(Pid).
is_active_sub(Pid, FailedSubs, All) ->
lists:member(Pid, All) andalso
(not maps:is_key(Pid, FailedSubs)) andalso
is_alive_sub(Pid).
%% erlang:is_process_alive/1 does not work with remote pid.
is_alive_sub(Pid) when ?IS_LOCAL_PID(Pid) ->

View File

@ -35,11 +35,7 @@ init_per_suite(Config) ->
ok = meck:new(emqx_cm, [passthrough, no_history, no_link]),
ok = meck:expect(emqx_cm, mark_channel_connected, fun(_) -> ok end),
ok = meck:expect(emqx_cm, mark_channel_disconnected, fun(_) -> ok end),
%% Access Control Meck
ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]),
ok = meck:expect(emqx_access_control, authenticate,
fun(_) -> {ok, #{auth_result => success}} end),
ok = meck:expect(emqx_access_control, check_acl, fun(_, _, _) -> allow end),
%% Broker Meck
ok = meck:new(emqx_broker, [passthrough, no_history, no_link]),
%% Hooks Meck
@ -55,8 +51,7 @@ init_per_suite(Config) ->
Config.
end_per_suite(_Config) ->
meck:unload([emqx_access_control,
emqx_metrics,
meck:unload([emqx_metrics,
emqx_session,
emqx_broker,
emqx_hooks,
@ -65,10 +60,16 @@ end_per_suite(_Config) ->
init_per_testcase(_TestCase, Config) ->
meck:new(emqx_zone, [passthrough, no_history, no_link]),
%% Access Control Meck
ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]),
ok = meck:expect(emqx_access_control, authenticate,
fun(_) -> {ok, #{auth_result => success}} end),
ok = meck:expect(emqx_access_control, check_acl, fun(_, _, _) -> allow end),
Config.
end_per_testcase(_TestCase, Config) ->
meck:unload([emqx_zone]),
meck:unload([emqx_access_control]),
Config.
%%--------------------------------------------------------------------
@ -855,6 +856,30 @@ t_ws_cookie_init(_) ->
Channel = emqx_channel:init(ConnInfo, [{zone, zone}]),
?assertMatch(#{ws_cookie := WsCookie}, emqx_channel:info(clientinfo, Channel)).
%%--------------------------------------------------------------------
%% Test cases for other mechnisms
%%--------------------------------------------------------------------
t_flapping_detect(_) ->
Parent = self(),
ok = meck:expect(emqx_cm, open_session,
fun(true, _ClientInfo, _ConnInfo) ->
{ok, #{session => session(), present => false}}
end),
ok = meck:expect(emqx_access_control, authenticate, fun(_) -> {error, not_authorized} end),
ok = meck:new(emqx_flapping, [passthrough, no_history, no_link]),
ok = meck:expect(emqx_flapping, detect, fun(_) -> Parent ! flapping_detect end),
ok = meck:expect(emqx_zone, enable_flapping_detect, fun(_) -> true end),
IdleChannel = channel(#{conn_state => idle}),
{shutdown, not_authorized, _ConnAck, _Channel} =
emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), IdleChannel),
receive
flapping_detect -> ok
after 2000 ->
?assert(false, "Flapping detect should be exected in connecting progress")
end,
meck:unload([emqx_flapping]).
%%--------------------------------------------------------------------
%% Helper functions
%%--------------------------------------------------------------------

View File

@ -25,13 +25,24 @@
-define(SUITE, ?MODULE).
-define(wait(For, Timeout),
emqx_ct_helpers:wait_for(
?FUNCTION_NAME, ?LINE, fun() -> For end, Timeout)).
-define(ack, shared_sub_ack).
-define(no_ack, no_ack).
-define(WAIT(TIMEOUT, PATTERN, Res),
(fun() ->
receive
PATTERN ->
Res;
Other ->
ct:fail(#{expected => ??PATTERN,
got => Other
})
after
TIMEOUT ->
ct:fail({timeout, ??PATTERN})
end
end)()).
all() -> emqx_ct:all(?SUITE).
init_per_suite(Config) ->
@ -151,40 +162,7 @@ t_no_connection_nack(Config) when is_list(Config) ->
SendF(1),
ct:sleep(200),
%% This is the connection which was picked by broker to dispatch (sticky) for 1st message
?assertMatch([#{packet_id := 1}], recv_msgs(1)),
%% Now kill the connection, expect all following messages to be delivered to the other
%% subscriber.
%emqx_mock_client:stop(ConnPid),
%% sleep then make synced calls to session processes to ensure that
%% the connection pid's 'EXIT' message is propagated to the session process
%% also to be sure sessions are still alive
% timer:sleep(2),
% _ = emqx_session:info(SPid1),
% _ = emqx_session:info(SPid2),
% %% Now we know what is the other still alive connection
% [TheOtherConnPid] = [SubConnPid1, SubConnPid2] -- [ConnPid],
% %% Send some more messages
% PacketIdList = lists:seq(2, 10),
% lists:foreach(fun(Id) ->
% SendF(Id),
% ?wait(Received(Id, TheOtherConnPid), 1000)
% end, PacketIdList),
% %% Now close the 2nd (last connection)
% emqx_mock_client:stop(TheOtherConnPid),
% timer:sleep(2),
% %% both sessions should have conn_pid = undefined
% ?assertEqual({conn_pid, undefined}, lists:keyfind(conn_pid, 1, emqx_session:info(SPid1))),
% ?assertEqual({conn_pid, undefined}, lists:keyfind(conn_pid, 1, emqx_session:info(SPid2))),
% %% send more messages, but all should be queued in session state
% lists:foreach(fun(Id) -> SendF(Id) end, PacketIdList),
% {_, L1} = lists:keyfind(mqueue_len, 1, emqx_session:info(SPid1)),
% {_, L2} = lists:keyfind(mqueue_len, 1, emqx_session:info(SPid2)),
% ?assertEqual(length(PacketIdList), L1 + L2),
% %% clean up
% emqx_mock_client:close_session(PubConnPid),
% emqx_sm:close_session(SPid1),
% emqx_sm:close_session(SPid2),
ok.
t_random(Config) when is_list(Config) ->
@ -443,8 +421,14 @@ t_local_fallback(Config) when is_list(Config) ->
%% This one tests that broker tries to select another shared subscriber
%% If the first one doesn't return an ACK
t_redispatch(Config) when is_list(Config) ->
ok = ensure_config(sticky, true),
t_redispatch_with_ack(Config) when is_list(Config) ->
test_redispatch(Config, true).
t_redispatch_no_ack(Config) when is_list(Config) ->
test_redispatch(Config, false).
test_redispatch(_Config, AckEnabled) ->
ok = ensure_config(sticky, AckEnabled),
application:set_env(emqx, shared_dispatch_ack_enabled, true),
Group = <<"group1">>,
@ -474,15 +458,59 @@ t_redispatch(Config) when is_list(Config) ->
emqtt:stop(UsedSubPid2),
ok.
t_redispatch_wildcard_with_ack(Config) when is_list(Config)->
redispatch_wildcard(Config, true).
t_redispatch_wildcard_no_ack(Config) when is_list(Config) ->
redispatch_wildcard(Config, false).
%% This one tests that broker tries to redispatch to another member in the group
%% if the first one disconnected before acking (auto_ack set to false)
redispatch_wildcard(_Config, AckEnabled) ->
ok = ensure_config(sticky, AckEnabled),
Group = <<"group1">>,
Topic = <<"foo/bar/1">>,
ClientId1 = <<"ClientId1">>,
ClientId2 = <<"ClientId2">>,
{ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]),
{ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, false}]),
{ok, _} = emqtt:connect(ConnPid1),
{ok, _} = emqtt:connect(ConnPid2),
emqtt:subscribe(ConnPid1, {<<"$share/", Group/binary, "/foo/bar/#">>, 1}),
emqtt:subscribe(ConnPid2, {<<"$share/", Group/binary, "/foo/bar/#">>, 1}),
Message = emqx_message:make(ClientId1, 1, Topic, <<"hello1">>),
emqx:publish(Message),
{true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1, ConnPid2]),
ok = emqtt:stop(UsedSubPid1),
Res = last_message(<<"hello1">>, [ConnPid1, ConnPid2], 6000),
?assertMatch({true, Pid} when Pid =/= UsedSubPid1, Res),
{true, UsedSubPid2} = Res,
emqtt:stop(UsedSubPid2),
ok.
t_dispatch_when_inflights_are_full({init, Config}) ->
%% make sure broker does not push more than one inflight
meck:new(emqx_zone, [passthrough, no_history]),
meck:expect(emqx_zone, max_inflight, fun(_Zone) -> 1 end),
Config;
t_dispatch_when_inflights_are_full({'end', _Config}) ->
meck:unload(emqx_zone);
t_dispatch_when_inflights_are_full(Config) when is_list(Config) ->
ok = ensure_config(round_robin, true),
ok = ensure_config(round_robin, _AckEnabled = true),
Topic = <<"foo/bar">>,
ClientId1 = <<"ClientId1">>,
ClientId2 = <<"ClientId2">>,
%% Note that max_inflight is 1
{ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {max_inflight, 1}]),
{ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {max_inflight, 1}]),
{ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]),
{ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}]),
{ok, _} = emqtt:connect(ConnPid1),
{ok, _} = emqtt:connect(ConnPid2),
@ -505,8 +533,7 @@ t_dispatch_when_inflights_are_full(Config) when is_list(Config) ->
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message2)),
%% Now kill any client
erlang:exit(ConnPid1, normal),
ct:sleep(100),
ok = kill_process(ConnPid1),
%% And try to send the message
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message3)),
@ -521,10 +548,137 @@ t_dispatch_when_inflights_are_full(Config) when is_list(Config) ->
emqtt:stop(ConnPid2),
ok.
%% No ack, QoS 2 subscriptions,
%% client1 receives one message, send pubrec, then suspend
%% client2 acts normal (auto_ack=true)
%% Expected behaviour:
%% the messages sent to client1's inflight and mq are re-dispatched after client1 is down
t_dispatch_qos2({init, Config}) when is_list(Config) ->
meck:new(emqx_zone, [passthrough, no_history]),
meck:expect(emqx_zone, max_inflight, fun(_Zone) -> 1 end),
Config;
t_dispatch_qos2({'end', Config}) when is_list(Config) ->
meck:unload(emqx_zone);
t_dispatch_qos2(Config) when is_list(Config) ->
ok = ensure_config(round_robin, _AckEnabled = false),
Topic = <<"foo/bar/1">>,
ClientId1 = <<"ClientId1">>,
ClientId2 = <<"ClientId2">>,
{ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]),
{ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, true}]),
{ok, _} = emqtt:connect(ConnPid1),
{ok, _} = emqtt:connect(ConnPid2),
emqtt:subscribe(ConnPid1, {<<"$share/group/foo/bar/#">>, 2}),
emqtt:subscribe(ConnPid2, {<<"$share/group/foo/bar/#">>, 2}),
Message1 = emqx_message:make(ClientId1, 2, Topic, <<"hello1">>),
Message2 = emqx_message:make(ClientId1, 2, Topic, <<"hello2">>),
Message3 = emqx_message:make(ClientId1, 2, Topic, <<"hello3">>),
Message4 = emqx_message:make(ClientId1, 2, Topic, <<"hello4">>),
ct:sleep(100),
ok = sys:suspend(ConnPid1),
%% One message is inflight
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message1)),
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message2)),
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message3)),
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message4)),
MsgRec1 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P1}}, P1),
MsgRec2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P2}}, P2),
%% assert hello2 > hello1 or hello4 > hello3
?assert(MsgRec2 > MsgRec1),
sys:resume(ConnPid1),
%% emqtt subscriber automatically sends PUBREC, but since auto_ack is set to false
%% so it will never send PUBCOMP, hence EMQX should not attempt to send
%% the 4th message yet since max_inflight is 1.
MsgRec3 = ?WAIT(2000, {publish, #{client_pid := ConnPid1, payload := P3}}, P3),
ct:sleep(100),
%% no message expected
?assertEqual([], collect_msgs(0)),
%% now kill client 1
kill_process(ConnPid1),
%% client 2 should receive the message
MsgRec4 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P4}}, P4),
%% assert hello2 > hello1 or hello4 > hello3
?assert(MsgRec4 > MsgRec3),
emqtt:stop(ConnPid2),
ok.
t_dispatch_qos0({init, Config}) when is_list(Config) ->
Config;
t_dispatch_qos0({'end', Config}) when is_list(Config) ->
ok;
t_dispatch_qos0(Config) when is_list(Config) ->
ok = ensure_config(round_robin, _AckEnabled = false),
Topic = <<"foo/bar/1">>,
ClientId1 = <<"ClientId1">>,
ClientId2 = <<"ClientId2">>,
{ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]),
{ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, true}]),
{ok, _} = emqtt:connect(ConnPid1),
{ok, _} = emqtt:connect(ConnPid2),
%% subscribe with QoS 0
emqtt:subscribe(ConnPid1, {<<"$share/group/foo/bar/#">>, 0}),
emqtt:subscribe(ConnPid2, {<<"$share/group/foo/bar/#">>, 0}),
%% publish with QoS 2, but should be downgraded to 0 as the subscribers
%% subscribe with QoS 0
Message1 = emqx_message:make(ClientId1, 2, Topic, <<"hello1">>),
Message2 = emqx_message:make(ClientId1, 2, Topic, <<"hello2">>),
Message3 = emqx_message:make(ClientId1, 2, Topic, <<"hello3">>),
Message4 = emqx_message:make(ClientId1, 2, Topic, <<"hello4">>),
ct:sleep(100),
ok = sys:suspend(ConnPid1),
?assertMatch([_], emqx:publish(Message1)),
?assertMatch([_], emqx:publish(Message2)),
?assertMatch([_], emqx:publish(Message3)),
?assertMatch([_], emqx:publish(Message4)),
MsgRec1 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P1}}, P1),
MsgRec2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P2}}, P2),
%% assert hello2 > hello1 or hello4 > hello3
?assert(MsgRec2 > MsgRec1),
kill_process(ConnPid1),
%% expect no redispatch
?assertEqual([], collect_msgs(timer:seconds(2))),
emqtt:stop(ConnPid2),
ok.
%%--------------------------------------------------------------------
%% help functions
%%--------------------------------------------------------------------
kill_process(Pid) ->
_ = unlink(Pid),
_ = monitor(process, Pid),
erlang:exit(Pid, kill),
receive
{'DOWN', _, process, Pid, _} ->
ok
end.
collect_msgs(Timeout) ->
collect_msgs([], Timeout).
collect_msgs(Acc, Timeout) ->
receive
Msg ->
collect_msgs([Msg | Acc], Timeout)
after
Timeout ->
lists:reverse(Acc)
end.
ensure_config(Strategy) ->
ensure_config(Strategy, _AckEnabled = true).