Merge branch 'release-v44' into 1018-sync-release-v44-back-to-main

This commit is contained in:
JimMoen 2022-10-18 10:44:27 +08:00
commit bac7b61cfd
28 changed files with 629 additions and 163 deletions

View File

@ -24,6 +24,11 @@ jobs:
steps: steps:
- uses: actions/checkout@v1 - uses: actions/checkout@v1
# to avoid dirty self-hosted runners
- name: stop containers
run: |
docker rm -f $(docker ps -qa) || true
docker network rm $(docker network ls -q) || true
- name: docker compose up - name: docker compose up
env: env:
LDAP_TAG: ${{ matrix.ldap_tag }} LDAP_TAG: ${{ matrix.ldap_tag }}
@ -79,6 +84,10 @@ jobs:
steps: steps:
- uses: actions/checkout@v1 - uses: actions/checkout@v1
- name: stop containers
run: |
docker rm -f $(docker ps -qa) || true
docker network rm $(docker network ls -q) || true
- name: docker-compose up - name: docker-compose up
run: | run: |
docker-compose \ docker-compose \
@ -151,6 +160,10 @@ jobs:
steps: steps:
- uses: actions/checkout@v1 - uses: actions/checkout@v1
- name: stop containers
run: |
docker rm -f $(docker ps -qa) || true
docker network rm $(docker network ls -q) || true
- name: docker-compose up - name: docker-compose up
timeout-minutes: 5 timeout-minutes: 5
run: | run: |
@ -237,6 +250,10 @@ jobs:
- tcp - tcp
steps: steps:
- uses: actions/checkout@v1 - uses: actions/checkout@v1
- name: stop containers
run: |
docker rm -f $(docker ps -qa) || true
docker network rm $(docker network ls -q) || true
- name: docker-compose up - name: docker-compose up
run: | run: |
docker-compose \ docker-compose \
@ -318,6 +335,10 @@ jobs:
steps: steps:
- uses: actions/checkout@v1 - uses: actions/checkout@v1
- name: stop containers
run: |
docker rm -f $(docker ps -qa) || true
docker network rm $(docker network ls -q) || true
- name: docker-compose up - name: docker-compose up
run: | run: |
docker-compose \ docker-compose \

View File

@ -42,6 +42,11 @@ jobs:
use-self-hosted: false use-self-hosted: false
steps: steps:
- uses: actions/checkout@v2 - uses: actions/checkout@v2
# to avoid dirty self-hosted runners
- name: stop containers
run: |
docker rm -f $(docker ps -qa) || true
docker network rm $(docker network ls -q) || true
- name: docker compose up - name: docker compose up
if: endsWith(github.repository, 'emqx') if: endsWith(github.repository, 'emqx')
env: env:

View File

@ -32,8 +32,18 @@ File format:
- Added a test to prevent a last will testament message to be - Added a test to prevent a last will testament message to be
published when a client is denied connection. [#8894](https://github.com/emqx/emqx/pull/8894) published when a client is denied connection. [#8894](https://github.com/emqx/emqx/pull/8894)
- More rigorous checking of flapping to improve stability of the system. [#9045](https://github.com/emqx/emqx/pull/9045)
- QoS1 and QoS2 messages in session's buffer are re-dispatched to other members in the group
when the session terminates [#9094](https://github.com/emqx/emqx/pull/9094).
Prior to this enhancement, one would have to set `broker.shared_dispatch_ack_enabled` to true
to prevent sessions from buffering messages, however this acknowledgement comes with a cost.
### Bug fixes ### Bug fixes
- Fix HTTP client library to handle SSL socket passive signal. [#9145](https://github.com/emqx/emqx/pull/9145)
- Fix delayed publish inaccurate caused by os time change. [#8908](https://github.com/emqx/emqx/pull/8908) - Fix delayed publish inaccurate caused by os time change. [#8908](https://github.com/emqx/emqx/pull/8908)
- Hide redis password in error logs [#9071](https://github.com/emqx/emqx/pull/9071) - Hide redis password in error logs [#9071](https://github.com/emqx/emqx/pull/9071)
@ -48,6 +58,26 @@ File format:
Same `format_status` callback is added here too for `gen_server`s which hold password in Same `format_status` callback is added here too for `gen_server`s which hold password in
their state. their state.
- Fix shared subscription message re-dispatches [#9094](https://github.com/emqx/emqx/pull/9094).
- When discarding QoS 2 inflight messages, there were excessive logs
- For wildcard deliveries, the re-dispatch used the wrong topic (the publishing topic,
but not the subscribing topic), caused messages to be lost when dispatching.
- Fix shared subscription group member unsubscribe issue when 'sticky' strategy is used.
Prior to this fix, if a previously picked member unsubscribes from the group (without reconnect)
the message is still dispatched to it.
This issue only occurs when unsubscribe with the session kept.
Fixed in [#9119](https://github.com/emqx/emqx/pull/9119)
- Fix shared subscription 'sticky' strategy when there is no local subscriptions at all.
Prior to this change, it may take a few rounds to randomly pick group members until a local subscriber
is hit (and then start sticking to it).
After this fix, it will start sticking to whichever randomly picked member even when it is a
subscriber from another node in the cluster.
Fixed in [#9122](https://github.com/emqx/emqx/pull/9122)
- Fix cannot reset metrics for fallback actions. [#9125](https://github.com/emqx/emqx/pull/9125)
## v4.3.20 ## v4.3.20
### Bug fixes ### Bug fixes

View File

@ -1 +1,20 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-define(APP, emqx_auth_http). -define(APP, emqx_auth_http).
%% equals to the default value of ehttpc
-define(DEFAULT_RETRY_TIMES, 2).

View File

@ -24,7 +24,7 @@
-logger_header("[ACL http]"). -logger_header("[ACL http]").
-import(emqx_auth_http_cli, -import(emqx_auth_http_cli,
[ request/6 [ request/7
, feedvar/2 , feedvar/2
]). ]).
@ -56,13 +56,15 @@ description() -> "ACL with HTTP API".
%% Internal functions %% Internal functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
check_acl_request(#{pool_name := PoolName, check_acl_request(ACLParams =
#{pool_name := PoolName,
path := Path, path := Path,
method := Method, method := Method,
headers := Headers, headers := Headers,
params := Params, params := Params,
timeout := Timeout}, ClientInfo) -> timeout := Timeout}, ClientInfo) ->
request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout). Retry = maps:get(retry_times, ACLParams, ?DEFAULT_RETRY_TIMES),
request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout, Retry).
access(subscribe) -> 1; access(subscribe) -> 1;
access(publish) -> 2. access(publish) -> 2.

View File

@ -1,6 +1,6 @@
{application, emqx_auth_http, {application, emqx_auth_http,
[{description, "EMQ X Authentication/ACL with HTTP API"}, [{description, "EMQ X Authentication/ACL with HTTP API"},
{vsn, "4.3.7"}, % strict semver, bump manually! {vsn, "4.3.8"}, % strict semver, bump manually!
{modules, []}, {modules, []},
{registered, [emqx_auth_http_sup]}, {registered, [emqx_auth_http_sup]},
{applications, [kernel,stdlib,ehttpc]}, {applications, [kernel,stdlib,ehttpc]},

View File

@ -1,17 +1,27 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN, {VSN,
[{"4.3.6", [{"4.3.7",
[ %% There are only changes to the schema file, so we don't need any [{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]},
%% commands here {load_module,emqx_auth_http,brutal_purge,soft_purge,[]},
]}, {load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]},
{"4.3.6",
[{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]},
{"4.3.5", {"4.3.5",
[{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]}, [{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]}]}, {load_module,emqx_auth_http,brutal_purge,soft_purge,[]}]},
{"4.3.4", {"4.3.4",
[{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]}, [{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]}]}, {load_module,emqx_auth_http,brutal_purge,soft_purge,[]}]},
{"4.3.3", {"4.3.3",
[{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]}, [{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_http,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]}, {load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]},
{"4.3.2", {"4.3.2",
@ -20,21 +30,29 @@
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_http,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]}, {load_module,emqx_acl_http,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}]}, {load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}]},
{<<"4.3.[0-1]">>, {<<"4.3.[0-1]">>,[{restart_application,emqx_auth_http}]},
[{restart_application,emqx_auth_http}]},
{<<".*">>,[]}], {<<".*">>,[]}],
[{"4.3.6", [{"4.3.7",
[ %% There are only changes to the schema file, so we don't need any [{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]},
%% commands here {load_module,emqx_auth_http,brutal_purge,soft_purge,[]},
]}, {load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]},
{"4.3.6",
[{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]},
{"4.3.5", {"4.3.5",
[{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]}, [{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]}]}, {load_module,emqx_auth_http,brutal_purge,soft_purge,[]}]},
{"4.3.4", {"4.3.4",
[{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]}, [{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]}]}, {load_module,emqx_auth_http,brutal_purge,soft_purge,[]}]},
{"4.3.3", {"4.3.3",
[{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]}, [{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_http,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]}, {load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]},
{"4.3.2", {"4.3.2",
@ -43,6 +61,5 @@
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_http,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]}, {load_module,emqx_acl_http,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}]}, {load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}]},
{<<"4.3.[0-1]">>, {<<"4.3.[0-1]">>,[{restart_application,emqx_auth_http}]},
[{restart_application,emqx_auth_http}]},
{<<".*">>,[]}]}. {<<".*">>,[]}]}.

