Merge remote-tracking branch 'origin/release-v43' into main-v4.3

This commit is contained in:
Zaiming (Stone) Shi 2022-10-15 14:58:22 +02:00
commit 1ba2c01f25
22 changed files with 292 additions and 75 deletions

View File

@ -24,6 +24,11 @@ jobs:
steps: steps:
- uses: actions/checkout@v1 - uses: actions/checkout@v1
# to avoid dirty self-hosted runners
- name: stop containers
run: |
docker rm -f $(docker ps -qa) || true
docker network rm $(docker network ls -q) || true
- name: docker compose up - name: docker compose up
env: env:
LDAP_TAG: ${{ matrix.ldap_tag }} LDAP_TAG: ${{ matrix.ldap_tag }}
@ -79,6 +84,10 @@ jobs:
steps: steps:
- uses: actions/checkout@v1 - uses: actions/checkout@v1
- name: stop containers
run: |
docker rm -f $(docker ps -qa) || true
docker network rm $(docker network ls -q) || true
- name: docker-compose up - name: docker-compose up
run: | run: |
docker-compose \ docker-compose \
@ -151,6 +160,10 @@ jobs:
steps: steps:
- uses: actions/checkout@v1 - uses: actions/checkout@v1
- name: stop containers
run: |
docker rm -f $(docker ps -qa) || true
docker network rm $(docker network ls -q) || true
- name: docker-compose up - name: docker-compose up
timeout-minutes: 5 timeout-minutes: 5
run: | run: |
@ -237,6 +250,10 @@ jobs:
- tcp - tcp
steps: steps:
- uses: actions/checkout@v1 - uses: actions/checkout@v1
- name: stop containers
run: |
docker rm -f $(docker ps -qa) || true
docker network rm $(docker network ls -q) || true
- name: docker-compose up - name: docker-compose up
run: | run: |
docker-compose \ docker-compose \
@ -318,6 +335,10 @@ jobs:
steps: steps:
- uses: actions/checkout@v1 - uses: actions/checkout@v1
- name: stop containers
run: |
docker rm -f $(docker ps -qa) || true
docker network rm $(docker network ls -q) || true
- name: docker-compose up - name: docker-compose up
run: | run: |
docker-compose \ docker-compose \

View File

@ -42,6 +42,11 @@ jobs:
use-self-hosted: false use-self-hosted: false
steps: steps:
- uses: actions/checkout@v2 - uses: actions/checkout@v2
# to avoid dirty self-hosted runners
- name: stop containers
run: |
docker rm -f $(docker ps -qa) || true
docker network rm $(docker network ls -q) || true
- name: docker compose up - name: docker compose up
if: endsWith(github.repository, 'emqx') if: endsWith(github.repository, 'emqx')
env: env:

View File

@ -46,24 +46,31 @@ File format:
- Added a test to prevent a last will testament message to be - Added a test to prevent a last will testament message to be
published when a client is denied connection. [#8894](https://github.com/emqx/emqx/pull/8894) published when a client is denied connection. [#8894](https://github.com/emqx/emqx/pull/8894)
- More rigorous checking of flapping to improve stability of the system. [#9045](https://github.com/emqx/emqx/pull/9045)
- QoS1 and QoS2 messages in session's buffer are re-dispatched to other members in the group - QoS1 and QoS2 messages in session's buffer are re-dispatched to other members in the group
when the session terminates [#9094](https://github.com/emqx/emqx/pull/9094). when the session terminates [#9094](https://github.com/emqx/emqx/pull/9094).
Prior to this enhancement, one would have to set `broker.shared_dispatch_ack_enabled` to true Prior to this enhancement, one would have to set `broker.shared_dispatch_ack_enabled` to true
to prevent sessions from buffering messages, however this acknowledgement comes with a cost. to prevent sessions from buffering messages, however this acknowledgement comes with a cost.
- Prior to this fix, some of the time stamps were taken from the `os` module (system call),
while majority of other places are using `erlang` module (from Erlang virtual machine).
This inconsistent behaviour has caused some trouble for the Delayed Publish feature when OS time changes.
Now all time stamps are from `erlang` module. [#8908](https://github.com/emqx/emqx/pull/8908)
### Bug fixes ### Bug fixes
- Fix delayed publish inaccurate caused by os time change. [#8908](https://github.com/emqx/emqx/pull/8908) - Fix HTTP client library to handle SSL socket passive signal. [#9145](https://github.com/emqx/emqx/pull/9145)
- Hide redis password in error logs [#9071](https://github.com/emqx/emqx/pull/9071) - Hide redis password in error logs [#9071](https://github.com/emqx/emqx/pull/9071)
In this change, it also included more changes in redis client: In this change, it also included more changes in redis client:
- Improve redis connection error logging [eredis:19](https://github.com/emqx/eredis/pull/19). - Improve redis connection error logging [eredis #19](https://github.com/emqx/eredis/pull/19).
Also added support for eredis to accept an anonymous function as password instead of Also added support for eredis to accept an anonymous function as password instead of
passing around plaintext args which may get dumpped to crash logs (hard to predict where). passing around plaintext args which may get dumpped to crash logs (hard to predict where).
This change also added `format_status` callback for `gen_server` states which hold plaintext This change also added `format_status` callback for `gen_server` states which hold plaintext
password so the process termination log and `sys:get_status` will print '******' instead of password so the process termination log and `sys:get_status` will print '******' instead of
the password to console. the password to console.
- Avoid pool name clashing [eredis_cluster#22](https://github.com/emqx/eredis_cluster/pull/22) - Avoid pool name clashing [eredis_cluster #22](https://github.com/emqx/eredis_cluster/pull/22)
Same `format_status` callback is added here too for `gen_server`s which hold password in Same `format_status` callback is added here too for `gen_server`s which hold password in
their state. their state.
@ -72,6 +79,21 @@ File format:
- For wildcard deliveries, the re-dispatch used the wrong topic (the publishing topic, - For wildcard deliveries, the re-dispatch used the wrong topic (the publishing topic,
but not the subscribing topic), caused messages to be lost when dispatching. but not the subscribing topic), caused messages to be lost when dispatching.
- Fix shared subscription group member unsubscribe issue when 'sticky' strategy is used.
Prior to this fix, if a previously picked member unsubscribes from the group (without reconnect)
the message is still dispatched to it.
This issue only occurs when unsubscribe with the session kept.
Fixed in [#9119](https://github.com/emqx/emqx/pull/9119)
- Fix shared subscription 'sticky' strategy when there is no local subscriptions at all.
Prior to this change, it may take a few rounds to randomly pick group members until a local subscriber
is hit (and then start sticking to it).
After this fix, it will start sticking to whichever randomly picked member even when it is a
subscriber from another node in the cluster.
Fixed in [#9122](https://github.com/emqx/emqx/pull/9122)
- Fix rule engine fallback actions metrics reset. [#9125](https://github.com/emqx/emqx/pull/9125)
## v4.3.20 ## v4.3.20
### Bug fixes ### Bug fixes

View File

@ -1 +1,20 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-define(APP, emqx_auth_http). -define(APP, emqx_auth_http).
%% equals to the default value of ehttpc
-define(DEFAULT_RETRY_TIMES, 2).

View File

@ -24,7 +24,7 @@
-logger_header("[ACL http]"). -logger_header("[ACL http]").
-import(emqx_auth_http_cli, -import(emqx_auth_http_cli,
[ request/6 [ request/7
, feedvar/2 , feedvar/2
]). ]).
@ -56,13 +56,15 @@ description() -> "ACL with HTTP API".
%% Internal functions %% Internal functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
check_acl_request(#{pool_name := PoolName, check_acl_request(ACLParams =
#{pool_name := PoolName,
path := Path, path := Path,
method := Method, method := Method,
headers := Headers, headers := Headers,
params := Params, params := Params,
timeout := Timeout}, ClientInfo) -> timeout := Timeout}, ClientInfo) ->
request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout). Retry = maps:get(retry_times, ACLParams, ?DEFAULT_RETRY_TIMES),
request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout, Retry).
access(subscribe) -> 1; access(subscribe) -> 1;
access(publish) -> 2. access(publish) -> 2.

View File

@ -1,6 +1,6 @@
{application, emqx_auth_http, {application, emqx_auth_http,
[{description, "EMQ X Authentication/ACL with HTTP API"}, [{description, "EMQ X Authentication/ACL with HTTP API"},
{vsn, "4.3.7"}, % strict semver, bump manually! {vsn, "4.3.8"}, % strict semver, bump manually!
{modules, []}, {modules, []},
{registered, [emqx_auth_http_sup]}, {registered, [emqx_auth_http_sup]},
{applications, [kernel,stdlib,ehttpc]}, {applications, [kernel,stdlib,ehttpc]},

View File

@ -1,17 +1,27 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN, {VSN,
[{"4.3.6", [{"4.3.7",
[ %% There are only changes to the schema file, so we don't need any [{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]},
%% commands here {load_module,emqx_auth_http,brutal_purge,soft_purge,[]},
]}, {load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]},
{"4.3.6",
[{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]},
{"4.3.5", {"4.3.5",
[{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]}, [{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]}]}, {load_module,emqx_auth_http,brutal_purge,soft_purge,[]}]},
{"4.3.4", {"4.3.4",
[{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]}, [{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]}]}, {load_module,emqx_auth_http,brutal_purge,soft_purge,[]}]},
{"4.3.3", {"4.3.3",
[{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]}, [{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_http,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]}, {load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]},
{"4.3.2", {"4.3.2",
@ -20,21 +30,29 @@
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_http,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]}, {load_module,emqx_acl_http,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}]}, {load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}]},
{<<"4.3.[0-1]">>, {<<"4.3.[0-1]">>,[{restart_application,emqx_auth_http}]},
[{restart_application,emqx_auth_http}]},
{<<".*">>,[]}], {<<".*">>,[]}],
[{"4.3.6", [{"4.3.7",
[ %% There are only changes to the schema file, so we don't need any [{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]},
%% commands here {load_module,emqx_auth_http,brutal_purge,soft_purge,[]},
]}, {load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]},
{"4.3.6",
[{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]},
{"4.3.5", {"4.3.5",
[{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]}, [{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]}]}, {load_module,emqx_auth_http,brutal_purge,soft_purge,[]}]},
{"4.3.4", {"4.3.4",
[{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]}, [{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]}]}, {load_module,emqx_auth_http,brutal_purge,soft_purge,[]}]},
{"4.3.3", {"4.3.3",
[{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]}, [{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_http,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]}, {load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]},
{"4.3.2", {"4.3.2",
@ -43,6 +61,5 @@
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_http,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]}, {load_module,emqx_acl_http,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}]}, {load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}]},
{<<"4.3.[0-1]">>, {<<"4.3.[0-1]">>,[{restart_application,emqx_auth_http}]},
[{restart_application,emqx_auth_http}]},
{<<".*">>,[]}]}. {<<".*">>,[]}]}.