View File

@ -25,7 +25,7 @@
-logger_header("[Auth http]"). -logger_header("[Auth http]").
-import(emqx_auth_http_cli, -import(emqx_auth_http_cli,
[ request/6 [ request/7
, feedvar/2 , feedvar/2
]). ]).
@ -63,24 +63,28 @@ description() -> "Authentication by HTTP API".
%% Requests %% Requests
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
authenticate(#{pool_name := PoolName, authenticate(AuthParams =
#{pool_name := PoolName,
path := Path, path := Path,
method := Method, method := Method,
headers := Headers, headers := Headers,
params := Params, params := Params,
timeout := Timeout}, ClientInfo) -> timeout := Timeout}, ClientInfo) ->
request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout). Retry = maps:get(retry_times, AuthParams, ?DEFAULT_RETRY_TIMES),
request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout, Retry).
-spec(is_superuser(maybe(map()), emqx_types:client()) -> boolean()). -spec(is_superuser(maybe(map()), emqx_types:client()) -> boolean()).
is_superuser(undefined, _ClientInfo) -> is_superuser(undefined, _ClientInfo) ->
false; false;
is_superuser(#{pool_name := PoolName, is_superuser(SuperParams =
#{pool_name := PoolName,
path := Path, path := Path,
method := Method, method := Method,
headers := Headers, headers := Headers,
params := Params, params := Params,
timeout := Timeout}, ClientInfo) -> timeout := Timeout}, ClientInfo) ->
case request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout) of Retry = maps:get(retry_times, SuperParams, ?DEFAULT_RETRY_TIMES),
case request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout, Retry) of
{ok, 200, _Body} -> true; {ok, 200, _Body} -> true;
{ok, _Code, _Body} -> false; {ok, _Code, _Body} -> false;
{error, Error} -> ?LOG(error, "Request superuser path ~s, error: ~p", [Path, Error]), {error, Error} -> ?LOG(error, "Request superuser path ~s, error: ~p", [Path, Error]),

View File

@ -19,6 +19,7 @@
-include("emqx_auth_http.hrl"). -include("emqx_auth_http.hrl").
-export([ request/6 -export([ request/6
, request/7
, feedvar/2 , feedvar/2
, feedvar/3 , feedvar/3
]). ]).
@ -27,18 +28,21 @@
%% HTTP Request %% HTTP Request
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
request(PoolName, get, Path, Headers, Params, Timeout) -> request(PoolName, Method, Path, Headers, Params, Timeout) ->
NewPath = Path ++ "?" ++ binary_to_list(cow_qs:qs(bin_kw(Params))), request(PoolName, Method, Path, Headers, Params, Timeout, ?DEFAULT_RETRY_TIMES).
reply(ehttpc:request(PoolName, get, {NewPath, Headers}, Timeout));
request(PoolName, post, Path, Headers, Params, Timeout) -> request(PoolName, get, Path, Headers, Params, Timeout, Retry) ->
NewPath = Path ++ "?" ++ binary_to_list(cow_qs:qs(bin_kw(Params))),
reply(ehttpc:request(PoolName, get, {NewPath, Headers}, Timeout, Retry));
request(PoolName, post, Path, Headers, Params, Timeout, Retry) ->
Body = case proplists:get_value(<<"content-type">>, Headers) of Body = case proplists:get_value(<<"content-type">>, Headers) of
"application/x-www-form-urlencoded" -> "application/x-www-form-urlencoded" ->
cow_qs:qs(bin_kw(Params)); cow_qs:qs(bin_kw(Params));
"application/json" -> "application/json" ->
emqx_json:encode(bin_kw(Params)) emqx_json:encode(bin_kw(Params))
end, end,
reply(ehttpc:request(PoolName, post, {Path, Headers, Body}, Timeout)). reply(ehttpc:request(PoolName, post, {Path, Headers, Body}, Timeout, Retry)).
reply({ok, StatusCode, _Headers}) -> reply({ok, StatusCode, _Headers}) ->
{ok, StatusCode, <<>>}; {ok, StatusCode, <<>>};

View File