View File

@ -25,7 +25,7 @@
-logger_header("[Auth http]"). -logger_header("[Auth http]").
-import(emqx_auth_http_cli, -import(emqx_auth_http_cli,
[ request/6 [ request/7
, feedvar/2 , feedvar/2
]). ]).
@ -63,24 +63,28 @@ description() -> "Authentication by HTTP API".
%% Requests %% Requests
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
authenticate(#{pool_name := PoolName, authenticate(AuthParams =
#{pool_name := PoolName,
path := Path, path := Path,
method := Method, method := Method,
headers := Headers, headers := Headers,
params := Params, params := Params,
timeout := Timeout}, ClientInfo) -> timeout := Timeout}, ClientInfo) ->
request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout). Retry = maps:get(retry_times, AuthParams, ?DEFAULT_RETRY_TIMES),
request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout, Retry).
-spec(is_superuser(maybe(map()), emqx_types:client()) -> boolean()). -spec(is_superuser(maybe(map()), emqx_types:client()) -> boolean()).
is_superuser(undefined, _ClientInfo) -> is_superuser(undefined, _ClientInfo) ->
false; false;
is_superuser(#{pool_name := PoolName, is_superuser(SuperParams =
#{pool_name := PoolName,
path := Path, path := Path,
method := Method, method := Method,
headers := Headers, headers := Headers,
params := Params, params := Params,
timeout := Timeout}, ClientInfo) -> timeout := Timeout}, ClientInfo) ->
case request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout) of Retry = maps:get(retry_times, SuperParams, ?DEFAULT_RETRY_TIMES),
case request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout, Retry) of
{ok, 200, _Body} -> true; {ok, 200, _Body} -> true;
{ok, _Code, _Body} -> false; {ok, _Code, _Body} -> false;
{error, Error} -> ?LOG(error, "Request superuser path ~s, error: ~p", [Path, Error]), {error, Error} -> ?LOG(error, "Request superuser path ~s, error: ~p", [Path, Error]),

View File

@ -19,6 +19,7 @@
-include("emqx_auth_http.hrl"). -include("emqx_auth_http.hrl").
-export([ request/6 -export([ request/6
, request/7
, feedvar/2 , feedvar/2
, feedvar/3 , feedvar/3
]). ]).
@ -27,18 +28,21 @@
%% HTTP Request %% HTTP Request
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
request(PoolName, get, Path, Headers, Params, Timeout) -> request(PoolName, Method, Path, Headers, Params, Timeout) ->
NewPath = Path ++ "?" ++ binary_to_list(cow_qs:qs(bin_kw(Params))), request(PoolName, Method, Path, Headers, Params, Timeout, ?DEFAULT_RETRY_TIMES).
reply(ehttpc:request(PoolName, get, {NewPath, Headers}, Timeout));
request(PoolName, post, Path, Headers, Params, Timeout) -> request(PoolName, get, Path, Headers, Params, Timeout, Retry) ->
NewPath = Path ++ "?" ++ binary_to_list(cow_qs:qs(bin_kw(Params))),
reply(ehttpc:request(PoolName, get, {NewPath, Headers}, Timeout, Retry));
request(PoolName, post, Path, Headers, Params, Timeout, Retry) ->
Body = case proplists:get_value(<<"content-type">>, Headers) of Body = case proplists:get_value(<<"content-type">>, Headers) of
"application/x-www-form-urlencoded" -> "application/x-www-form-urlencoded" ->
cow_qs:qs(bin_kw(Params)); cow_qs:qs(bin_kw(Params));
"application/json" -> "application/json" ->
emqx_json:encode(bin_kw(Params)) emqx_json:encode(bin_kw(Params))
end, end,
reply(ehttpc:request(PoolName, post, {Path, Headers, Body}, Timeout)). reply(ehttpc:request(PoolName, post, {Path, Headers, Body}, Timeout, Retry)).
reply({ok, StatusCode, _Headers}) -> reply({ok, StatusCode, _Headers}) ->
{ok, StatusCode, <<>>}; {ok, StatusCode, <<>>};

View File