@ -5,15 +5,18 @@
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
{"4.4.8", {"4.4.8",
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
{<<"4\\.4\\.[6-7]">>, {<<"4\\.4\\.[6-7]">>,
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, [{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
@ -21,7 +24,8 @@
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
{"4.4.5", {"4.4.5",
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, [{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
@ -30,7 +34,8 @@
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{"4.4.4", {"4.4.4",
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, [{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
@ -39,7 +44,8 @@
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}]},
{"4.4.3", {"4.4.3",
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, [{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
@ -101,15 +107,18 @@
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
{"4.4.8", {"4.4.8",
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
{<<"4\\.4\\.[6-7]">>, {<<"4\\.4\\.[6-7]">>,
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, [{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
@ -117,7 +126,8 @@
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
{"4.4.5", {"4.4.5",
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, [{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
@ -126,7 +136,8 @@
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{"4.4.4", {"4.4.4",
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, [{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
@ -135,7 +146,8 @@
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}]},
{"4.4.3", {"4.4.3",
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, [{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},

View File

@ -133,16 +133,15 @@ clear_metrics(Id) ->
-spec(reset_metrics(rule_id()) -> ok). -spec(reset_metrics(rule_id()) -> ok).
reset_metrics(Id) -> reset_metrics(Id) ->
reset_speeds(Id), reset_speeds(Id),
reset_metrics(Id, rule_metrics()), do_reset_metrics(Id, rule_metrics()),
case emqx_rule_registry:get_rule(Id) of case emqx_rule_registry:get_rule(Id) of
not_found -> ok; not_found -> ok;
{ok, #rule{actions = Actions}} -> {ok, #rule{actions = Actions}} ->
[ reset_metrics(ActionId, action_metrics()) reset_action_metrics(Actions),
|| #action_instance{ id = ActionId} <- Actions],
ok ok
end. end.
reset_metrics(Id, Metrics) -> do_reset_metrics(Id, Metrics) ->
case couters_ref(Id) of case couters_ref(Id) of
not_found -> ok; not_found -> ok;
Ref -> [counters:put(Ref, metrics_idx(Idx), 0) Ref -> [counters:put(Ref, metrics_idx(Idx), 0)
@ -150,6 +149,12 @@ reset_metrics(Id, Metrics) ->
ok ok
end. end.
reset_action_metrics(Actions) ->
lists:foreach(fun(#action_instance{id = ActionId, fallbacks = FallbackActions}) ->
do_reset_metrics(ActionId, action_metrics()),
reset_action_metrics(FallbackActions)
end, Actions).
reset_speeds(Id) -> reset_speeds(Id) ->
gen_server:call(?MODULE, {reset_speeds, Id}). gen_server:call(?MODULE, {reset_speeds, Id}).
@ -330,6 +335,8 @@ handle_call({create_rule_metrics, Id}, _From,
_ -> RuleSpeeds#{Id => #rule_speed{}} _ -> RuleSpeeds#{Id => #rule_speed{}}
end}}; end}};
handle_call({reset_speeds, _Id}, _From, State = #state{rule_speeds = undefined}) ->
{reply, ok, State};
handle_call({reset_speeds, Id}, _From, State = #state{rule_speeds = RuleSpeedMap}) -> handle_call({reset_speeds, Id}, _From, State = #state{rule_speeds = RuleSpeedMap}) ->
{reply, ok, State#state{rule_speeds = maps:put(Id, #rule_speed{}, RuleSpeedMap)}}; {reply, ok, State#state{rule_speeds = maps:put(Id, #rule_speed{}, RuleSpeedMap)}};

View File

@ -50,6 +50,7 @@ groups() ->
t_unregister_provider, t_unregister_provider,
t_create_rule, t_create_rule,
t_reset_metrics, t_reset_metrics,
t_reset_metrics_fallbacks,
t_create_resource t_create_resource
]}, ]},
{actions, [], {actions, [],
@ -381,18 +382,14 @@ t_inspect_action(_Config) ->
t_reset_metrics(_Config) -> t_reset_metrics(_Config) ->
ok = emqx_rule_engine:load_providers(), ok = emqx_rule_engine:load_providers(),
{ok, #resource{id = ResId}} = emqx_rule_engine:create_resource( {ok, #rule{id = Id, actions = [#action_instance{id = ActId0}]}} =
#{type => built_in, emqx_rule_engine:create_rule(
config => #{}, #{rawsql => "select clientid as c, username as u "
description => <<"debug resource">>}), "from \"t1\" ",
{ok, #rule{id = Id}} = emqx_rule_engine:create_rule( actions => [#{name => 'inspect', args => #{a=>1, b=>2}}],
#{rawsql => "select clientid as c, username as u " type => built_in,
"from \"t1\" ", description => <<"Inspect rule">>
actions => [#{name => 'inspect', }),
args => #{'$resource' => ResId, a=>1, b=>2}}],
type => built_in,
description => <<"Inspect rule">>
}),
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}]), {ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
{ok, _} = emqtt:connect(Client), {ok, _} = emqtt:connect(Client),
[ begin [ begin
@ -400,16 +397,68 @@ t_reset_metrics(_Config) ->
timer:sleep(100) timer:sleep(100)
end end
|| _ <- lists:seq(1,10)], || _ <- lists:seq(1,10)],
?assertMatch(#{exception := 0, failed := 0,
matched := 10, no_result := 0, passed := 10},
emqx_rule_metrics:get_rule_metrics(Id)),
?assertMatch(#{failed := 0, success := 10, taken := 10},
emqx_rule_metrics:get_action_metrics(ActId0)),
emqx_rule_metrics:reset_metrics(Id), emqx_rule_metrics:reset_metrics(Id),
?assertEqual(#{exception => 0,failed => 0, ?assertEqual(#{exception => 0,failed => 0,
matched => 0,no_result => 0,passed => 0, matched => 0,no_result => 0,passed => 0,
speed => 0.0,speed_last5m => 0.0,speed_max => 0.0}, speed => 0.0,speed_last5m => 0.0,speed_max => 0.0},
emqx_rule_metrics:get_rule_metrics(Id)), emqx_rule_metrics:get_rule_metrics(Id)),
?assertEqual(#{failed => 0,success => 0,taken => 0}, ?assertEqual(#{failed => 0, success => 0, taken => 0},
emqx_rule_metrics:get_action_metrics(ResId)), emqx_rule_metrics:get_action_metrics(ActId0)),
emqtt:stop(Client), emqtt:stop(Client),
emqx_rule_registry:remove_rule(Id), emqx_rule_registry:remove_rule(Id),
emqx_rule_registry:remove_resource(ResId), ok.
t_reset_metrics_fallbacks(_Config) ->
ok = emqx_rule_engine:load_providers(),
ok = emqx_rule_registry:add_action(
#action{name = 'crash_action', app = ?APP,
module = ?MODULE, on_create = crash_action,
types=[], params_spec = #{},
title = #{en => <<"Crash Action">>},
description = #{en => <<"This action will always fail!">>}}),
{ok, #rule{id = Id, actions = [#action_instance{id = ActId0, fallbacks = [
#action_instance{id = ActId1},
#action_instance{id = ActId2}
]}]}} =
emqx_rule_engine:create_rule(
#{rawsql => "select clientid as c, username as u "
"from \"t1\" ",
actions => [#{name => 'crash_action', args => #{a=>1, b=>2}, fallbacks => [
#{name => 'inspect', args => #{}, fallbacks => []},
#{name => 'inspect', args => #{}, fallbacks => []}
]}],
type => built_in,
description => <<"Inspect rule">>
}),
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
{ok, _} = emqtt:connect(Client),
[ begin
emqtt:publish(Client, <<"t1">>, <<"{\"id\": 1, \"name\": \"ha\"}">>, 0),
timer:sleep(100)
end
|| _ <- lists:seq(1,10)],
?assertMatch(#{exception := 0, failed := 0,
matched := 10, no_result := 0, passed := 10},
emqx_rule_metrics:get_rule_metrics(Id)),
[?assertMatch(#{failed := 10, success := 0, taken := 10},
emqx_rule_metrics:get_action_metrics(AId)) || AId <- [ActId0]],
[?assertMatch(#{failed := 0, success := 10, taken := 10},
emqx_rule_metrics:get_action_metrics(AId)) || AId <- [ ActId1, ActId2]],
emqx_rule_metrics:reset_metrics(Id),
?assertEqual(#{exception => 0,failed => 0,
matched => 0,no_result => 0,passed => 0,
speed => 0.0,speed_last5m => 0.0,speed_max => 0.0},
emqx_rule_metrics:get_rule_metrics(Id)),
[?assertEqual(#{failed => 0, success => 0, taken => 0},
emqx_rule_metrics:get_action_metrics(AId)) || AId <- [ActId0, ActId1, ActId2]],
emqtt:stop(Client),
emqx_rule_registry:remove_rule(Id),
ok = emqx_rule_registry:remove_action('crash_action'),
ok. ok.
t_republish_action(_Config) -> t_republish_action(_Config) ->

View File

@ -525,7 +525,7 @@ esac
if [ "$IS_BOOT_COMMAND" = 'no' ]; then if [ "$IS_BOOT_COMMAND" = 'no' ]; then
# for non-boot commands, inspect vm.<time>.args for node name # for non-boot commands, inspect vm.<time>.args for node name
# shellcheck disable=SC2012,SC2086 # shellcheck disable=SC2012
LATEST_VM_ARGS_FILE="$(ls -t "$RUNNER_DATA_DIR"/configs/vm.*.args 2>/dev/null | head -1)" LATEST_VM_ARGS_FILE="$(ls -t "$RUNNER_DATA_DIR"/configs/vm.*.args 2>/dev/null | head -1)"
if [ -z "$LATEST_VM_ARGS_FILE" ]; then if [ -z "$LATEST_VM_ARGS_FILE" ]; then
echoerr "There is no vm.*.args config file found in '$RUNNER_DATA_DIR/configs/'" echoerr "There is no vm.*.args config file found in '$RUNNER_DATA_DIR/configs/'"

View File

@ -29,7 +29,7 @@
-ifndef(EMQX_ENTERPRISE). -ifndef(EMQX_ENTERPRISE).
-define(EMQX_RELEASE, {opensource, "4.4.10-alpha.1"}). -define(EMQX_RELEASE, {opensource, "4.4.10"}).
-else. -else.

View File

@ -41,7 +41,7 @@
[ {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps [ {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps
, {redbug, "2.0.7"} , {redbug, "2.0.7"}
, {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.2.0"}}} , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.2.0"}}}
, {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.7"}}} , {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.8"}}}
, {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.7.4"}}} , {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.7.4"}}}
, {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.9.0"}}} , {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.9.0"}}}
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}

View File

@ -21,6 +21,24 @@ parse_semver() {
echo "$1" | tr '.|-' ' ' echo "$1" | tr '.|-' ' '
} }
exempt_bump() {
local app="$1"
local from="$2"
local to="$3"
case "$app,$from,$to" in
"lib-ee/emqx_conf,4.3.9,4.4.0")
true
;;
"lib-ee/emqx_license,4.3.7,4.4.0")
true
;;
*)
false
;;
esac
}
check_apps() { check_apps() {
while read -r app; do while read -r app; do
if [ "$app" != "emqx" ]; then if [ "$app" != "emqx" ]; then
@ -42,6 +60,7 @@ check_apps() {
if [ "$old_app_version" = "$now_app_version" ]; then if [ "$old_app_version" = "$now_app_version" ]; then
changed_lines="$(git diff "$latest_release"...HEAD --ignore-blank-lines -G "$no_comment_re" \ changed_lines="$(git diff "$latest_release"...HEAD --ignore-blank-lines -G "$no_comment_re" \
-- "$app_path/src" \ -- "$app_path/src" \
-- "$app_path/include" \
-- ":(exclude)$app_path/src/*.appup.src" \ -- ":(exclude)$app_path/src/*.appup.src" \
-- "$app_path/priv" \ -- "$app_path/priv" \
-- "$app_path/c_src" | wc -l ) " -- "$app_path/c_src" | wc -l ) "
@ -49,12 +68,12 @@ check_apps() {
echo "$src_file needs a vsn bump (old=$old_app_version)" echo "$src_file needs a vsn bump (old=$old_app_version)"
echo "changed: $changed_lines" echo "changed: $changed_lines"
bad_app_count=$(( bad_app_count + 1)) bad_app_count=$(( bad_app_count + 1))
elif [ "$app" = 'emqx_dashboard' ]; then elif [[ ${app_path} = *emqx_dashboard* ]]; then
## emqx_dashboard is ensured to be upgraded after all other plugins ## emqx_dashboard is ensured to be upgraded after all other plugins
## at the end of its appup instructions, there is the final instruction ## at the end of its appup instructions, there is the final instruction
## {apply, {emqx_plugins, load, []} ## {apply, {emqx_plugins, load, []}
## since we don't know which plugins are stopped during the upgrade ## since we don't know which plugins are stopped during the upgrade
## for safety, we just force a dashboard version bump for each and every release ## for safty, we just force a dashboard version bump for each and every release
## even if there is nothing changed in the app ## even if there is nothing changed in the app
echo "$src_file needs a vsn bump to ensure plugins loaded after upgrade" echo "$src_file needs a vsn bump to ensure plugins loaded after upgrade"
bad_app_count=$(( bad_app_count + 1)) bad_app_count=$(( bad_app_count + 1))
@ -69,8 +88,12 @@ check_apps() {
[ "$(( old_app_version_semver[2] + 1 ))" = "${now_app_version_semver[2]}" ]; then [ "$(( old_app_version_semver[2] + 1 ))" = "${now_app_version_semver[2]}" ]; then
true true
else else
if exempt_bump "$app" "$old_app_version" "$now_app_version"; then
true
else
echo "$src_file: non-strict semver version bump from $old_app_version to $now_app_version" echo "$src_file: non-strict semver version bump from $old_app_version to $now_app_version"
bad_app_count=$(( bad_app_count + 1)) bad_app_count=$(( bad_app_count + 1))
fi
fi fi
fi fi
done < <(./scripts/find-apps.sh) done < <(./scripts/find-apps.sh)

View File

@ -8,13 +8,13 @@ cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")/.."
PKG_VSN="${PKG_VSN:-$(./pkg-vsn.sh)}" PKG_VSN="${PKG_VSN:-$(./pkg-vsn.sh)}"
case "${PKG_VSN}" in case "${PKG_VSN}" in
4.3*) 4.3*)
EMQX_CE_DASHBOARD_VERSION='v4.3.10' EMQX_CE_DASHBOARD_VERSION='v4.3.11'
EMQX_EE_DASHBOARD_VERSION='v4.3.24' EMQX_EE_DASHBOARD_VERSION='v4.3.26'
;; ;;
4.4*) 4.4*)
# keep the above 4.3 untouched, otherwise conflicts! # keep the above 4.3 untouched, otherwise conflicts!
EMQX_CE_DASHBOARD_VERSION='v4.4.5' EMQX_CE_DASHBOARD_VERSION='v4.4.6'
EMQX_EE_DASHBOARD_VERSION='v4.4.15' EMQX_EE_DASHBOARD_VERSION='v4.4.17'
;; ;;
*) *)
echo "Unsupported version $PKG_VSN" >&2 echo "Unsupported version $PKG_VSN" >&2

View File

@ -230,6 +230,16 @@ if [ "$HAS_RELUP_DB" = 'yes' ]; then
./scripts/relup-base-vsns.escript check-vsn-db "$PKG_VSN" "$RELUP_PATHS" ./scripts/relup-base-vsns.escript check-vsn-db "$PKG_VSN" "$RELUP_PATHS"
fi fi
## Run some additional checks (e.g. some for enterprise edition only)
CHECKS_DIR="./scripts/rel/checks"
if [ -d "${CHECKS_DIR}" ]; then
CHECKS="$(find "${CHECKS_DIR}" -name "*.sh" -print0 2>/dev/null | xargs -0)"
for c in $CHECKS; do
logmsg "Executing $c"
$c
done
fi
if [ "$DRYRUN" = 'yes' ]; then if [ "$DRYRUN" = 'yes' ]; then
logmsg "Release tag is ready to be created with command: git tag $TAG" logmsg "Release tag is ready to be created with command: git tag $TAG"
else else

View File

@ -2,7 +2,8 @@
%% Unless you know what you are doing, DO NOT edit manually!! %% Unless you know what you are doing, DO NOT edit manually!!
{VSN, {VSN,
[{"4.4.9", [{"4.4.9",
[{load_module,emqx_misc,brutal_purge,soft_purge,[]}, [{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx,brutal_purge,soft_purge,[]},
{add_module,emqx_secret}, {add_module,emqx_secret},
{load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_session,brutal_purge,soft_purge,[]},
@ -271,7 +272,8 @@
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}], {<<".*">>,[]}],
[{"4.4.9", [{"4.4.9",
[{load_module,emqx_misc,brutal_purge,soft_purge,[]}, [{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]},

View File

@ -227,6 +227,7 @@ shutdown() ->
shutdown(normal). shutdown(normal).
shutdown(Reason) -> shutdown(Reason) ->
ok = emqx_misc:maybe_mute_rpc_log(),
?LOG(critical, "emqx shutdown for ~s", [Reason]), ?LOG(critical, "emqx shutdown for ~s", [Reason]),
on_shutdown(Reason), on_shutdown(Reason),
_ = emqx_plugins:unload(), _ = emqx_plugins:unload(),

View File

@ -312,6 +312,7 @@ handle_in(?CONNECT_PACKET(ConnPkt) = Packet, Channel) ->
fun enrich_client/2, fun enrich_client/2,
fun set_log_meta/2, fun set_log_meta/2,
fun check_banned/2, fun check_banned/2,
fun count_flapping_event/2,
fun auth_connect/2 fun auth_connect/2
], ConnPkt, Channel#channel{conn_state = connecting}) of ], ConnPkt, Channel#channel{conn_state = connecting}) of
{ok, NConnPkt, NChannel = #channel{clientinfo = ClientInfo}} -> {ok, NConnPkt, NChannel = #channel{clientinfo = ClientInfo}} ->
@ -1060,11 +1061,7 @@ handle_info({sock_closed, Reason}, Channel = #channel{conn_state = idle}) ->
handle_info({sock_closed, Reason}, Channel = #channel{conn_state = connecting}) -> handle_info({sock_closed, Reason}, Channel = #channel{conn_state = connecting}) ->
shutdown(Reason, Channel); shutdown(Reason, Channel);
handle_info({sock_closed, Reason}, Channel = handle_info({sock_closed, Reason}, Channel = #channel{conn_state = connected}) ->
#channel{conn_state = connected,
clientinfo = ClientInfo = #{zone := Zone}}) ->
emqx_zone:enable_flapping_detect(Zone)
andalso emqx_flapping:detect(ClientInfo),
Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(Channel)), Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(Channel)),
case maybe_shutdown(Reason, Channel1) of case maybe_shutdown(Reason, Channel1) of
{ok, Channel2} -> {ok, {event, disconnected}, Channel2}; {ok, Channel2} -> {ok, {event, disconnected}, Channel2};
@ -1373,6 +1370,13 @@ auth_connect(#mqtt_packet_connect{password = Password},
{error, emqx_reason_codes:connack_error(Reason)} {error, emqx_reason_codes:connack_error(Reason)}
end. end.
%%--------------------------------------------------------------------
%% Flapping
count_flapping_event(_ConnPkt, Channel = #channel{clientinfo = ClientInfo = #{zone := Zone}}) ->
_ = emqx_zone:enable_flapping_detect(Zone) andalso emqx_flapping:detect(ClientInfo),
{ok, Channel}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Enhanced Authentication %% Enhanced Authentication

View File

@ -25,6 +25,7 @@
-export([ merge_opts/2 -export([ merge_opts/2
, maybe_apply/2 , maybe_apply/2
, maybe_mute_rpc_log/0
, compose/1 , compose/1
, compose/2 , compose/2
, run_fold/3 , run_fold/3
@ -446,6 +447,27 @@ do_parallel_map(Fun, List) ->
PidList PidList
). ).
%% @doc Call this function to avoid logs printed to RPC caller node.
-spec maybe_mute_rpc_log() -> ok.
maybe_mute_rpc_log() ->
GlNode = node(group_leader()),
maybe_mute_rpc_log(GlNode).
maybe_mute_rpc_log(Node) when Node =:= node() ->
%% do nothing, this is a local call
ok;
maybe_mute_rpc_log(Node) ->
case atom_to_list(Node) of
"remsh" ++ _ ->
%% this is either an upgrade script or nodetool
%% do nothing, the log may go to the 'emqx' command line console
ok;
_ ->
%% otherwise set group leader to local node
_ = group_leader(whereis(init), self()),
ok
end.
-ifdef(TEST). -ifdef(TEST).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").

View File

@ -683,26 +683,35 @@ run_terminate_hooks(ClientInfo, takeovered, Session) ->
run_terminate_hooks(ClientInfo, Reason, Session) -> run_terminate_hooks(ClientInfo, Reason, Session) ->
run_hook('session.terminated', [ClientInfo, Reason, info(Session)]). run_hook('session.terminated', [ClientInfo, Reason, info(Session)]).
redispatch_shared_messages(#session{inflight = Inflight}) -> redispatch_shared_messages(#session{inflight = Inflight, mqueue = Q}) ->
InflightList = emqx_inflight:to_list(Inflight), AllInflights = emqx_inflight:to_list(sort_fun(), Inflight),
lists:foreach(fun F = fun({_, {Msg, _Ts}}) ->
%% Only QoS1 messages get redispatched, because QoS2 messages case Msg of
%% must be sent to the same client, once they're in flight #message{qos = ?QOS_1} ->
({_, {#message{qos = ?QOS_2} = Msg, _}}) -> %% For QoS 2, here is what the spec says:
?LOG(warning, "Not redispatching qos2 msg: ~s", [emqx_message:format(Msg)]); %% If the Client's Session terminates before the Client reconnects,
({_, {#message{topic = Topic, qos = ?QOS_1} = Msg, _}}) -> %% the Server MUST NOT send the Application Message to any other
case emqx_shared_sub:get_group(Msg) of %% subscribed Client [MQTT-4.8.2-5].
{ok, Group} -> {true, Msg};
%% Note that dispatch is called with self() in failed subs _ ->
%% This is done to avoid dispatching back to caller %% QoS 2, after pubrec is received
Delivery = #delivery{sender = self(), message = Msg}, %% the inflight record is updated to an atom
emqx_shared_sub:dispatch_to_non_self(Group, Topic, Delivery); false
_ -> end
false end,
end; InflightList = lists:filtermap(F, AllInflights),
(_) -> MqList = mqueue_to_list(Q, []),
ok emqx_shared_sub:redispatch(InflightList ++ MqList).
end, InflightList).
%% convert mqueue to a list
%% the messages at the head of the list is to be dispatched before the tail
mqueue_to_list(Q, Acc) ->
case emqx_mqueue:out(Q) of
{empty, _Q} ->
lists:reverse(Acc);
{{value, Msg}, Q1} ->
mqueue_to_list(Q1, [Msg | Acc])
end.
-compile({inline, [run_hook/2]}). -compile({inline, [run_hook/2]}).
run_hook(Name, Args) -> run_hook(Name, Args) ->

View File

@ -39,7 +39,7 @@
]). ]).
-export([ dispatch/3 -export([ dispatch/3
, dispatch_to_non_self/3 , redispatch/1
]). ]).
-export([ maybe_ack/1 -export([ maybe_ack/1
@ -47,7 +47,6 @@
, nack_no_connection/1 , nack_no_connection/1
, is_ack_required/1 , is_ack_required/1
, is_retry_dispatch/1 , is_retry_dispatch/1
, get_group/1
]). ]).
%% for testing %% for testing
@ -84,6 +83,9 @@
-define(ACK, shared_sub_ack). -define(ACK, shared_sub_ack).
-define(NACK(Reason), {shared_sub_nack, Reason}). -define(NACK(Reason), {shared_sub_nack, Reason}).
-define(NO_ACK, no_ack). -define(NO_ACK, no_ack).
-define(REDISPATCH_TO(GROUP, TOPIC), {GROUP, TOPIC}).
-type redispatch_to() :: ?REDISPATCH_TO(emqx_topic:group(), emqx_topic:topic()).
-record(state, {pmon}). -record(state, {pmon}).
@ -134,11 +136,12 @@ dispatch(Group, Topic, Delivery) ->
Strategy = strategy(Group), Strategy = strategy(Group),
dispatch(Strategy, Group, Topic, Delivery, _FailedSubs = #{}). dispatch(Strategy, Group, Topic, Delivery, _FailedSubs = #{}).
dispatch(Strategy, Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) -> dispatch(Strategy, Group, Topic, Delivery = #delivery{message = Msg0}, FailedSubs) ->
#message{from = ClientId, topic = SourceTopic} = Msg, #message{from = ClientId, topic = SourceTopic} = Msg0,
case pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) of case pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) of
false -> {error, no_subscribers}; false -> {error, no_subscribers};
{Type, SubPid} -> {Type, SubPid} ->
Msg = with_redispatch_to(Msg0, Group, Topic),
case do_dispatch(SubPid, Group, Topic, Msg, Type) of case do_dispatch(SubPid, Group, Topic, Msg, Type) of
ok -> {ok, 1}; ok -> {ok, 1};
{error, Reason} -> {error, Reason} ->
@ -162,7 +165,7 @@ ack_enabled() ->
emqx:get_env(shared_dispatch_ack_enabled, false). emqx:get_env(shared_dispatch_ack_enabled, false).
do_dispatch(SubPid, _Group, Topic, Msg, _Type) when SubPid =:= self() -> do_dispatch(SubPid, _Group, Topic, Msg, _Type) when SubPid =:= self() ->
%% Deadlock otherwise %% dispatch without ack, deadlock otherwise
send(SubPid, Topic, {deliver, Topic, Msg}); send(SubPid, Topic, {deliver, Topic, Msg});
%% return either 'ok' (when everything is fine) or 'error' %% return either 'ok' (when everything is fine) or 'error'
do_dispatch(SubPid, _Group, Topic, #message{qos = ?QOS_0} = Msg, _Type) -> do_dispatch(SubPid, _Group, Topic, #message{qos = ?QOS_0} = Msg, _Type) ->
@ -176,6 +179,10 @@ do_dispatch(SubPid, Group, Topic, Msg, Type) ->
send(SubPid, Topic, {deliver, Topic, Msg}) send(SubPid, Topic, {deliver, Topic, Msg})
end. end.
with_redispatch_to(#message{qos = ?QOS_0} = Msg, _Group, _Topic) -> Msg;
with_redispatch_to(Msg, Group, Topic) ->
emqx_message:set_headers(#{redispatch_to => ?REDISPATCH_TO(Group, Topic)}, Msg).
dispatch_with_ack(SubPid, Group, Topic, Msg, Type) -> dispatch_with_ack(SubPid, Group, Topic, Msg, Type) ->
%% For QoS 1/2 message, expect an ack %% For QoS 1/2 message, expect an ack
Ref = erlang:monitor(process, SubPid), Ref = erlang:monitor(process, SubPid),
@ -228,13 +235,22 @@ without_group_ack(Msg) ->
get_group_ack(Msg) -> get_group_ack(Msg) ->
emqx_message:get_header(shared_dispatch_ack, Msg, ?NO_ACK). emqx_message:get_header(shared_dispatch_ack, Msg, ?NO_ACK).
-spec(get_group(emqx_types:message()) -> {ok, any()} | error). %% @hidden Redispatch is neede only for the messages with redispatch_to header added.
get_group(Msg) -> is_redispatch_needed(#message{} = Msg) ->
case get_group_ack(Msg) of case get_redispatch_to(Msg) of
{_Sender, {_Type, Group, _Ref}} -> {ok, Group}; ?REDISPATCH_TO(_, _) ->
_ -> error true;
_ ->
false
end. end.
%% @hidden Return the `redispatch_to` group-topic in the message header.
%% `false` is returned if the message is not a shared dispatch.
%% or when it's a QoS 0 message.
-spec(get_redispatch_to(emqx_types:message()) -> redispatch_to() | false).
get_redispatch_to(Msg) ->
emqx_message:get_header(redispatch_to, Msg, false).
-spec(is_ack_required(emqx_types:message()) -> boolean()). -spec(is_ack_required(emqx_types:message()) -> boolean()).
is_ack_required(Msg) -> ?NO_ACK =/= get_group_ack(Msg). is_ack_required(Msg) -> ?NO_ACK =/= get_group_ack(Msg).
@ -245,6 +261,26 @@ is_retry_dispatch(Msg) ->
_ -> false _ -> false
end. end.
%% @doc Redispatch shared deliveries to other members in the group.
redispatch(Messages0) ->
Messages = lists:filter(fun is_redispatch_needed/1, Messages0),
case length(Messages) of
L when L > 0 ->
?LOG(info, "Redispatching ~p shared subscription message(s)", [L]),
lists:foreach(fun redispatch_shared_message/1, Messages);
_ ->
ok
end.
redispatch_shared_message(#message{} = Msg) ->
%% As long as it's still a #message{} record in inflight,
%% we should try to re-dispatch
?REDISPATCH_TO(Group, Topic) = get_redispatch_to(Msg),
%% Note that dispatch is called with self() in failed subs
%% This is done to avoid dispatching back to caller
Delivery = #delivery{sender = self(), message = Msg},
dispatch_to_non_self(Group, Topic, Delivery).
%% @doc Negative ack dropped message due to inflight window or message queue being full. %% @doc Negative ack dropped message due to inflight window or message queue being full.
-spec(maybe_nack_dropped(emqx_types:message()) -> store | drop). -spec(maybe_nack_dropped(emqx_types:message()) -> store | drop).
maybe_nack_dropped(Msg) -> maybe_nack_dropped(Msg) ->
@ -301,7 +337,8 @@ fetch_sender_ref({Sender, Ref}) -> {Sender, Ref}.
pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) -> pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) ->
Sub0 = erlang:get({shared_sub_sticky, Group, Topic}), Sub0 = erlang:get({shared_sub_sticky, Group, Topic}),
case is_active_sub(Sub0, FailedSubs) of All = subscribers(Group, Topic),
case is_active_sub(Sub0, FailedSubs, All) of
true -> true ->
%% the old subscriber is still alive %% the old subscriber is still alive
%% keep using it for sticky strategy %% keep using it for sticky strategy
@ -419,6 +456,7 @@ handle_cast(Msg, State) ->
handle_info({mnesia_table_event, {write, NewRecord, _}}, State = #state{pmon = PMon}) -> handle_info({mnesia_table_event, {write, NewRecord, _}}, State = #state{pmon = PMon}) ->
#emqx_shared_subscription{subpid = SubPid} = NewRecord, #emqx_shared_subscription{subpid = SubPid} = NewRecord,
ok = maybe_insert_alive_tab(SubPid),
{noreply, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})}; {noreply, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})};
%% The subscriber may have subscribed multiple topics, so we need to keep monitoring the PID until %% The subscriber may have subscribed multiple topics, so we need to keep monitoring the PID until
@ -471,8 +509,10 @@ update_stats(State) ->
State. State.
%% Return 'true' if the subscriber process is alive AND not in the failed list %% Return 'true' if the subscriber process is alive AND not in the failed list
is_active_sub(Pid, FailedSubs) -> is_active_sub(Pid, FailedSubs, All) ->
not maps:is_key(Pid, FailedSubs) andalso is_alive_sub(Pid). lists:member(Pid, All) andalso
(not maps:is_key(Pid, FailedSubs)) andalso
is_alive_sub(Pid).
%% erlang:is_process_alive/1 does not work with remote pid. %% erlang:is_process_alive/1 does not work with remote pid.
is_alive_sub(Pid) when ?IS_LOCAL_PID(Pid) -> is_alive_sub(Pid) when ?IS_LOCAL_PID(Pid) ->

View File

@ -35,11 +35,7 @@ init_per_suite(Config) ->
ok = meck:new(emqx_cm, [passthrough, no_history, no_link]), ok = meck:new(emqx_cm, [passthrough, no_history, no_link]),
ok = meck:expect(emqx_cm, mark_channel_connected, fun(_) -> ok end), ok = meck:expect(emqx_cm, mark_channel_connected, fun(_) -> ok end),
ok = meck:expect(emqx_cm, mark_channel_disconnected, fun(_) -> ok end), ok = meck:expect(emqx_cm, mark_channel_disconnected, fun(_) -> ok end),
%% Access Control Meck
ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]),
ok = meck:expect(emqx_access_control, authenticate,
fun(_) -> {ok, #{auth_result => success}} end),
ok = meck:expect(emqx_access_control, check_acl, fun(_, _, _) -> allow end),
%% Broker Meck %% Broker Meck
ok = meck:new(emqx_broker, [passthrough, no_history, no_link]), ok = meck:new(emqx_broker, [passthrough, no_history, no_link]),
%% Hooks Meck %% Hooks Meck
@ -55,8 +51,7 @@ init_per_suite(Config) ->
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
meck:unload([emqx_access_control, meck:unload([emqx_metrics,
emqx_metrics,
emqx_session, emqx_session,
emqx_broker, emqx_broker,
emqx_hooks, emqx_hooks,
@ -65,10 +60,16 @@ end_per_suite(_Config) ->
init_per_testcase(_TestCase, Config) -> init_per_testcase(_TestCase, Config) ->
meck:new(emqx_zone, [passthrough, no_history, no_link]), meck:new(emqx_zone, [passthrough, no_history, no_link]),
%% Access Control Meck
ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]),
ok = meck:expect(emqx_access_control, authenticate,
fun(_) -> {ok, #{auth_result => success}} end),
ok = meck:expect(emqx_access_control, check_acl, fun(_, _, _) -> allow end),
Config. Config.
end_per_testcase(_TestCase, Config) -> end_per_testcase(_TestCase, Config) ->
meck:unload([emqx_zone]), meck:unload([emqx_zone]),
meck:unload([emqx_access_control]),
Config. Config.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -855,6 +856,30 @@ t_ws_cookie_init(_) ->
Channel = emqx_channel:init(ConnInfo, [{zone, zone}]), Channel = emqx_channel:init(ConnInfo, [{zone, zone}]),
?assertMatch(#{ws_cookie := WsCookie}, emqx_channel:info(clientinfo, Channel)). ?assertMatch(#{ws_cookie := WsCookie}, emqx_channel:info(clientinfo, Channel)).
%%--------------------------------------------------------------------
%% Test cases for other mechnisms
%%--------------------------------------------------------------------
t_flapping_detect(_) ->
Parent = self(),
ok = meck:expect(emqx_cm, open_session,
fun(true, _ClientInfo, _ConnInfo) ->
{ok, #{session => session(), present => false}}
end),
ok = meck:expect(emqx_access_control, authenticate, fun(_) -> {error, not_authorized} end),
ok = meck:new(emqx_flapping, [passthrough, no_history, no_link]),
ok = meck:expect(emqx_flapping, detect, fun(_) -> Parent ! flapping_detect end),
ok = meck:expect(emqx_zone, enable_flapping_detect, fun(_) -> true end),
IdleChannel = channel(#{conn_state => idle}),
{shutdown, not_authorized, _ConnAck, _Channel} =
emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), IdleChannel),
receive
flapping_detect -> ok
after 2000 ->
?assert(false, "Flapping detect should be exected in connecting progress")
end,
meck:unload([emqx_flapping]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Helper functions %% Helper functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -72,4 +72,4 @@ t_expired_detecting(_) ->
(_) -> false end, ets:tab2list(emqx_flapping))), (_) -> false end, ets:tab2list(emqx_flapping))),
timer:sleep(200), timer:sleep(200),
?assertEqual(true, lists:all(fun({flapping, <<"clientid">>, _, _, _}) -> false; ?assertEqual(true, lists:all(fun({flapping, <<"clientid">>, _, _, _}) -> false;
(_) -> true end, ets:tab2list(emqx_flapping))). (_) -> true end, ets:tab2list(emqx_flapping))).

View File

@ -40,7 +40,9 @@ start_slave(Name, Opts) ->
{ok, _} -> {ok, _} ->
ok; ok;
{error, started_not_connected, _} -> {error, started_not_connected, _} ->
ok ok;
Other ->
throw(Other)
end, end,
pong = net_adm:ping(Node), pong = net_adm:ping(Node),
setup_node(Node, Opts), setup_node(Node, Opts),
@ -92,7 +94,11 @@ setup_node(Node, #{} = Opts) ->
end, end,
EnvHandler = maps:get(env_handler, Opts, DefaultEnvHandler), EnvHandler = maps:get(env_handler, Opts, DefaultEnvHandler),
[ok = rpc:call(Node, application, load, [App]) || App <- [gen_rpc, emqx]], %% apps need to be loaded before starting for ekka to find and create mnesia tables
LoadApps = lists:usort([gen_rcp, emqx] ++ ?SLAVE_START_APPS),
lists:foreach(fun(App) ->
rpc:call(Node, application, load, [App])
end, LoadApps),
ok = rpc:call(Node, emqx_ct_helpers, start_apps, [StartApps, EnvHandler]), ok = rpc:call(Node, emqx_ct_helpers, start_apps, [StartApps, EnvHandler]),
case maps:get(no_join, Opts, false) of case maps:get(no_join, Opts, false) of

View File

@ -25,13 +25,24 @@
-define(SUITE, ?MODULE). -define(SUITE, ?MODULE).
-define(wait(For, Timeout),
emqx_ct_helpers:wait_for(
?FUNCTION_NAME, ?LINE, fun() -> For end, Timeout)).
-define(ack, shared_sub_ack). -define(ack, shared_sub_ack).
-define(no_ack, no_ack). -define(no_ack, no_ack).
-define(WAIT(TIMEOUT, PATTERN, Res),
(fun() ->
receive
PATTERN ->
Res;
Other ->
ct:fail(#{expected => ??PATTERN,
got => Other
})
after
TIMEOUT ->
ct:fail({timeout, ??PATTERN})
end
end)()).
all() -> emqx_ct:all(?SUITE). all() -> emqx_ct:all(?SUITE).
init_per_suite(Config) -> init_per_suite(Config) ->
@ -151,40 +162,7 @@ t_no_connection_nack(Config) when is_list(Config) ->
SendF(1), SendF(1),
ct:sleep(200), ct:sleep(200),
%% This is the connection which was picked by broker to dispatch (sticky) for 1st message %% This is the connection which was picked by broker to dispatch (sticky) for 1st message
?assertMatch([#{packet_id := 1}], recv_msgs(1)), ?assertMatch([#{packet_id := 1}], recv_msgs(1)),
%% Now kill the connection, expect all following messages to be delivered to the other
%% subscriber.
%emqx_mock_client:stop(ConnPid),
%% sleep then make synced calls to session processes to ensure that
%% the connection pid's 'EXIT' message is propagated to the session process
%% also to be sure sessions are still alive
% timer:sleep(2),
% _ = emqx_session:info(SPid1),
% _ = emqx_session:info(SPid2),
% %% Now we know what is the other still alive connection
% [TheOtherConnPid] = [SubConnPid1, SubConnPid2] -- [ConnPid],
% %% Send some more messages
% PacketIdList = lists:seq(2, 10),
% lists:foreach(fun(Id) ->
% SendF(Id),
% ?wait(Received(Id, TheOtherConnPid), 1000)
% end, PacketIdList),
% %% Now close the 2nd (last connection)
% emqx_mock_client:stop(TheOtherConnPid),
% timer:sleep(2),
% %% both sessions should have conn_pid = undefined
% ?assertEqual({conn_pid, undefined}, lists:keyfind(conn_pid, 1, emqx_session:info(SPid1))),
% ?assertEqual({conn_pid, undefined}, lists:keyfind(conn_pid, 1, emqx_session:info(SPid2))),
% %% send more messages, but all should be queued in session state
% lists:foreach(fun(Id) -> SendF(Id) end, PacketIdList),
% {_, L1} = lists:keyfind(mqueue_len, 1, emqx_session:info(SPid1)),
% {_, L2} = lists:keyfind(mqueue_len, 1, emqx_session:info(SPid2)),
% ?assertEqual(length(PacketIdList), L1 + L2),
% %% clean up
% emqx_mock_client:close_session(PubConnPid),
% emqx_sm:close_session(SPid1),
% emqx_sm:close_session(SPid2),
ok. ok.
t_random(Config) when is_list(Config) -> t_random(Config) when is_list(Config) ->
@ -443,8 +421,14 @@ t_local_fallback(Config) when is_list(Config) ->
%% This one tests that broker tries to select another shared subscriber %% This one tests that broker tries to select another shared subscriber
%% If the first one doesn't return an ACK %% If the first one doesn't return an ACK
t_redispatch(Config) when is_list(Config) -> t_redispatch_with_ack(Config) when is_list(Config) ->
ok = ensure_config(sticky, true), test_redispatch(Config, true).
t_redispatch_no_ack(Config) when is_list(Config) ->
test_redispatch(Config, false).
test_redispatch(_Config, AckEnabled) ->
ok = ensure_config(sticky, AckEnabled),
application:set_env(emqx, shared_dispatch_ack_enabled, true), application:set_env(emqx, shared_dispatch_ack_enabled, true),
Group = <<"group1">>, Group = <<"group1">>,
@ -474,15 +458,59 @@ t_redispatch(Config) when is_list(Config) ->
emqtt:stop(UsedSubPid2), emqtt:stop(UsedSubPid2),
ok. ok.
t_redispatch_wildcard_with_ack(Config) when is_list(Config)->
redispatch_wildcard(Config, true).
t_redispatch_wildcard_no_ack(Config) when is_list(Config) ->
redispatch_wildcard(Config, false).
%% This one tests that broker tries to redispatch to another member in the group
%% if the first one disconnected before acking (auto_ack set to false)
redispatch_wildcard(_Config, AckEnabled) ->
ok = ensure_config(sticky, AckEnabled),
Group = <<"group1">>,
Topic = <<"foo/bar/1">>,
ClientId1 = <<"ClientId1">>,
ClientId2 = <<"ClientId2">>,
{ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]),
{ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, false}]),
{ok, _} = emqtt:connect(ConnPid1),
{ok, _} = emqtt:connect(ConnPid2),
emqtt:subscribe(ConnPid1, {<<"$share/", Group/binary, "/foo/bar/#">>, 1}),
emqtt:subscribe(ConnPid2, {<<"$share/", Group/binary, "/foo/bar/#">>, 1}),
Message = emqx_message:make(ClientId1, 1, Topic, <<"hello1">>),
emqx:publish(Message),
{true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1, ConnPid2]),
ok = emqtt:stop(UsedSubPid1),
Res = last_message(<<"hello1">>, [ConnPid1, ConnPid2], 6000),
?assertMatch({true, Pid} when Pid =/= UsedSubPid1, Res),
{true, UsedSubPid2} = Res,
emqtt:stop(UsedSubPid2),
ok.
t_dispatch_when_inflights_are_full({init, Config}) ->
%% make sure broker does not push more than one inflight
meck:new(emqx_zone, [passthrough, no_history]),
meck:expect(emqx_zone, max_inflight, fun(_Zone) -> 1 end),
Config;
t_dispatch_when_inflights_are_full({'end', _Config}) ->
meck:unload(emqx_zone);
t_dispatch_when_inflights_are_full(Config) when is_list(Config) -> t_dispatch_when_inflights_are_full(Config) when is_list(Config) ->
ok = ensure_config(round_robin, true), ok = ensure_config(round_robin, _AckEnabled = true),
Topic = <<"foo/bar">>, Topic = <<"foo/bar">>,
ClientId1 = <<"ClientId1">>, ClientId1 = <<"ClientId1">>,
ClientId2 = <<"ClientId2">>, ClientId2 = <<"ClientId2">>,
%% Note that max_inflight is 1 {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]),
{ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {max_inflight, 1}]), {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}]),
{ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {max_inflight, 1}]),
{ok, _} = emqtt:connect(ConnPid1), {ok, _} = emqtt:connect(ConnPid1),
{ok, _} = emqtt:connect(ConnPid2), {ok, _} = emqtt:connect(ConnPid2),
@ -505,8 +533,7 @@ t_dispatch_when_inflights_are_full(Config) when is_list(Config) ->
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message2)), ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message2)),
%% Now kill any client %% Now kill any client
erlang:exit(ConnPid1, normal), ok = kill_process(ConnPid1),
ct:sleep(100),
%% And try to send the message %% And try to send the message
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message3)), ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message3)),
@ -521,10 +548,137 @@ t_dispatch_when_inflights_are_full(Config) when is_list(Config) ->
emqtt:stop(ConnPid2), emqtt:stop(ConnPid2),
ok. ok.
%% No ack, QoS 2 subscriptions,
%% client1 receives one message, send pubrec, then suspend
%% client2 acts normal (auto_ack=true)
%% Expected behaviour:
%% the messages sent to client1's inflight and mq are re-dispatched after client1 is down
t_dispatch_qos2({init, Config}) when is_list(Config) ->
meck:new(emqx_zone, [passthrough, no_history]),
meck:expect(emqx_zone, max_inflight, fun(_Zone) -> 1 end),
Config;
t_dispatch_qos2({'end', Config}) when is_list(Config) ->
meck:unload(emqx_zone);
t_dispatch_qos2(Config) when is_list(Config) ->
ok = ensure_config(round_robin, _AckEnabled = false),
Topic = <<"foo/bar/1">>,
ClientId1 = <<"ClientId1">>,
ClientId2 = <<"ClientId2">>,
{ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]),
{ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, true}]),
{ok, _} = emqtt:connect(ConnPid1),
{ok, _} = emqtt:connect(ConnPid2),
emqtt:subscribe(ConnPid1, {<<"$share/group/foo/bar/#">>, 2}),
emqtt:subscribe(ConnPid2, {<<"$share/group/foo/bar/#">>, 2}),
Message1 = emqx_message:make(ClientId1, 2, Topic, <<"hello1">>),
Message2 = emqx_message:make(ClientId1, 2, Topic, <<"hello2">>),
Message3 = emqx_message:make(ClientId1, 2, Topic, <<"hello3">>),
Message4 = emqx_message:make(ClientId1, 2, Topic, <<"hello4">>),
ct:sleep(100),
ok = sys:suspend(ConnPid1),
%% One message is inflight
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message1)),
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message2)),
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message3)),
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message4)),
MsgRec1 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P1}}, P1),
MsgRec2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P2}}, P2),
%% assert hello2 > hello1 or hello4 > hello3
?assert(MsgRec2 > MsgRec1),
sys:resume(ConnPid1),
%% emqtt subscriber automatically sends PUBREC, but since auto_ack is set to false
%% so it will never send PUBCOMP, hence EMQX should not attempt to send
%% the 4th message yet since max_inflight is 1.
MsgRec3 = ?WAIT(2000, {publish, #{client_pid := ConnPid1, payload := P3}}, P3),
ct:sleep(100),
%% no message expected
?assertEqual([], collect_msgs(0)),
%% now kill client 1
kill_process(ConnPid1),
%% client 2 should receive the message
MsgRec4 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P4}}, P4),
%% assert hello2 > hello1 or hello4 > hello3
?assert(MsgRec4 > MsgRec3),
emqtt:stop(ConnPid2),
ok.
t_dispatch_qos0({init, Config}) when is_list(Config) ->
Config;
t_dispatch_qos0({'end', Config}) when is_list(Config) ->
ok;
t_dispatch_qos0(Config) when is_list(Config) ->
ok = ensure_config(round_robin, _AckEnabled = false),
Topic = <<"foo/bar/1">>,
ClientId1 = <<"ClientId1">>,
ClientId2 = <<"ClientId2">>,
{ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]),
{ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, true}]),
{ok, _} = emqtt:connect(ConnPid1),
{ok, _} = emqtt:connect(ConnPid2),
%% subscribe with QoS 0
emqtt:subscribe(ConnPid1, {<<"$share/group/foo/bar/#">>, 0}),
emqtt:subscribe(ConnPid2, {<<"$share/group/foo/bar/#">>, 0}),
%% publish with QoS 2, but should be downgraded to 0 as the subscribers
%% subscribe with QoS 0
Message1 = emqx_message:make(ClientId1, 2, Topic, <<"hello1">>),
Message2 = emqx_message:make(ClientId1, 2, Topic, <<"hello2">>),
Message3 = emqx_message:make(ClientId1, 2, Topic, <<"hello3">>),
Message4 = emqx_message:make(ClientId1, 2, Topic, <<"hello4">>),
ct:sleep(100),
ok = sys:suspend(ConnPid1),
?assertMatch([_], emqx:publish(Message1)),
?assertMatch([_], emqx:publish(Message2)),
?assertMatch([_], emqx:publish(Message3)),
?assertMatch([_], emqx:publish(Message4)),
MsgRec1 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P1}}, P1),
MsgRec2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P2}}, P2),
%% assert hello2 > hello1 or hello4 > hello3
?assert(MsgRec2 > MsgRec1),
kill_process(ConnPid1),
%% expect no redispatch
?assertEqual([], collect_msgs(timer:seconds(2))),
emqtt:stop(ConnPid2),
ok.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% help functions %% help functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
kill_process(Pid) ->
_ = unlink(Pid),
_ = monitor(process, Pid),
erlang:exit(Pid, kill),
receive
{'DOWN', _, process, Pid, _} ->
ok
end.
collect_msgs(Timeout) ->
collect_msgs([], Timeout).
collect_msgs(Acc, Timeout) ->
receive
Msg ->
collect_msgs([Msg | Acc], Timeout)
after
Timeout ->
lists:reverse(Acc)
end.
ensure_config(Strategy) -> ensure_config(Strategy) ->
ensure_config(Strategy, _AckEnabled = true). ensure_config(Strategy, _AckEnabled = true).