@ -7,6 +7,7 @@
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
{"4.3.14", {"4.3.14",
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
@ -15,6 +16,7 @@
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
{"4.3.13", {"4.3.13",
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
@ -25,6 +27,7 @@
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
{"4.3.12", {"4.3.12",
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
@ -35,6 +38,7 @@
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
{"4.3.11", {"4.3.11",
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
@ -46,6 +50,7 @@
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{"4.3.10", {"4.3.10",
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
@ -57,6 +62,7 @@
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{"4.3.9", {"4.3.9",
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
@ -69,6 +75,7 @@
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{add_module,emqx_rule_date}, {add_module,emqx_rule_date},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
@ -217,6 +224,7 @@
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
{"4.3.14", {"4.3.14",
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
@ -225,6 +233,7 @@
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
{"4.3.13", {"4.3.13",
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
@ -235,6 +244,7 @@
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
{"4.3.12", {"4.3.12",
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
@ -245,6 +255,7 @@
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
{"4.3.11", {"4.3.11",
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
@ -256,6 +267,7 @@
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{"4.3.10", {"4.3.10",
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
@ -267,6 +279,7 @@
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{"4.3.9", {"4.3.9",
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
@ -280,6 +293,7 @@
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{delete_module,emqx_rule_date}]}, {delete_module,emqx_rule_date}]},
{"4.3.8", {"4.3.8",

View File

@ -131,16 +131,15 @@ clear_metrics(Id) ->
-spec(reset_metrics(rule_id()) -> ok). -spec(reset_metrics(rule_id()) -> ok).
reset_metrics(Id) -> reset_metrics(Id) ->
reset_speeds(Id), reset_speeds(Id),
reset_metrics(Id, rule_metrics()), do_reset_metrics(Id, rule_metrics()),
case emqx_rule_registry:get_rule(Id) of case emqx_rule_registry:get_rule(Id) of
not_found -> ok; not_found -> ok;
{ok, #rule{actions = Actions}} -> {ok, #rule{actions = Actions}} ->
[ reset_metrics(ActionId, action_metrics()) reset_action_metrics(Actions),
|| #action_instance{ id = ActionId} <- Actions],
ok ok
end. end.
reset_metrics(Id, Metrics) -> do_reset_metrics(Id, Metrics) ->
case couters_ref(Id) of case couters_ref(Id) of
not_found -> ok; not_found -> ok;
Ref -> [counters:put(Ref, metrics_idx(Idx), 0) Ref -> [counters:put(Ref, metrics_idx(Idx), 0)
@ -148,6 +147,12 @@ reset_metrics(Id, Metrics) ->
ok ok
end. end.
reset_action_metrics(Actions) ->
lists:foreach(fun(#action_instance{id = ActionId, fallbacks = FallbackActions}) ->
do_reset_metrics(ActionId, action_metrics()),
reset_action_metrics(FallbackActions)
end, Actions).
reset_speeds(Id) -> reset_speeds(Id) ->
gen_server:call(?MODULE, {reset_speeds, Id}). gen_server:call(?MODULE, {reset_speeds, Id}).

View File

@ -53,6 +53,7 @@ groups() ->
t_unregister_provider, t_unregister_provider,
t_create_rule, t_create_rule,
t_reset_metrics, t_reset_metrics,
t_reset_metrics_fallbacks,
t_create_resource t_create_resource
]}, ]},
{actions, [], {actions, [],
@ -334,18 +335,14 @@ t_inspect_action(_Config) ->
t_reset_metrics(_Config) -> t_reset_metrics(_Config) ->
ok = emqx_rule_engine:load_providers(), ok = emqx_rule_engine:load_providers(),
{ok, #resource{id = ResId}} = emqx_rule_engine:create_resource( {ok, #rule{id = Id, actions = [#action_instance{id = ActId0}]}} =
#{type => built_in, emqx_rule_engine:create_rule(
config => #{}, #{rawsql => "select clientid as c, username as u "
description => <<"debug resource">>}), "from \"t1\" ",
{ok, #rule{id = Id}} = emqx_rule_engine:create_rule( actions => [#{name => 'inspect', args => #{a=>1, b=>2}}],
#{rawsql => "select clientid as c, username as u " type => built_in,
"from \"t1\" ", description => <<"Inspect rule">>
actions => [#{name => 'inspect', }),
args => #{'$resource' => ResId, a=>1, b=>2}}],
type => built_in,
description => <<"Inspect rule">>
}),
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}]), {ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
{ok, _} = emqtt:connect(Client), {ok, _} = emqtt:connect(Client),
[ begin [ begin
@ -353,16 +350,68 @@ t_reset_metrics(_Config) ->
timer:sleep(100) timer:sleep(100)
end end
|| _ <- lists:seq(1,10)], || _ <- lists:seq(1,10)],
?assertMatch(#{exception := 0, failed := 0,
matched := 10, no_result := 0, passed := 10},
emqx_rule_metrics:get_rule_metrics(Id)),
?assertMatch(#{failed := 0, success := 10, taken := 10},
emqx_rule_metrics:get_action_metrics(ActId0)),
emqx_rule_metrics:reset_metrics(Id), emqx_rule_metrics:reset_metrics(Id),
?assertEqual(#{exception => 0,failed => 0, ?assertEqual(#{exception => 0,failed => 0,
matched => 0,no_result => 0,passed => 0, matched => 0,no_result => 0,passed => 0,
speed => 0.0,speed_last5m => 0.0,speed_max => 0.0}, speed => 0.0,speed_last5m => 0.0,speed_max => 0.0},
emqx_rule_metrics:get_rule_metrics(Id)), emqx_rule_metrics:get_rule_metrics(Id)),
?assertEqual(#{failed => 0,success => 0,taken => 0}, ?assertEqual(#{failed => 0, success => 0, taken => 0},
emqx_rule_metrics:get_action_metrics(ResId)), emqx_rule_metrics:get_action_metrics(ActId0)),
emqtt:stop(Client), emqtt:stop(Client),
emqx_rule_registry:remove_rule(Id), emqx_rule_registry:remove_rule(Id),
emqx_rule_registry:remove_resource(ResId), ok.
t_reset_metrics_fallbacks(_Config) ->
ok = emqx_rule_engine:load_providers(),
ok = emqx_rule_registry:add_action(
#action{name = 'crash_action', app = ?APP,
module = ?MODULE, on_create = crash_action,
types=[], params_spec = #{},
title = #{en => <<"Crash Action">>},
description = #{en => <<"This action will always fail!">>}}),
{ok, #rule{id = Id, actions = [#action_instance{id = ActId0, fallbacks = [
#action_instance{id = ActId1},
#action_instance{id = ActId2}
]}]}} =
emqx_rule_engine:create_rule(
#{rawsql => "select clientid as c, username as u "
"from \"t1\" ",
actions => [#{name => 'crash_action', args => #{a=>1, b=>2}, fallbacks => [
#{name => 'inspect', args => #{}, fallbacks => []},
#{name => 'inspect', args => #{}, fallbacks => []}
]}],
type => built_in,
description => <<"Inspect rule">>
}),
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
{ok, _} = emqtt:connect(Client),
[ begin
emqtt:publish(Client, <<"t1">>, <<"{\"id\": 1, \"name\": \"ha\"}">>, 0),
timer:sleep(100)
end
|| _ <- lists:seq(1,10)],
?assertMatch(#{exception := 0, failed := 0,
matched := 10, no_result := 0, passed := 10},
emqx_rule_metrics:get_rule_metrics(Id)),
[?assertMatch(#{failed := 10, success := 0, taken := 10},
emqx_rule_metrics:get_action_metrics(AId)) || AId <- [ActId0]],
[?assertMatch(#{failed := 0, success := 10, taken := 10},
emqx_rule_metrics:get_action_metrics(AId)) || AId <- [ ActId1, ActId2]],
emqx_rule_metrics:reset_metrics(Id),
?assertEqual(#{exception => 0,failed => 0,
matched => 0,no_result => 0,passed => 0,
speed => 0.0,speed_last5m => 0.0,speed_max => 0.0},
emqx_rule_metrics:get_rule_metrics(Id)),
[?assertEqual(#{failed => 0, success => 0, taken => 0},
emqx_rule_metrics:get_action_metrics(AId)) || AId <- [ActId0, ActId1, ActId2]],
emqtt:stop(Client),
emqx_rule_registry:remove_rule(Id),
ok = emqx_rule_registry:remove_action('crash_action'),
ok. ok.
t_republish_action(_Config) -> t_republish_action(_Config) ->

View File

@ -483,7 +483,7 @@ esac
if [ "$IS_BOOT_COMMAND" = 'no' ]; then if [ "$IS_BOOT_COMMAND" = 'no' ]; then
# for non-boot commands, inspect vm.<time>.args for node name # for non-boot commands, inspect vm.<time>.args for node name
# shellcheck disable=SC2012,SC2086 # shellcheck disable=SC2012
LATEST_VM_ARGS_FILE="$(ls -t "$RUNNER_DATA_DIR"/configs/vm.*.args 2>/dev/null | head -1)" LATEST_VM_ARGS_FILE="$(ls -t "$RUNNER_DATA_DIR"/configs/vm.*.args 2>/dev/null | head -1)"
if [ -z "$LATEST_VM_ARGS_FILE" ]; then if [ -z "$LATEST_VM_ARGS_FILE" ]; then
echoerr "There is no vm.*.args config file found in '$RUNNER_DATA_DIR/configs/'" echoerr "There is no vm.*.args config file found in '$RUNNER_DATA_DIR/configs/'"

View File

@ -29,7 +29,7 @@
-ifndef(EMQX_ENTERPRISE). -ifndef(EMQX_ENTERPRISE).
-define(EMQX_RELEASE, {opensource, "4.3.21-alpha.1"}). -define(EMQX_RELEASE, {opensource, "4.3.21"}).
-else. -else.

View File

@ -41,7 +41,7 @@
[ {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps [ {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps
, {redbug, "2.0.7"} , {redbug, "2.0.7"}
, {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.2.0"}}} , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.2.0"}}}
, {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.7"}}} , {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.8"}}}
, {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.7.4"}}} , {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.7.4"}}}
, {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}} , {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}

View File

@ -8,8 +8,8 @@ cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")/.."
PKG_VSN="${PKG_VSN:-$(./pkg-vsn.sh)}" PKG_VSN="${PKG_VSN:-$(./pkg-vsn.sh)}"
case "${PKG_VSN}" in case "${PKG_VSN}" in
4.3*) 4.3*)
EMQX_CE_DASHBOARD_VERSION='v4.3.10' EMQX_CE_DASHBOARD_VERSION='v4.3.11'
EMQX_EE_DASHBOARD_VERSION='v4.3.24' EMQX_EE_DASHBOARD_VERSION='v4.3.26'
;; ;;
*) *)
echo "Unsupported version $PKG_VSN" >&2 echo "Unsupported version $PKG_VSN" >&2

View File

@ -227,6 +227,7 @@ shutdown() ->
shutdown(normal). shutdown(normal).
shutdown(Reason) -> shutdown(Reason) ->
ok = emqx_misc:maybe_mute_rpc_log(),
?LOG(critical, "emqx shutdown for ~s", [Reason]), ?LOG(critical, "emqx shutdown for ~s", [Reason]),
on_shutdown(Reason), on_shutdown(Reason),
_ = emqx_plugins:unload(), _ = emqx_plugins:unload(),

View File

@ -288,6 +288,7 @@ handle_in(?CONNECT_PACKET(ConnPkt), Channel) ->
fun enrich_client/2, fun enrich_client/2,
fun set_log_meta/2, fun set_log_meta/2,
fun check_banned/2, fun check_banned/2,
fun count_flapping_event/2,
fun auth_connect/2 fun auth_connect/2
], ConnPkt, Channel#channel{conn_state = connecting}) of ], ConnPkt, Channel#channel{conn_state = connecting}) of
{ok, NConnPkt, NChannel = #channel{clientinfo = ClientInfo}} -> {ok, NConnPkt, NChannel = #channel{clientinfo = ClientInfo}} ->
@ -1022,11 +1023,7 @@ handle_info({sock_closed, Reason}, Channel = #channel{conn_state = idle}) ->
handle_info({sock_closed, Reason}, Channel = #channel{conn_state = connecting}) -> handle_info({sock_closed, Reason}, Channel = #channel{conn_state = connecting}) ->
shutdown(Reason, Channel); shutdown(Reason, Channel);
handle_info({sock_closed, Reason}, Channel = handle_info({sock_closed, Reason}, Channel = #channel{conn_state = connected}) ->
#channel{conn_state = connected,
clientinfo = ClientInfo = #{zone := Zone}}) ->
emqx_zone:enable_flapping_detect(Zone)
andalso emqx_flapping:detect(ClientInfo),
Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(Channel)), Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(Channel)),
case maybe_shutdown(Reason, Channel1) of case maybe_shutdown(Reason, Channel1) of
{ok, Channel2} -> {ok, {event, disconnected}, Channel2}; {ok, Channel2} -> {ok, {event, disconnected}, Channel2};
@ -1335,6 +1332,13 @@ auth_connect(#mqtt_packet_connect{password = Password},
{error, emqx_reason_codes:connack_error(Reason)} {error, emqx_reason_codes:connack_error(Reason)}
end. end.
%%--------------------------------------------------------------------
%% Flapping
count_flapping_event(_ConnPkt, Channel = #channel{clientinfo = ClientInfo = #{zone := Zone}}) ->
_ = emqx_zone:enable_flapping_detect(Zone) andalso emqx_flapping:detect(ClientInfo),
{ok, Channel}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Enhanced Authentication %% Enhanced Authentication

View File

@ -23,6 +23,7 @@
-export([ merge_opts/2 -export([ merge_opts/2
, maybe_apply/2 , maybe_apply/2
, maybe_mute_rpc_log/0
, compose/1 , compose/1
, compose/2 , compose/2
, run_fold/3 , run_fold/3
@ -444,6 +445,27 @@ do_parallel_map(Fun, List) ->
PidList PidList
). ).
%% @doc Call this function to avoid logs printed to RPC caller node.
-spec maybe_mute_rpc_log() -> ok.
maybe_mute_rpc_log() ->
GlNode = node(group_leader()),
maybe_mute_rpc_log(GlNode).
maybe_mute_rpc_log(Node) when Node =:= node() ->
%% do nothing, this is a local call
ok;
maybe_mute_rpc_log(Node) ->
case atom_to_list(Node) of
"remsh" ++ _ ->
%% this is either an upgrade script or nodetool
%% do nothing, the log may go to the 'emqx' command line console
ok;
_ ->
%% otherwise set group leader to local node
_ = group_leader(whereis(init), self()),
ok
end.
-ifdef(TEST). -ifdef(TEST).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").

View File

@ -337,7 +337,8 @@ fetch_sender_ref({Sender, Ref}) -> {Sender, Ref}.
pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) -> pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) ->
Sub0 = erlang:get({shared_sub_sticky, Group, Topic}), Sub0 = erlang:get({shared_sub_sticky, Group, Topic}),
case is_active_sub(Sub0, FailedSubs) of All = subscribers(Group, Topic),
case is_active_sub(Sub0, FailedSubs, All) of
true -> true ->
%% the old subscriber is still alive %% the old subscriber is still alive
%% keep using it for sticky strategy %% keep using it for sticky strategy
@ -455,6 +456,7 @@ handle_cast(Msg, State) ->
handle_info({mnesia_table_event, {write, NewRecord, _}}, State = #state{pmon = PMon}) -> handle_info({mnesia_table_event, {write, NewRecord, _}}, State = #state{pmon = PMon}) ->
#emqx_shared_subscription{subpid = SubPid} = NewRecord, #emqx_shared_subscription{subpid = SubPid} = NewRecord,
ok = maybe_insert_alive_tab(SubPid),
{noreply, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})}; {noreply, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})};
%% The subscriber may have subscribed multiple topics, so we need to keep monitoring the PID until %% The subscriber may have subscribed multiple topics, so we need to keep monitoring the PID until
@ -507,8 +509,10 @@ update_stats(State) ->
State. State.
%% Return 'true' if the subscriber process is alive AND not in the failed list %% Return 'true' if the subscriber process is alive AND not in the failed list
is_active_sub(Pid, FailedSubs) -> is_active_sub(Pid, FailedSubs, All) ->
not maps:is_key(Pid, FailedSubs) andalso is_alive_sub(Pid). lists:member(Pid, All) andalso
(not maps:is_key(Pid, FailedSubs)) andalso
is_alive_sub(Pid).
%% erlang:is_process_alive/1 does not work with remote pid. %% erlang:is_process_alive/1 does not work with remote pid.
is_alive_sub(Pid) when ?IS_LOCAL_PID(Pid) -> is_alive_sub(Pid) when ?IS_LOCAL_PID(Pid) ->

View File

@ -33,11 +33,6 @@ all() -> emqx_ct:all(?MODULE).
init_per_suite(Config) -> init_per_suite(Config) ->
%% CM Meck %% CM Meck
ok = meck:new(emqx_cm, [passthrough, no_history, no_link]), ok = meck:new(emqx_cm, [passthrough, no_history, no_link]),
%% Access Control Meck
ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]),
ok = meck:expect(emqx_access_control, authenticate,
fun(_) -> {ok, #{auth_result => success}} end),
ok = meck:expect(emqx_access_control, check_acl, fun(_, _, _) -> allow end),
%% Broker Meck %% Broker Meck
ok = meck:new(emqx_broker, [passthrough, no_history, no_link]), ok = meck:new(emqx_broker, [passthrough, no_history, no_link]),
%% Hooks Meck %% Hooks Meck
@ -53,8 +48,7 @@ init_per_suite(Config) ->
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
meck:unload([emqx_access_control, meck:unload([emqx_metrics,
emqx_metrics,
emqx_session, emqx_session,
emqx_broker, emqx_broker,
emqx_hooks, emqx_hooks,
@ -63,10 +57,16 @@ end_per_suite(_Config) ->
init_per_testcase(_TestCase, Config) -> init_per_testcase(_TestCase, Config) ->
meck:new(emqx_zone, [passthrough, no_history, no_link]), meck:new(emqx_zone, [passthrough, no_history, no_link]),
%% Access Control Meck
ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]),
ok = meck:expect(emqx_access_control, authenticate,
fun(_) -> {ok, #{auth_result => success}} end),
ok = meck:expect(emqx_access_control, check_acl, fun(_, _, _) -> allow end),
Config. Config.
end_per_testcase(_TestCase, Config) -> end_per_testcase(_TestCase, Config) ->
meck:unload([emqx_zone]), meck:unload([emqx_zone]),
meck:unload([emqx_access_control]),
Config. Config.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -853,6 +853,30 @@ t_ws_cookie_init(_) ->
Channel = emqx_channel:init(ConnInfo, [{zone, zone}]), Channel = emqx_channel:init(ConnInfo, [{zone, zone}]),
?assertMatch(#{ws_cookie := WsCookie}, emqx_channel:info(clientinfo, Channel)). ?assertMatch(#{ws_cookie := WsCookie}, emqx_channel:info(clientinfo, Channel)).
%%--------------------------------------------------------------------
%% Test cases for other mechnisms
%%--------------------------------------------------------------------
t_flapping_detect(_) ->
Parent = self(),
ok = meck:expect(emqx_cm, open_session,
fun(true, _ClientInfo, _ConnInfo) ->
{ok, #{session => session(), present => false}}
end),
ok = meck:expect(emqx_access_control, authenticate, fun(_) -> {error, not_authorized} end),
ok = meck:new(emqx_flapping, [passthrough, no_history, no_link]),
ok = meck:expect(emqx_flapping, detect, fun(_) -> Parent ! flapping_detect end),
ok = meck:expect(emqx_zone, enable_flapping_detect, fun(_) -> true end),
IdleChannel = channel(#{conn_state => idle}),
{shutdown, not_authorized, _ConnAck, _Channel} =
emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), IdleChannel),
receive
flapping_detect -> ok
after 2000 ->
?assert(false, "Flapping detect should be exected in connecting progress")
end,
meck:unload([emqx_flapping]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Helper functions %% Helper functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -72,4 +72,4 @@ t_expired_detecting(_) ->
(_) -> false end, ets:tab2list(emqx_flapping))), (_) -> false end, ets:tab2list(emqx_flapping))),
timer:sleep(200), timer:sleep(200),
?assertEqual(true, lists:all(fun({flapping, <<"clientid">>, _, _, _}) -> false; ?assertEqual(true, lists:all(fun({flapping, <<"clientid">>, _, _, _}) -> false;
(_) -> true end, ets:tab2list(emqx_flapping))). (_) -> true end, ets:tab2list(emqx_flapping))).