Merge pull request #9167 from JimMoen/1018-sync-release-v44-back-to-main
1018 sync release v44 back to main
This commit is contained in:
commit
37663e80c0
|
@ -24,6 +24,11 @@ jobs:
|
||||||
|
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v1
|
- uses: actions/checkout@v1
|
||||||
|
# to avoid dirty self-hosted runners
|
||||||
|
- name: stop containers
|
||||||
|
run: |
|
||||||
|
docker rm -f $(docker ps -qa) || true
|
||||||
|
docker network rm $(docker network ls -q) || true
|
||||||
- name: docker compose up
|
- name: docker compose up
|
||||||
env:
|
env:
|
||||||
LDAP_TAG: ${{ matrix.ldap_tag }}
|
LDAP_TAG: ${{ matrix.ldap_tag }}
|
||||||
|
@ -79,6 +84,10 @@ jobs:
|
||||||
|
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v1
|
- uses: actions/checkout@v1
|
||||||
|
- name: stop containers
|
||||||
|
run: |
|
||||||
|
docker rm -f $(docker ps -qa) || true
|
||||||
|
docker network rm $(docker network ls -q) || true
|
||||||
- name: docker-compose up
|
- name: docker-compose up
|
||||||
run: |
|
run: |
|
||||||
docker-compose \
|
docker-compose \
|
||||||
|
@ -151,6 +160,10 @@ jobs:
|
||||||
|
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v1
|
- uses: actions/checkout@v1
|
||||||
|
- name: stop containers
|
||||||
|
run: |
|
||||||
|
docker rm -f $(docker ps -qa) || true
|
||||||
|
docker network rm $(docker network ls -q) || true
|
||||||
- name: docker-compose up
|
- name: docker-compose up
|
||||||
timeout-minutes: 5
|
timeout-minutes: 5
|
||||||
run: |
|
run: |
|
||||||
|
@ -237,6 +250,10 @@ jobs:
|
||||||
- tcp
|
- tcp
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v1
|
- uses: actions/checkout@v1
|
||||||
|
- name: stop containers
|
||||||
|
run: |
|
||||||
|
docker rm -f $(docker ps -qa) || true
|
||||||
|
docker network rm $(docker network ls -q) || true
|
||||||
- name: docker-compose up
|
- name: docker-compose up
|
||||||
run: |
|
run: |
|
||||||
docker-compose \
|
docker-compose \
|
||||||
|
@ -318,6 +335,10 @@ jobs:
|
||||||
|
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v1
|
- uses: actions/checkout@v1
|
||||||
|
- name: stop containers
|
||||||
|
run: |
|
||||||
|
docker rm -f $(docker ps -qa) || true
|
||||||
|
docker network rm $(docker network ls -q) || true
|
||||||
- name: docker-compose up
|
- name: docker-compose up
|
||||||
run: |
|
run: |
|
||||||
docker-compose \
|
docker-compose \
|
||||||
|
|
|
@ -42,6 +42,11 @@ jobs:
|
||||||
use-self-hosted: false
|
use-self-hosted: false
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v2
|
- uses: actions/checkout@v2
|
||||||
|
# to avoid dirty self-hosted runners
|
||||||
|
- name: stop containers
|
||||||
|
run: |
|
||||||
|
docker rm -f $(docker ps -qa) || true
|
||||||
|
docker network rm $(docker network ls -q) || true
|
||||||
- name: docker compose up
|
- name: docker compose up
|
||||||
if: endsWith(github.repository, 'emqx')
|
if: endsWith(github.repository, 'emqx')
|
||||||
env:
|
env:
|
||||||
|
|
|
@ -32,8 +32,18 @@ File format:
|
||||||
- Added a test to prevent a last will testament message to be
|
- Added a test to prevent a last will testament message to be
|
||||||
published when a client is denied connection. [#8894](https://github.com/emqx/emqx/pull/8894)
|
published when a client is denied connection. [#8894](https://github.com/emqx/emqx/pull/8894)
|
||||||
|
|
||||||
|
- More rigorous checking of flapping to improve stability of the system. [#9045](https://github.com/emqx/emqx/pull/9045)
|
||||||
|
|
||||||
|
- QoS1 and QoS2 messages in session's buffer are re-dispatched to other members in the group
|
||||||
|
when the session terminates [#9094](https://github.com/emqx/emqx/pull/9094).
|
||||||
|
Prior to this enhancement, one would have to set `broker.shared_dispatch_ack_enabled` to true
|
||||||
|
to prevent sessions from buffering messages, however this acknowledgement comes with a cost.
|
||||||
|
|
||||||
|
|
||||||
### Bug fixes
|
### Bug fixes
|
||||||
|
|
||||||
|
- Fix HTTP client library to handle SSL socket passive signal. [#9145](https://github.com/emqx/emqx/pull/9145)
|
||||||
|
|
||||||
- Fix delayed publish inaccurate caused by os time change. [#8908](https://github.com/emqx/emqx/pull/8908)
|
- Fix delayed publish inaccurate caused by os time change. [#8908](https://github.com/emqx/emqx/pull/8908)
|
||||||
|
|
||||||
- Hide redis password in error logs [#9071](https://github.com/emqx/emqx/pull/9071)
|
- Hide redis password in error logs [#9071](https://github.com/emqx/emqx/pull/9071)
|
||||||
|
@ -48,6 +58,26 @@ File format:
|
||||||
Same `format_status` callback is added here too for `gen_server`s which hold password in
|
Same `format_status` callback is added here too for `gen_server`s which hold password in
|
||||||
their state.
|
their state.
|
||||||
|
|
||||||
|
- Fix shared subscription message re-dispatches [#9094](https://github.com/emqx/emqx/pull/9094).
|
||||||
|
- When discarding QoS 2 inflight messages, there were excessive logs
|
||||||
|
- For wildcard deliveries, the re-dispatch used the wrong topic (the publishing topic,
|
||||||
|
but not the subscribing topic), caused messages to be lost when dispatching.
|
||||||
|
|
||||||
|
- Fix shared subscription group member unsubscribe issue when 'sticky' strategy is used.
|
||||||
|
Prior to this fix, if a previously picked member unsubscribes from the group (without reconnect)
|
||||||
|
the message is still dispatched to it.
|
||||||
|
This issue only occurs when unsubscribe with the session kept.
|
||||||
|
Fixed in [#9119](https://github.com/emqx/emqx/pull/9119)
|
||||||
|
|
||||||
|
- Fix shared subscription 'sticky' strategy when there is no local subscriptions at all.
|
||||||
|
Prior to this change, it may take a few rounds to randomly pick group members until a local subscriber
|
||||||
|
is hit (and then start sticking to it).
|
||||||
|
After this fix, it will start sticking to whichever randomly picked member even when it is a
|
||||||
|
subscriber from another node in the cluster.
|
||||||
|
Fixed in [#9122](https://github.com/emqx/emqx/pull/9122)
|
||||||
|
|
||||||
|
- Fix cannot reset metrics for fallback actions. [#9125](https://github.com/emqx/emqx/pull/9125)
|
||||||
|
|
||||||
## v4.3.20
|
## v4.3.20
|
||||||
|
|
||||||
### Bug fixes
|
### Bug fixes
|
||||||
|
|
|
@ -1,8 +1,14 @@
|
||||||
# EMQX 4.4 Changes
|
# EMQX 4.4 Changes
|
||||||
|
|
||||||
|
## v4.4.11
|
||||||
|
|
||||||
|
### Bug fixes (synced from v4.3.22)
|
||||||
|
|
||||||
|
### Enhancements (synced from v4.3.22)
|
||||||
|
|
||||||
## v4.4.10
|
## v4.4.10
|
||||||
|
|
||||||
### Bug fixes
|
### Bug fixes (synced from v4.3.21)
|
||||||
|
|
||||||
- Fix the latency statistics error of the slow subscription module when `stats_type` is `internal` or `response`. [#8981](https://github.com/emqx/emqx/pull/8981)
|
- Fix the latency statistics error of the slow subscription module when `stats_type` is `internal` or `response`. [#8981](https://github.com/emqx/emqx/pull/8981)
|
||||||
|
|
||||||
|
|
|
@ -1 +1,20 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-define(APP, emqx_auth_http).
|
-define(APP, emqx_auth_http).
|
||||||
|
|
||||||
|
%% equals to the default value of ehttpc
|
||||||
|
-define(DEFAULT_RETRY_TIMES, 2).
|
||||||
|
|
|
@ -24,7 +24,7 @@
|
||||||
-logger_header("[ACL http]").
|
-logger_header("[ACL http]").
|
||||||
|
|
||||||
-import(emqx_auth_http_cli,
|
-import(emqx_auth_http_cli,
|
||||||
[ request/6
|
[ request/7
|
||||||
, feedvar/2
|
, feedvar/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
@ -56,13 +56,15 @@ description() -> "ACL with HTTP API".
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
check_acl_request(#{pool_name := PoolName,
|
check_acl_request(ACLParams =
|
||||||
|
#{pool_name := PoolName,
|
||||||
path := Path,
|
path := Path,
|
||||||
method := Method,
|
method := Method,
|
||||||
headers := Headers,
|
headers := Headers,
|
||||||
params := Params,
|
params := Params,
|
||||||
timeout := Timeout}, ClientInfo) ->
|
timeout := Timeout}, ClientInfo) ->
|
||||||
request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout).
|
Retry = maps:get(retry_times, ACLParams, ?DEFAULT_RETRY_TIMES),
|
||||||
|
request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout, Retry).
|
||||||
|
|
||||||
access(subscribe) -> 1;
|
access(subscribe) -> 1;
|
||||||
access(publish) -> 2.
|
access(publish) -> 2.
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_auth_http,
|
{application, emqx_auth_http,
|
||||||
[{description, "EMQ X Authentication/ACL with HTTP API"},
|
[{description, "EMQ X Authentication/ACL with HTTP API"},
|
||||||
{vsn, "4.3.7"}, % strict semver, bump manually!
|
{vsn, "4.3.8"}, % strict semver, bump manually!
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, [emqx_auth_http_sup]},
|
{registered, [emqx_auth_http_sup]},
|
||||||
{applications, [kernel,stdlib,ehttpc]},
|
{applications, [kernel,stdlib,ehttpc]},
|
||||||
|
|
|
@ -1,17 +1,27 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
|
%% Unless you know what you are doing, DO NOT edit manually!!
|
||||||
{VSN,
|
{VSN,
|
||||||
[{"4.3.6",
|
[{"4.3.7",
|
||||||
[ %% There are only changes to the schema file, so we don't need any
|
[{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]},
|
||||||
%% commands here
|
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]},
|
||||||
]},
|
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]},
|
||||||
|
{"4.3.6",
|
||||||
|
[{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.5",
|
{"4.3.5",
|
||||||
[{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.4",
|
{"4.3.4",
|
||||||
[{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.3",
|
{"4.3.3",
|
||||||
[{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]},
|
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.2",
|
{"4.3.2",
|
||||||
|
@ -20,21 +30,29 @@
|
||||||
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]},
|
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]},
|
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}]},
|
||||||
{<<"4.3.[0-1]">>,
|
{<<"4.3.[0-1]">>,[{restart_application,emqx_auth_http}]},
|
||||||
[{restart_application,emqx_auth_http}]},
|
|
||||||
{<<".*">>,[]}],
|
{<<".*">>,[]}],
|
||||||
[{"4.3.6",
|
[{"4.3.7",
|
||||||
[ %% There are only changes to the schema file, so we don't need any
|
[{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]},
|
||||||
%% commands here
|
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]},
|
||||||
]},
|
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]},
|
||||||
|
{"4.3.6",
|
||||||
|
[{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.5",
|
{"4.3.5",
|
||||||
[{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.4",
|
{"4.3.4",
|
||||||
[{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.3",
|
{"4.3.3",
|
||||||
[{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]},
|
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.2",
|
{"4.3.2",
|
||||||
|
@ -43,6 +61,5 @@
|
||||||
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]},
|
{load_module,emqx_auth_http,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]},
|
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}]},
|
||||||
{<<"4.3.[0-1]">>,
|
{<<"4.3.[0-1]">>,[{restart_application,emqx_auth_http}]},
|
||||||
[{restart_application,emqx_auth_http}]},
|
|
||||||
{<<".*">>,[]}]}.
|
{<<".*">>,[]}]}.
|
||||||
|
|
|
@ -25,7 +25,7 @@
|
||||||
-logger_header("[Auth http]").
|
-logger_header("[Auth http]").
|
||||||
|
|
||||||
-import(emqx_auth_http_cli,
|
-import(emqx_auth_http_cli,
|
||||||
[ request/6
|
[ request/7
|
||||||
, feedvar/2
|
, feedvar/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
@ -63,24 +63,28 @@ description() -> "Authentication by HTTP API".
|
||||||
%% Requests
|
%% Requests
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
authenticate(#{pool_name := PoolName,
|
authenticate(AuthParams =
|
||||||
|
#{pool_name := PoolName,
|
||||||
path := Path,
|
path := Path,
|
||||||
method := Method,
|
method := Method,
|
||||||
headers := Headers,
|
headers := Headers,
|
||||||
params := Params,
|
params := Params,
|
||||||
timeout := Timeout}, ClientInfo) ->
|
timeout := Timeout}, ClientInfo) ->
|
||||||
request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout).
|
Retry = maps:get(retry_times, AuthParams, ?DEFAULT_RETRY_TIMES),
|
||||||
|
request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout, Retry).
|
||||||
|
|
||||||
-spec(is_superuser(maybe(map()), emqx_types:client()) -> boolean()).
|
-spec(is_superuser(maybe(map()), emqx_types:client()) -> boolean()).
|
||||||
is_superuser(undefined, _ClientInfo) ->
|
is_superuser(undefined, _ClientInfo) ->
|
||||||
false;
|
false;
|
||||||
is_superuser(#{pool_name := PoolName,
|
is_superuser(SuperParams =
|
||||||
|
#{pool_name := PoolName,
|
||||||
path := Path,
|
path := Path,
|
||||||
method := Method,
|
method := Method,
|
||||||
headers := Headers,
|
headers := Headers,
|
||||||
params := Params,
|
params := Params,
|
||||||
timeout := Timeout}, ClientInfo) ->
|
timeout := Timeout}, ClientInfo) ->
|
||||||
case request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout) of
|
Retry = maps:get(retry_times, SuperParams, ?DEFAULT_RETRY_TIMES),
|
||||||
|
case request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout, Retry) of
|
||||||
{ok, 200, _Body} -> true;
|
{ok, 200, _Body} -> true;
|
||||||
{ok, _Code, _Body} -> false;
|
{ok, _Code, _Body} -> false;
|
||||||
{error, Error} -> ?LOG(error, "Request superuser path ~s, error: ~p", [Path, Error]),
|
{error, Error} -> ?LOG(error, "Request superuser path ~s, error: ~p", [Path, Error]),
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
-include("emqx_auth_http.hrl").
|
-include("emqx_auth_http.hrl").
|
||||||
|
|
||||||
-export([ request/6
|
-export([ request/6
|
||||||
|
, request/7
|
||||||
, feedvar/2
|
, feedvar/2
|
||||||
, feedvar/3
|
, feedvar/3
|
||||||
]).
|
]).
|
||||||
|
@ -27,18 +28,21 @@
|
||||||
%% HTTP Request
|
%% HTTP Request
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
request(PoolName, get, Path, Headers, Params, Timeout) ->
|
request(PoolName, Method, Path, Headers, Params, Timeout) ->
|
||||||
NewPath = Path ++ "?" ++ binary_to_list(cow_qs:qs(bin_kw(Params))),
|
request(PoolName, Method, Path, Headers, Params, Timeout, ?DEFAULT_RETRY_TIMES).
|
||||||
reply(ehttpc:request(PoolName, get, {NewPath, Headers}, Timeout));
|
|
||||||
|
|
||||||
request(PoolName, post, Path, Headers, Params, Timeout) ->
|
request(PoolName, get, Path, Headers, Params, Timeout, Retry) ->
|
||||||
|
NewPath = Path ++ "?" ++ binary_to_list(cow_qs:qs(bin_kw(Params))),
|
||||||
|
reply(ehttpc:request(PoolName, get, {NewPath, Headers}, Timeout, Retry));
|
||||||
|
|
||||||
|
request(PoolName, post, Path, Headers, Params, Timeout, Retry) ->
|
||||||
Body = case proplists:get_value(<<"content-type">>, Headers) of
|
Body = case proplists:get_value(<<"content-type">>, Headers) of
|
||||||
"application/x-www-form-urlencoded" ->
|
"application/x-www-form-urlencoded" ->
|
||||||
cow_qs:qs(bin_kw(Params));
|
cow_qs:qs(bin_kw(Params));
|
||||||
"application/json" ->
|
"application/json" ->
|
||||||
emqx_json:encode(bin_kw(Params))
|
emqx_json:encode(bin_kw(Params))
|
||||||
end,
|
end,
|
||||||
reply(ehttpc:request(PoolName, post, {Path, Headers, Body}, Timeout)).
|
reply(ehttpc:request(PoolName, post, {Path, Headers, Body}, Timeout, Retry)).
|
||||||
|
|
||||||
reply({ok, StatusCode, _Headers}) ->
|
reply({ok, StatusCode, _Headers}) ->
|
||||||
{ok, StatusCode, <<>>};
|
{ok, StatusCode, <<>>};
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_rule_engine,
|
{application, emqx_rule_engine,
|
||||||
[{description, "EMQ X Rule Engine"},
|
[{description, "EMQ X Rule Engine"},
|
||||||
{vsn, "4.4.10"}, % strict semver, bump manually!
|
{vsn, "4.4.11"}, % strict semver, bump manually!
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, [emqx_rule_engine_sup, emqx_rule_registry]},
|
{registered, [emqx_rule_engine_sup, emqx_rule_registry]},
|
||||||
{applications, [kernel,stdlib,rulesql,getopt]},
|
{applications, [kernel,stdlib,rulesql,getopt]},
|
||||||
|
|
|
@ -1,19 +1,24 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
%% Unless you know what you are doing, DO NOT edit manually!!
|
%% Unless you know what you are doing, DO NOT edit manually!!
|
||||||
{VSN,
|
{VSN,
|
||||||
[{"4.4.9",
|
[{"4.4.10",
|
||||||
|
[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
|
||||||
|
{"4.4.9",
|
||||||
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.4.8",
|
{"4.4.8",
|
||||||
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
|
||||||
{<<"4\\.4\\.[6-7]">>,
|
{<<"4\\.4\\.[6-7]">>,
|
||||||
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||||
|
@ -21,7 +26,8 @@
|
||||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.4.5",
|
{"4.4.5",
|
||||||
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||||
|
@ -30,7 +36,8 @@
|
||||||
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.4.4",
|
{"4.4.4",
|
||||||
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||||
|
@ -39,7 +46,8 @@
|
||||||
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.4.3",
|
{"4.4.3",
|
||||||
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||||
|
@ -97,19 +105,24 @@
|
||||||
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
||||||
{<<".*">>,[]}],
|
{<<".*">>,[]}],
|
||||||
[{"4.4.9",
|
[{"4.4.10",
|
||||||
|
[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
|
||||||
|
{"4.4.9",
|
||||||
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.4.8",
|
{"4.4.8",
|
||||||
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
|
||||||
{<<"4\\.4\\.[6-7]">>,
|
{<<"4\\.4\\.[6-7]">>,
|
||||||
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||||
|
@ -117,7 +130,8 @@
|
||||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.4.5",
|
{"4.4.5",
|
||||||
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||||
|
@ -126,7 +140,8 @@
|
||||||
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.4.4",
|
{"4.4.4",
|
||||||
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||||
|
@ -135,7 +150,8 @@
|
||||||
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.4.3",
|
{"4.4.3",
|
||||||
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||||
|
|
|
@ -133,16 +133,15 @@ clear_metrics(Id) ->
|
||||||
-spec(reset_metrics(rule_id()) -> ok).
|
-spec(reset_metrics(rule_id()) -> ok).
|
||||||
reset_metrics(Id) ->
|
reset_metrics(Id) ->
|
||||||
reset_speeds(Id),
|
reset_speeds(Id),
|
||||||
reset_metrics(Id, rule_metrics()),
|
do_reset_metrics(Id, rule_metrics()),
|
||||||
case emqx_rule_registry:get_rule(Id) of
|
case emqx_rule_registry:get_rule(Id) of
|
||||||
not_found -> ok;
|
not_found -> ok;
|
||||||
{ok, #rule{actions = Actions}} ->
|
{ok, #rule{actions = Actions}} ->
|
||||||
[ reset_metrics(ActionId, action_metrics())
|
reset_action_metrics(Actions),
|
||||||
|| #action_instance{ id = ActionId} <- Actions],
|
|
||||||
ok
|
ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
reset_metrics(Id, Metrics) ->
|
do_reset_metrics(Id, Metrics) ->
|
||||||
case couters_ref(Id) of
|
case couters_ref(Id) of
|
||||||
not_found -> ok;
|
not_found -> ok;
|
||||||
Ref -> [counters:put(Ref, metrics_idx(Idx), 0)
|
Ref -> [counters:put(Ref, metrics_idx(Idx), 0)
|
||||||
|
@ -150,6 +149,12 @@ reset_metrics(Id, Metrics) ->
|
||||||
ok
|
ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
reset_action_metrics(Actions) ->
|
||||||
|
lists:foreach(fun(#action_instance{id = ActionId, fallbacks = FallbackActions}) ->
|
||||||
|
do_reset_metrics(ActionId, action_metrics()),
|
||||||
|
reset_action_metrics(FallbackActions)
|
||||||
|
end, Actions).
|
||||||
|
|
||||||
reset_speeds(Id) ->
|
reset_speeds(Id) ->
|
||||||
gen_server:call(?MODULE, {reset_speeds, Id}).
|
gen_server:call(?MODULE, {reset_speeds, Id}).
|
||||||
|
|
||||||
|
@ -330,6 +335,8 @@ handle_call({create_rule_metrics, Id}, _From,
|
||||||
_ -> RuleSpeeds#{Id => #rule_speed{}}
|
_ -> RuleSpeeds#{Id => #rule_speed{}}
|
||||||
end}};
|
end}};
|
||||||
|
|
||||||
|
handle_call({reset_speeds, _Id}, _From, State = #state{rule_speeds = undefined}) ->
|
||||||
|
{reply, ok, State};
|
||||||
handle_call({reset_speeds, Id}, _From, State = #state{rule_speeds = RuleSpeedMap}) ->
|
handle_call({reset_speeds, Id}, _From, State = #state{rule_speeds = RuleSpeedMap}) ->
|
||||||
{reply, ok, State#state{rule_speeds = maps:put(Id, #rule_speed{}, RuleSpeedMap)}};
|
{reply, ok, State#state{rule_speeds = maps:put(Id, #rule_speed{}, RuleSpeedMap)}};
|
||||||
|
|
||||||
|
|
|
@ -50,6 +50,7 @@ groups() ->
|
||||||
t_unregister_provider,
|
t_unregister_provider,
|
||||||
t_create_rule,
|
t_create_rule,
|
||||||
t_reset_metrics,
|
t_reset_metrics,
|
||||||
|
t_reset_metrics_fallbacks,
|
||||||
t_create_resource
|
t_create_resource
|
||||||
]},
|
]},
|
||||||
{actions, [],
|
{actions, [],
|
||||||
|
@ -381,18 +382,14 @@ t_inspect_action(_Config) ->
|
||||||
|
|
||||||
t_reset_metrics(_Config) ->
|
t_reset_metrics(_Config) ->
|
||||||
ok = emqx_rule_engine:load_providers(),
|
ok = emqx_rule_engine:load_providers(),
|
||||||
{ok, #resource{id = ResId}} = emqx_rule_engine:create_resource(
|
{ok, #rule{id = Id, actions = [#action_instance{id = ActId0}]}} =
|
||||||
#{type => built_in,
|
emqx_rule_engine:create_rule(
|
||||||
config => #{},
|
#{rawsql => "select clientid as c, username as u "
|
||||||
description => <<"debug resource">>}),
|
"from \"t1\" ",
|
||||||
{ok, #rule{id = Id}} = emqx_rule_engine:create_rule(
|
actions => [#{name => 'inspect', args => #{a=>1, b=>2}}],
|
||||||
#{rawsql => "select clientid as c, username as u "
|
type => built_in,
|
||||||
"from \"t1\" ",
|
description => <<"Inspect rule">>
|
||||||
actions => [#{name => 'inspect',
|
}),
|
||||||
args => #{'$resource' => ResId, a=>1, b=>2}}],
|
|
||||||
type => built_in,
|
|
||||||
description => <<"Inspect rule">>
|
|
||||||
}),
|
|
||||||
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
|
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
|
||||||
{ok, _} = emqtt:connect(Client),
|
{ok, _} = emqtt:connect(Client),
|
||||||
[ begin
|
[ begin
|
||||||
|
@ -400,16 +397,68 @@ t_reset_metrics(_Config) ->
|
||||||
timer:sleep(100)
|
timer:sleep(100)
|
||||||
end
|
end
|
||||||
|| _ <- lists:seq(1,10)],
|
|| _ <- lists:seq(1,10)],
|
||||||
|
?assertMatch(#{exception := 0, failed := 0,
|
||||||
|
matched := 10, no_result := 0, passed := 10},
|
||||||
|
emqx_rule_metrics:get_rule_metrics(Id)),
|
||||||
|
?assertMatch(#{failed := 0, success := 10, taken := 10},
|
||||||
|
emqx_rule_metrics:get_action_metrics(ActId0)),
|
||||||
emqx_rule_metrics:reset_metrics(Id),
|
emqx_rule_metrics:reset_metrics(Id),
|
||||||
?assertEqual(#{exception => 0,failed => 0,
|
?assertEqual(#{exception => 0,failed => 0,
|
||||||
matched => 0,no_result => 0,passed => 0,
|
matched => 0,no_result => 0,passed => 0,
|
||||||
speed => 0.0,speed_last5m => 0.0,speed_max => 0.0},
|
speed => 0.0,speed_last5m => 0.0,speed_max => 0.0},
|
||||||
emqx_rule_metrics:get_rule_metrics(Id)),
|
emqx_rule_metrics:get_rule_metrics(Id)),
|
||||||
?assertEqual(#{failed => 0,success => 0,taken => 0},
|
?assertEqual(#{failed => 0, success => 0, taken => 0},
|
||||||
emqx_rule_metrics:get_action_metrics(ResId)),
|
emqx_rule_metrics:get_action_metrics(ActId0)),
|
||||||
emqtt:stop(Client),
|
emqtt:stop(Client),
|
||||||
emqx_rule_registry:remove_rule(Id),
|
emqx_rule_registry:remove_rule(Id),
|
||||||
emqx_rule_registry:remove_resource(ResId),
|
ok.
|
||||||
|
|
||||||
|
t_reset_metrics_fallbacks(_Config) ->
|
||||||
|
ok = emqx_rule_engine:load_providers(),
|
||||||
|
ok = emqx_rule_registry:add_action(
|
||||||
|
#action{name = 'crash_action', app = ?APP,
|
||||||
|
module = ?MODULE, on_create = crash_action,
|
||||||
|
types=[], params_spec = #{},
|
||||||
|
title = #{en => <<"Crash Action">>},
|
||||||
|
description = #{en => <<"This action will always fail!">>}}),
|
||||||
|
{ok, #rule{id = Id, actions = [#action_instance{id = ActId0, fallbacks = [
|
||||||
|
#action_instance{id = ActId1},
|
||||||
|
#action_instance{id = ActId2}
|
||||||
|
]}]}} =
|
||||||
|
emqx_rule_engine:create_rule(
|
||||||
|
#{rawsql => "select clientid as c, username as u "
|
||||||
|
"from \"t1\" ",
|
||||||
|
actions => [#{name => 'crash_action', args => #{a=>1, b=>2}, fallbacks => [
|
||||||
|
#{name => 'inspect', args => #{}, fallbacks => []},
|
||||||
|
#{name => 'inspect', args => #{}, fallbacks => []}
|
||||||
|
]}],
|
||||||
|
type => built_in,
|
||||||
|
description => <<"Inspect rule">>
|
||||||
|
}),
|
||||||
|
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
|
||||||
|
{ok, _} = emqtt:connect(Client),
|
||||||
|
[ begin
|
||||||
|
emqtt:publish(Client, <<"t1">>, <<"{\"id\": 1, \"name\": \"ha\"}">>, 0),
|
||||||
|
timer:sleep(100)
|
||||||
|
end
|
||||||
|
|| _ <- lists:seq(1,10)],
|
||||||
|
?assertMatch(#{exception := 0, failed := 0,
|
||||||
|
matched := 10, no_result := 0, passed := 10},
|
||||||
|
emqx_rule_metrics:get_rule_metrics(Id)),
|
||||||
|
[?assertMatch(#{failed := 10, success := 0, taken := 10},
|
||||||
|
emqx_rule_metrics:get_action_metrics(AId)) || AId <- [ActId0]],
|
||||||
|
[?assertMatch(#{failed := 0, success := 10, taken := 10},
|
||||||
|
emqx_rule_metrics:get_action_metrics(AId)) || AId <- [ ActId1, ActId2]],
|
||||||
|
emqx_rule_metrics:reset_metrics(Id),
|
||||||
|
?assertEqual(#{exception => 0,failed => 0,
|
||||||
|
matched => 0,no_result => 0,passed => 0,
|
||||||
|
speed => 0.0,speed_last5m => 0.0,speed_max => 0.0},
|
||||||
|
emqx_rule_metrics:get_rule_metrics(Id)),
|
||||||
|
[?assertEqual(#{failed => 0, success => 0, taken => 0},
|
||||||
|
emqx_rule_metrics:get_action_metrics(AId)) || AId <- [ActId0, ActId1, ActId2]],
|
||||||
|
emqtt:stop(Client),
|
||||||
|
emqx_rule_registry:remove_rule(Id),
|
||||||
|
ok = emqx_rule_registry:remove_action('crash_action'),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_republish_action(_Config) ->
|
t_republish_action(_Config) ->
|
||||||
|
|
2
bin/emqx
2
bin/emqx
|
@ -525,7 +525,7 @@ esac
|
||||||
|
|
||||||
if [ "$IS_BOOT_COMMAND" = 'no' ]; then
|
if [ "$IS_BOOT_COMMAND" = 'no' ]; then
|
||||||
# for non-boot commands, inspect vm.<time>.args for node name
|
# for non-boot commands, inspect vm.<time>.args for node name
|
||||||
# shellcheck disable=SC2012,SC2086
|
# shellcheck disable=SC2012
|
||||||
LATEST_VM_ARGS_FILE="$(ls -t "$RUNNER_DATA_DIR"/configs/vm.*.args 2>/dev/null | head -1)"
|
LATEST_VM_ARGS_FILE="$(ls -t "$RUNNER_DATA_DIR"/configs/vm.*.args 2>/dev/null | head -1)"
|
||||||
if [ -z "$LATEST_VM_ARGS_FILE" ]; then
|
if [ -z "$LATEST_VM_ARGS_FILE" ]; then
|
||||||
echoerr "There is no vm.*.args config file found in '$RUNNER_DATA_DIR/configs/'"
|
echoerr "There is no vm.*.args config file found in '$RUNNER_DATA_DIR/configs/'"
|
||||||
|
|
|
@ -29,7 +29,7 @@
|
||||||
|
|
||||||
-ifndef(EMQX_ENTERPRISE).
|
-ifndef(EMQX_ENTERPRISE).
|
||||||
|
|
||||||
-define(EMQX_RELEASE, {opensource, "4.4.10-alpha.1"}).
|
-define(EMQX_RELEASE, {opensource, "4.4.10"}).
|
||||||
|
|
||||||
-else.
|
-else.
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_dashboard,
|
{application, emqx_dashboard,
|
||||||
[{description, "EMQX Web Dashboard"},
|
[{description, "EMQX Web Dashboard"},
|
||||||
{vsn, "4.4.9"}, % strict semver, bump manually!
|
{vsn, "4.4.10"}, % strict semver, bump manually!
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, [emqx_dashboard_sup]},
|
{registered, [emqx_dashboard_sup]},
|
||||||
{applications, [kernel,stdlib,mnesia,minirest]},
|
{applications, [kernel,stdlib,mnesia,minirest]},
|
||||||
|
|
|
@ -41,7 +41,7 @@
|
||||||
[ {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps
|
[ {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps
|
||||||
, {redbug, "2.0.7"}
|
, {redbug, "2.0.7"}
|
||||||
, {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.2.0"}}}
|
, {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.2.0"}}}
|
||||||
, {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.7"}}}
|
, {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.8"}}}
|
||||||
, {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.7.4"}}}
|
, {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.7.4"}}}
|
||||||
, {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.9.0"}}}
|
, {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.9.0"}}}
|
||||||
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
|
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
|
||||||
|
|
|
@ -21,6 +21,24 @@ parse_semver() {
|
||||||
echo "$1" | tr '.|-' ' '
|
echo "$1" | tr '.|-' ' '
|
||||||
}
|
}
|
||||||
|
|
||||||
|
exempt_bump() {
|
||||||
|
local app="$1"
|
||||||
|
local from="$2"
|
||||||
|
local to="$3"
|
||||||
|
|
||||||
|
case "$app,$from,$to" in
|
||||||
|
"lib-ee/emqx_conf,4.3.9,4.4.0")
|
||||||
|
true
|
||||||
|
;;
|
||||||
|
"lib-ee/emqx_license,4.3.7,4.4.0")
|
||||||
|
true
|
||||||
|
;;
|
||||||
|
*)
|
||||||
|
false
|
||||||
|
;;
|
||||||
|
esac
|
||||||
|
}
|
||||||
|
|
||||||
check_apps() {
|
check_apps() {
|
||||||
while read -r app; do
|
while read -r app; do
|
||||||
if [ "$app" != "emqx" ]; then
|
if [ "$app" != "emqx" ]; then
|
||||||
|
@ -42,6 +60,7 @@ check_apps() {
|
||||||
if [ "$old_app_version" = "$now_app_version" ]; then
|
if [ "$old_app_version" = "$now_app_version" ]; then
|
||||||
changed_lines="$(git diff "$latest_release"...HEAD --ignore-blank-lines -G "$no_comment_re" \
|
changed_lines="$(git diff "$latest_release"...HEAD --ignore-blank-lines -G "$no_comment_re" \
|
||||||
-- "$app_path/src" \
|
-- "$app_path/src" \
|
||||||
|
-- "$app_path/include" \
|
||||||
-- ":(exclude)$app_path/src/*.appup.src" \
|
-- ":(exclude)$app_path/src/*.appup.src" \
|
||||||
-- "$app_path/priv" \
|
-- "$app_path/priv" \
|
||||||
-- "$app_path/c_src" | wc -l ) "
|
-- "$app_path/c_src" | wc -l ) "
|
||||||
|
@ -49,12 +68,12 @@ check_apps() {
|
||||||
echo "$src_file needs a vsn bump (old=$old_app_version)"
|
echo "$src_file needs a vsn bump (old=$old_app_version)"
|
||||||
echo "changed: $changed_lines"
|
echo "changed: $changed_lines"
|
||||||
bad_app_count=$(( bad_app_count + 1))
|
bad_app_count=$(( bad_app_count + 1))
|
||||||
elif [ "$app" = 'emqx_dashboard' ]; then
|
elif [[ ${app_path} = *emqx_dashboard* ]]; then
|
||||||
## emqx_dashboard is ensured to be upgraded after all other plugins
|
## emqx_dashboard is ensured to be upgraded after all other plugins
|
||||||
## at the end of its appup instructions, there is the final instruction
|
## at the end of its appup instructions, there is the final instruction
|
||||||
## {apply, {emqx_plugins, load, []}
|
## {apply, {emqx_plugins, load, []}
|
||||||
## since we don't know which plugins are stopped during the upgrade
|
## since we don't know which plugins are stopped during the upgrade
|
||||||
## for safety, we just force a dashboard version bump for each and every release
|
## for safty, we just force a dashboard version bump for each and every release
|
||||||
## even if there is nothing changed in the app
|
## even if there is nothing changed in the app
|
||||||
echo "$src_file needs a vsn bump to ensure plugins loaded after upgrade"
|
echo "$src_file needs a vsn bump to ensure plugins loaded after upgrade"
|
||||||
bad_app_count=$(( bad_app_count + 1))
|
bad_app_count=$(( bad_app_count + 1))
|
||||||
|
@ -69,8 +88,12 @@ check_apps() {
|
||||||
[ "$(( old_app_version_semver[2] + 1 ))" = "${now_app_version_semver[2]}" ]; then
|
[ "$(( old_app_version_semver[2] + 1 ))" = "${now_app_version_semver[2]}" ]; then
|
||||||
true
|
true
|
||||||
else
|
else
|
||||||
|
if exempt_bump "$app" "$old_app_version" "$now_app_version"; then
|
||||||
|
true
|
||||||
|
else
|
||||||
echo "$src_file: non-strict semver version bump from $old_app_version to $now_app_version"
|
echo "$src_file: non-strict semver version bump from $old_app_version to $now_app_version"
|
||||||
bad_app_count=$(( bad_app_count + 1))
|
bad_app_count=$(( bad_app_count + 1))
|
||||||
|
fi
|
||||||
fi
|
fi
|
||||||
fi
|
fi
|
||||||
done < <(./scripts/find-apps.sh)
|
done < <(./scripts/find-apps.sh)
|
||||||
|
|
|
@ -8,13 +8,13 @@ cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")/.."
|
||||||
PKG_VSN="${PKG_VSN:-$(./pkg-vsn.sh)}"
|
PKG_VSN="${PKG_VSN:-$(./pkg-vsn.sh)}"
|
||||||
case "${PKG_VSN}" in
|
case "${PKG_VSN}" in
|
||||||
4.3*)
|
4.3*)
|
||||||
EMQX_CE_DASHBOARD_VERSION='v4.3.10'
|
EMQX_CE_DASHBOARD_VERSION='v4.3.11'
|
||||||
EMQX_EE_DASHBOARD_VERSION='v4.3.24'
|
EMQX_EE_DASHBOARD_VERSION='v4.3.26'
|
||||||
;;
|
;;
|
||||||
4.4*)
|
4.4*)
|
||||||
# keep the above 4.3 untouched, otherwise conflicts!
|
# keep the above 4.3 untouched, otherwise conflicts!
|
||||||
EMQX_CE_DASHBOARD_VERSION='v4.4.5'
|
EMQX_CE_DASHBOARD_VERSION='v4.4.6'
|
||||||
EMQX_EE_DASHBOARD_VERSION='v4.4.15'
|
EMQX_EE_DASHBOARD_VERSION='v4.4.17'
|
||||||
;;
|
;;
|
||||||
*)
|
*)
|
||||||
echo "Unsupported version $PKG_VSN" >&2
|
echo "Unsupported version $PKG_VSN" >&2
|
||||||
|
|
|
@ -230,6 +230,16 @@ if [ "$HAS_RELUP_DB" = 'yes' ]; then
|
||||||
./scripts/relup-base-vsns.escript check-vsn-db "$PKG_VSN" "$RELUP_PATHS"
|
./scripts/relup-base-vsns.escript check-vsn-db "$PKG_VSN" "$RELUP_PATHS"
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
## Run some additional checks (e.g. some for enterprise edition only)
|
||||||
|
CHECKS_DIR="./scripts/rel/checks"
|
||||||
|
if [ -d "${CHECKS_DIR}" ]; then
|
||||||
|
CHECKS="$(find "${CHECKS_DIR}" -name "*.sh" -print0 2>/dev/null | xargs -0)"
|
||||||
|
for c in $CHECKS; do
|
||||||
|
logmsg "Executing $c"
|
||||||
|
$c
|
||||||
|
done
|
||||||
|
fi
|
||||||
|
|
||||||
if [ "$DRYRUN" = 'yes' ]; then
|
if [ "$DRYRUN" = 'yes' ]; then
|
||||||
logmsg "Release tag is ready to be created with command: git tag $TAG"
|
logmsg "Release tag is ready to be created with command: git tag $TAG"
|
||||||
else
|
else
|
||||||
|
|
|
@ -6,7 +6,7 @@
|
||||||
%% the emqx `release' version, which in turn is comprised of several
|
%% the emqx `release' version, which in turn is comprised of several
|
||||||
%% apps, one of which is this. See `emqx_release.hrl' for more
|
%% apps, one of which is this. See `emqx_release.hrl' for more
|
||||||
%% info.
|
%% info.
|
||||||
{vsn, "4.4.10"}, % strict semver, bump manually!
|
{vsn, "4.4.11"}, % strict semver, bump manually!
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{applications, [ kernel
|
{applications, [ kernel
|
||||||
|
|
|
@ -1,8 +1,12 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
%% Unless you know what you are doing, DO NOT edit manually!!
|
%% Unless you know what you are doing, DO NOT edit manually!!
|
||||||
{VSN,
|
{VSN,
|
||||||
[{"4.4.9",
|
[{"4.4.10",
|
||||||
[{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
||||||
|
{"4.4.9",
|
||||||
|
[{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx,brutal_purge,soft_purge,[]},
|
{load_module,emqx,brutal_purge,soft_purge,[]},
|
||||||
{add_module,emqx_secret},
|
{add_module,emqx_secret},
|
||||||
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||||
|
@ -270,8 +274,12 @@
|
||||||
{load_module,emqx_message,brutal_purge,soft_purge,[]},
|
{load_module,emqx_message,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
|
||||||
{<<".*">>,[]}],
|
{<<".*">>,[]}],
|
||||||
[{"4.4.9",
|
[{"4.4.10",
|
||||||
[{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
||||||
|
{"4.4.9",
|
||||||
|
[{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx,brutal_purge,soft_purge,[]},
|
{load_module,emqx,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
||||||
|
|
|
@ -227,6 +227,7 @@ shutdown() ->
|
||||||
shutdown(normal).
|
shutdown(normal).
|
||||||
|
|
||||||
shutdown(Reason) ->
|
shutdown(Reason) ->
|
||||||
|
ok = emqx_misc:maybe_mute_rpc_log(),
|
||||||
?LOG(critical, "emqx shutdown for ~s", [Reason]),
|
?LOG(critical, "emqx shutdown for ~s", [Reason]),
|
||||||
on_shutdown(Reason),
|
on_shutdown(Reason),
|
||||||
_ = emqx_plugins:unload(),
|
_ = emqx_plugins:unload(),
|
||||||
|
|
|
@ -312,6 +312,7 @@ handle_in(?CONNECT_PACKET(ConnPkt) = Packet, Channel) ->
|
||||||
fun enrich_client/2,
|
fun enrich_client/2,
|
||||||
fun set_log_meta/2,
|
fun set_log_meta/2,
|
||||||
fun check_banned/2,
|
fun check_banned/2,
|
||||||
|
fun count_flapping_event/2,
|
||||||
fun auth_connect/2
|
fun auth_connect/2
|
||||||
], ConnPkt, Channel#channel{conn_state = connecting}) of
|
], ConnPkt, Channel#channel{conn_state = connecting}) of
|
||||||
{ok, NConnPkt, NChannel = #channel{clientinfo = ClientInfo}} ->
|
{ok, NConnPkt, NChannel = #channel{clientinfo = ClientInfo}} ->
|
||||||
|
@ -1060,11 +1061,7 @@ handle_info({sock_closed, Reason}, Channel = #channel{conn_state = idle}) ->
|
||||||
handle_info({sock_closed, Reason}, Channel = #channel{conn_state = connecting}) ->
|
handle_info({sock_closed, Reason}, Channel = #channel{conn_state = connecting}) ->
|
||||||
shutdown(Reason, Channel);
|
shutdown(Reason, Channel);
|
||||||
|
|
||||||
handle_info({sock_closed, Reason}, Channel =
|
handle_info({sock_closed, Reason}, Channel = #channel{conn_state = connected}) ->
|
||||||
#channel{conn_state = connected,
|
|
||||||
clientinfo = ClientInfo = #{zone := Zone}}) ->
|
|
||||||
emqx_zone:enable_flapping_detect(Zone)
|
|
||||||
andalso emqx_flapping:detect(ClientInfo),
|
|
||||||
Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(Channel)),
|
Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(Channel)),
|
||||||
case maybe_shutdown(Reason, Channel1) of
|
case maybe_shutdown(Reason, Channel1) of
|
||||||
{ok, Channel2} -> {ok, {event, disconnected}, Channel2};
|
{ok, Channel2} -> {ok, {event, disconnected}, Channel2};
|
||||||
|
@ -1373,6 +1370,13 @@ auth_connect(#mqtt_packet_connect{password = Password},
|
||||||
{error, emqx_reason_codes:connack_error(Reason)}
|
{error, emqx_reason_codes:connack_error(Reason)}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Flapping
|
||||||
|
|
||||||
|
count_flapping_event(_ConnPkt, Channel = #channel{clientinfo = ClientInfo = #{zone := Zone}}) ->
|
||||||
|
_ = emqx_zone:enable_flapping_detect(Zone) andalso emqx_flapping:detect(ClientInfo),
|
||||||
|
{ok, Channel}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Enhanced Authentication
|
%% Enhanced Authentication
|
||||||
|
|
||||||
|
|
|
@ -25,6 +25,7 @@
|
||||||
|
|
||||||
-export([ merge_opts/2
|
-export([ merge_opts/2
|
||||||
, maybe_apply/2
|
, maybe_apply/2
|
||||||
|
, maybe_mute_rpc_log/0
|
||||||
, compose/1
|
, compose/1
|
||||||
, compose/2
|
, compose/2
|
||||||
, run_fold/3
|
, run_fold/3
|
||||||
|
@ -446,6 +447,27 @@ do_parallel_map(Fun, List) ->
|
||||||
PidList
|
PidList
|
||||||
).
|
).
|
||||||
|
|
||||||
|
%% @doc Call this function to avoid logs printed to RPC caller node.
|
||||||
|
-spec maybe_mute_rpc_log() -> ok.
|
||||||
|
maybe_mute_rpc_log() ->
|
||||||
|
GlNode = node(group_leader()),
|
||||||
|
maybe_mute_rpc_log(GlNode).
|
||||||
|
|
||||||
|
maybe_mute_rpc_log(Node) when Node =:= node() ->
|
||||||
|
%% do nothing, this is a local call
|
||||||
|
ok;
|
||||||
|
maybe_mute_rpc_log(Node) ->
|
||||||
|
case atom_to_list(Node) of
|
||||||
|
"remsh" ++ _ ->
|
||||||
|
%% this is either an upgrade script or nodetool
|
||||||
|
%% do nothing, the log may go to the 'emqx' command line console
|
||||||
|
ok;
|
||||||
|
_ ->
|
||||||
|
%% otherwise set group leader to local node
|
||||||
|
_ = group_leader(whereis(init), self()),
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
|
|
|
@ -683,26 +683,35 @@ run_terminate_hooks(ClientInfo, takeovered, Session) ->
|
||||||
run_terminate_hooks(ClientInfo, Reason, Session) ->
|
run_terminate_hooks(ClientInfo, Reason, Session) ->
|
||||||
run_hook('session.terminated', [ClientInfo, Reason, info(Session)]).
|
run_hook('session.terminated', [ClientInfo, Reason, info(Session)]).
|
||||||
|
|
||||||
redispatch_shared_messages(#session{inflight = Inflight}) ->
|
redispatch_shared_messages(#session{inflight = Inflight, mqueue = Q}) ->
|
||||||
InflightList = emqx_inflight:to_list(Inflight),
|
AllInflights = emqx_inflight:to_list(sort_fun(), Inflight),
|
||||||
lists:foreach(fun
|
F = fun({_, {Msg, _Ts}}) ->
|
||||||
%% Only QoS1 messages get redispatched, because QoS2 messages
|
case Msg of
|
||||||
%% must be sent to the same client, once they're in flight
|
#message{qos = ?QOS_1} ->
|
||||||
({_, {#message{qos = ?QOS_2} = Msg, _}}) ->
|
%% For QoS 2, here is what the spec says:
|
||||||
?LOG(warning, "Not redispatching qos2 msg: ~s", [emqx_message:format(Msg)]);
|
%% If the Client's Session terminates before the Client reconnects,
|
||||||
({_, {#message{topic = Topic, qos = ?QOS_1} = Msg, _}}) ->
|
%% the Server MUST NOT send the Application Message to any other
|
||||||
case emqx_shared_sub:get_group(Msg) of
|
%% subscribed Client [MQTT-4.8.2-5].
|
||||||
{ok, Group} ->
|
{true, Msg};
|
||||||
%% Note that dispatch is called with self() in failed subs
|
_ ->
|
||||||
%% This is done to avoid dispatching back to caller
|
%% QoS 2, after pubrec is received
|
||||||
Delivery = #delivery{sender = self(), message = Msg},
|
%% the inflight record is updated to an atom
|
||||||
emqx_shared_sub:dispatch_to_non_self(Group, Topic, Delivery);
|
false
|
||||||
_ ->
|
end
|
||||||
false
|
end,
|
||||||
end;
|
InflightList = lists:filtermap(F, AllInflights),
|
||||||
(_) ->
|
MqList = mqueue_to_list(Q, []),
|
||||||
ok
|
emqx_shared_sub:redispatch(InflightList ++ MqList).
|
||||||
end, InflightList).
|
|
||||||
|
%% convert mqueue to a list
|
||||||
|
%% the messages at the head of the list is to be dispatched before the tail
|
||||||
|
mqueue_to_list(Q, Acc) ->
|
||||||
|
case emqx_mqueue:out(Q) of
|
||||||
|
{empty, _Q} ->
|
||||||
|
lists:reverse(Acc);
|
||||||
|
{{value, Msg}, Q1} ->
|
||||||
|
mqueue_to_list(Q1, [Msg | Acc])
|
||||||
|
end.
|
||||||
|
|
||||||
-compile({inline, [run_hook/2]}).
|
-compile({inline, [run_hook/2]}).
|
||||||
run_hook(Name, Args) ->
|
run_hook(Name, Args) ->
|
||||||
|
|
|
@ -39,7 +39,7 @@
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([ dispatch/3
|
-export([ dispatch/3
|
||||||
, dispatch_to_non_self/3
|
, redispatch/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([ maybe_ack/1
|
-export([ maybe_ack/1
|
||||||
|
@ -47,7 +47,6 @@
|
||||||
, nack_no_connection/1
|
, nack_no_connection/1
|
||||||
, is_ack_required/1
|
, is_ack_required/1
|
||||||
, is_retry_dispatch/1
|
, is_retry_dispatch/1
|
||||||
, get_group/1
|
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% for testing
|
%% for testing
|
||||||
|
@ -84,6 +83,9 @@
|
||||||
-define(ACK, shared_sub_ack).
|
-define(ACK, shared_sub_ack).
|
||||||
-define(NACK(Reason), {shared_sub_nack, Reason}).
|
-define(NACK(Reason), {shared_sub_nack, Reason}).
|
||||||
-define(NO_ACK, no_ack).
|
-define(NO_ACK, no_ack).
|
||||||
|
-define(REDISPATCH_TO(GROUP, TOPIC), {GROUP, TOPIC}).
|
||||||
|
|
||||||
|
-type redispatch_to() :: ?REDISPATCH_TO(emqx_topic:group(), emqx_topic:topic()).
|
||||||
|
|
||||||
-record(state, {pmon}).
|
-record(state, {pmon}).
|
||||||
|
|
||||||
|
@ -134,11 +136,12 @@ dispatch(Group, Topic, Delivery) ->
|
||||||
Strategy = strategy(Group),
|
Strategy = strategy(Group),
|
||||||
dispatch(Strategy, Group, Topic, Delivery, _FailedSubs = #{}).
|
dispatch(Strategy, Group, Topic, Delivery, _FailedSubs = #{}).
|
||||||
|
|
||||||
dispatch(Strategy, Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) ->
|
dispatch(Strategy, Group, Topic, Delivery = #delivery{message = Msg0}, FailedSubs) ->
|
||||||
#message{from = ClientId, topic = SourceTopic} = Msg,
|
#message{from = ClientId, topic = SourceTopic} = Msg0,
|
||||||
case pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) of
|
case pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) of
|
||||||
false -> {error, no_subscribers};
|
false -> {error, no_subscribers};
|
||||||
{Type, SubPid} ->
|
{Type, SubPid} ->
|
||||||
|
Msg = with_redispatch_to(Msg0, Group, Topic),
|
||||||
case do_dispatch(SubPid, Group, Topic, Msg, Type) of
|
case do_dispatch(SubPid, Group, Topic, Msg, Type) of
|
||||||
ok -> {ok, 1};
|
ok -> {ok, 1};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
|
@ -162,7 +165,7 @@ ack_enabled() ->
|
||||||
emqx:get_env(shared_dispatch_ack_enabled, false).
|
emqx:get_env(shared_dispatch_ack_enabled, false).
|
||||||
|
|
||||||
do_dispatch(SubPid, _Group, Topic, Msg, _Type) when SubPid =:= self() ->
|
do_dispatch(SubPid, _Group, Topic, Msg, _Type) when SubPid =:= self() ->
|
||||||
%% Deadlock otherwise
|
%% dispatch without ack, deadlock otherwise
|
||||||
send(SubPid, Topic, {deliver, Topic, Msg});
|
send(SubPid, Topic, {deliver, Topic, Msg});
|
||||||
%% return either 'ok' (when everything is fine) or 'error'
|
%% return either 'ok' (when everything is fine) or 'error'
|
||||||
do_dispatch(SubPid, _Group, Topic, #message{qos = ?QOS_0} = Msg, _Type) ->
|
do_dispatch(SubPid, _Group, Topic, #message{qos = ?QOS_0} = Msg, _Type) ->
|
||||||
|
@ -176,6 +179,10 @@ do_dispatch(SubPid, Group, Topic, Msg, Type) ->
|
||||||
send(SubPid, Topic, {deliver, Topic, Msg})
|
send(SubPid, Topic, {deliver, Topic, Msg})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
with_redispatch_to(#message{qos = ?QOS_0} = Msg, _Group, _Topic) -> Msg;
|
||||||
|
with_redispatch_to(Msg, Group, Topic) ->
|
||||||
|
emqx_message:set_headers(#{redispatch_to => ?REDISPATCH_TO(Group, Topic)}, Msg).
|
||||||
|
|
||||||
dispatch_with_ack(SubPid, Group, Topic, Msg, Type) ->
|
dispatch_with_ack(SubPid, Group, Topic, Msg, Type) ->
|
||||||
%% For QoS 1/2 message, expect an ack
|
%% For QoS 1/2 message, expect an ack
|
||||||
Ref = erlang:monitor(process, SubPid),
|
Ref = erlang:monitor(process, SubPid),
|
||||||
|
@ -228,13 +235,22 @@ without_group_ack(Msg) ->
|
||||||
get_group_ack(Msg) ->
|
get_group_ack(Msg) ->
|
||||||
emqx_message:get_header(shared_dispatch_ack, Msg, ?NO_ACK).
|
emqx_message:get_header(shared_dispatch_ack, Msg, ?NO_ACK).
|
||||||
|
|
||||||
-spec(get_group(emqx_types:message()) -> {ok, any()} | error).
|
%% @hidden Redispatch is neede only for the messages with redispatch_to header added.
|
||||||
get_group(Msg) ->
|
is_redispatch_needed(#message{} = Msg) ->
|
||||||
case get_group_ack(Msg) of
|
case get_redispatch_to(Msg) of
|
||||||
{_Sender, {_Type, Group, _Ref}} -> {ok, Group};
|
?REDISPATCH_TO(_, _) ->
|
||||||
_ -> error
|
true;
|
||||||
|
_ ->
|
||||||
|
false
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%% @hidden Return the `redispatch_to` group-topic in the message header.
|
||||||
|
%% `false` is returned if the message is not a shared dispatch.
|
||||||
|
%% or when it's a QoS 0 message.
|
||||||
|
-spec(get_redispatch_to(emqx_types:message()) -> redispatch_to() | false).
|
||||||
|
get_redispatch_to(Msg) ->
|
||||||
|
emqx_message:get_header(redispatch_to, Msg, false).
|
||||||
|
|
||||||
-spec(is_ack_required(emqx_types:message()) -> boolean()).
|
-spec(is_ack_required(emqx_types:message()) -> boolean()).
|
||||||
is_ack_required(Msg) -> ?NO_ACK =/= get_group_ack(Msg).
|
is_ack_required(Msg) -> ?NO_ACK =/= get_group_ack(Msg).
|
||||||
|
|
||||||
|
@ -245,6 +261,26 @@ is_retry_dispatch(Msg) ->
|
||||||
_ -> false
|
_ -> false
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%% @doc Redispatch shared deliveries to other members in the group.
|
||||||
|
redispatch(Messages0) ->
|
||||||
|
Messages = lists:filter(fun is_redispatch_needed/1, Messages0),
|
||||||
|
case length(Messages) of
|
||||||
|
L when L > 0 ->
|
||||||
|
?LOG(info, "Redispatching ~p shared subscription message(s)", [L]),
|
||||||
|
lists:foreach(fun redispatch_shared_message/1, Messages);
|
||||||
|
_ ->
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
|
redispatch_shared_message(#message{} = Msg) ->
|
||||||
|
%% As long as it's still a #message{} record in inflight,
|
||||||
|
%% we should try to re-dispatch
|
||||||
|
?REDISPATCH_TO(Group, Topic) = get_redispatch_to(Msg),
|
||||||
|
%% Note that dispatch is called with self() in failed subs
|
||||||
|
%% This is done to avoid dispatching back to caller
|
||||||
|
Delivery = #delivery{sender = self(), message = Msg},
|
||||||
|
dispatch_to_non_self(Group, Topic, Delivery).
|
||||||
|
|
||||||
%% @doc Negative ack dropped message due to inflight window or message queue being full.
|
%% @doc Negative ack dropped message due to inflight window or message queue being full.
|
||||||
-spec(maybe_nack_dropped(emqx_types:message()) -> store | drop).
|
-spec(maybe_nack_dropped(emqx_types:message()) -> store | drop).
|
||||||
maybe_nack_dropped(Msg) ->
|
maybe_nack_dropped(Msg) ->
|
||||||
|
@ -301,7 +337,8 @@ fetch_sender_ref({Sender, Ref}) -> {Sender, Ref}.
|
||||||
|
|
||||||
pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) ->
|
pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) ->
|
||||||
Sub0 = erlang:get({shared_sub_sticky, Group, Topic}),
|
Sub0 = erlang:get({shared_sub_sticky, Group, Topic}),
|
||||||
case is_active_sub(Sub0, FailedSubs) of
|
All = subscribers(Group, Topic),
|
||||||
|
case is_active_sub(Sub0, FailedSubs, All) of
|
||||||
true ->
|
true ->
|
||||||
%% the old subscriber is still alive
|
%% the old subscriber is still alive
|
||||||
%% keep using it for sticky strategy
|
%% keep using it for sticky strategy
|
||||||
|
@ -419,6 +456,7 @@ handle_cast(Msg, State) ->
|
||||||
|
|
||||||
handle_info({mnesia_table_event, {write, NewRecord, _}}, State = #state{pmon = PMon}) ->
|
handle_info({mnesia_table_event, {write, NewRecord, _}}, State = #state{pmon = PMon}) ->
|
||||||
#emqx_shared_subscription{subpid = SubPid} = NewRecord,
|
#emqx_shared_subscription{subpid = SubPid} = NewRecord,
|
||||||
|
ok = maybe_insert_alive_tab(SubPid),
|
||||||
{noreply, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})};
|
{noreply, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})};
|
||||||
|
|
||||||
%% The subscriber may have subscribed multiple topics, so we need to keep monitoring the PID until
|
%% The subscriber may have subscribed multiple topics, so we need to keep monitoring the PID until
|
||||||
|
@ -471,8 +509,10 @@ update_stats(State) ->
|
||||||
State.
|
State.
|
||||||
|
|
||||||
%% Return 'true' if the subscriber process is alive AND not in the failed list
|
%% Return 'true' if the subscriber process is alive AND not in the failed list
|
||||||
is_active_sub(Pid, FailedSubs) ->
|
is_active_sub(Pid, FailedSubs, All) ->
|
||||||
not maps:is_key(Pid, FailedSubs) andalso is_alive_sub(Pid).
|
lists:member(Pid, All) andalso
|
||||||
|
(not maps:is_key(Pid, FailedSubs)) andalso
|
||||||
|
is_alive_sub(Pid).
|
||||||
|
|
||||||
%% erlang:is_process_alive/1 does not work with remote pid.
|
%% erlang:is_process_alive/1 does not work with remote pid.
|
||||||
is_alive_sub(Pid) when ?IS_LOCAL_PID(Pid) ->
|
is_alive_sub(Pid) when ?IS_LOCAL_PID(Pid) ->
|
||||||
|
|
|
@ -35,11 +35,7 @@ init_per_suite(Config) ->
|
||||||
ok = meck:new(emqx_cm, [passthrough, no_history, no_link]),
|
ok = meck:new(emqx_cm, [passthrough, no_history, no_link]),
|
||||||
ok = meck:expect(emqx_cm, mark_channel_connected, fun(_) -> ok end),
|
ok = meck:expect(emqx_cm, mark_channel_connected, fun(_) -> ok end),
|
||||||
ok = meck:expect(emqx_cm, mark_channel_disconnected, fun(_) -> ok end),
|
ok = meck:expect(emqx_cm, mark_channel_disconnected, fun(_) -> ok end),
|
||||||
%% Access Control Meck
|
|
||||||
ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]),
|
|
||||||
ok = meck:expect(emqx_access_control, authenticate,
|
|
||||||
fun(_) -> {ok, #{auth_result => success}} end),
|
|
||||||
ok = meck:expect(emqx_access_control, check_acl, fun(_, _, _) -> allow end),
|
|
||||||
%% Broker Meck
|
%% Broker Meck
|
||||||
ok = meck:new(emqx_broker, [passthrough, no_history, no_link]),
|
ok = meck:new(emqx_broker, [passthrough, no_history, no_link]),
|
||||||
%% Hooks Meck
|
%% Hooks Meck
|
||||||
|
@ -55,8 +51,7 @@ init_per_suite(Config) ->
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(_Config) ->
|
||||||
meck:unload([emqx_access_control,
|
meck:unload([emqx_metrics,
|
||||||
emqx_metrics,
|
|
||||||
emqx_session,
|
emqx_session,
|
||||||
emqx_broker,
|
emqx_broker,
|
||||||
emqx_hooks,
|
emqx_hooks,
|
||||||
|
@ -65,10 +60,16 @@ end_per_suite(_Config) ->
|
||||||
|
|
||||||
init_per_testcase(_TestCase, Config) ->
|
init_per_testcase(_TestCase, Config) ->
|
||||||
meck:new(emqx_zone, [passthrough, no_history, no_link]),
|
meck:new(emqx_zone, [passthrough, no_history, no_link]),
|
||||||
|
%% Access Control Meck
|
||||||
|
ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]),
|
||||||
|
ok = meck:expect(emqx_access_control, authenticate,
|
||||||
|
fun(_) -> {ok, #{auth_result => success}} end),
|
||||||
|
ok = meck:expect(emqx_access_control, check_acl, fun(_, _, _) -> allow end),
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_testcase(_TestCase, Config) ->
|
end_per_testcase(_TestCase, Config) ->
|
||||||
meck:unload([emqx_zone]),
|
meck:unload([emqx_zone]),
|
||||||
|
meck:unload([emqx_access_control]),
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -855,6 +856,30 @@ t_ws_cookie_init(_) ->
|
||||||
Channel = emqx_channel:init(ConnInfo, [{zone, zone}]),
|
Channel = emqx_channel:init(ConnInfo, [{zone, zone}]),
|
||||||
?assertMatch(#{ws_cookie := WsCookie}, emqx_channel:info(clientinfo, Channel)).
|
?assertMatch(#{ws_cookie := WsCookie}, emqx_channel:info(clientinfo, Channel)).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Test cases for other mechnisms
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
t_flapping_detect(_) ->
|
||||||
|
Parent = self(),
|
||||||
|
ok = meck:expect(emqx_cm, open_session,
|
||||||
|
fun(true, _ClientInfo, _ConnInfo) ->
|
||||||
|
{ok, #{session => session(), present => false}}
|
||||||
|
end),
|
||||||
|
ok = meck:expect(emqx_access_control, authenticate, fun(_) -> {error, not_authorized} end),
|
||||||
|
ok = meck:new(emqx_flapping, [passthrough, no_history, no_link]),
|
||||||
|
ok = meck:expect(emqx_flapping, detect, fun(_) -> Parent ! flapping_detect end),
|
||||||
|
ok = meck:expect(emqx_zone, enable_flapping_detect, fun(_) -> true end),
|
||||||
|
IdleChannel = channel(#{conn_state => idle}),
|
||||||
|
{shutdown, not_authorized, _ConnAck, _Channel} =
|
||||||
|
emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), IdleChannel),
|
||||||
|
receive
|
||||||
|
flapping_detect -> ok
|
||||||
|
after 2000 ->
|
||||||
|
?assert(false, "Flapping detect should be exected in connecting progress")
|
||||||
|
end,
|
||||||
|
meck:unload([emqx_flapping]).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Helper functions
|
%% Helper functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -72,4 +72,4 @@ t_expired_detecting(_) ->
|
||||||
(_) -> false end, ets:tab2list(emqx_flapping))),
|
(_) -> false end, ets:tab2list(emqx_flapping))),
|
||||||
timer:sleep(200),
|
timer:sleep(200),
|
||||||
?assertEqual(true, lists:all(fun({flapping, <<"clientid">>, _, _, _}) -> false;
|
?assertEqual(true, lists:all(fun({flapping, <<"clientid">>, _, _, _}) -> false;
|
||||||
(_) -> true end, ets:tab2list(emqx_flapping))).
|
(_) -> true end, ets:tab2list(emqx_flapping))).
|
||||||
|
|
|
@ -40,7 +40,9 @@ start_slave(Name, Opts) ->
|
||||||
{ok, _} ->
|
{ok, _} ->
|
||||||
ok;
|
ok;
|
||||||
{error, started_not_connected, _} ->
|
{error, started_not_connected, _} ->
|
||||||
ok
|
ok;
|
||||||
|
Other ->
|
||||||
|
throw(Other)
|
||||||
end,
|
end,
|
||||||
pong = net_adm:ping(Node),
|
pong = net_adm:ping(Node),
|
||||||
setup_node(Node, Opts),
|
setup_node(Node, Opts),
|
||||||
|
@ -92,7 +94,11 @@ setup_node(Node, #{} = Opts) ->
|
||||||
end,
|
end,
|
||||||
EnvHandler = maps:get(env_handler, Opts, DefaultEnvHandler),
|
EnvHandler = maps:get(env_handler, Opts, DefaultEnvHandler),
|
||||||
|
|
||||||
[ok = rpc:call(Node, application, load, [App]) || App <- [gen_rpc, emqx]],
|
%% apps need to be loaded before starting for ekka to find and create mnesia tables
|
||||||
|
LoadApps = lists:usort([gen_rcp, emqx] ++ ?SLAVE_START_APPS),
|
||||||
|
lists:foreach(fun(App) ->
|
||||||
|
rpc:call(Node, application, load, [App])
|
||||||
|
end, LoadApps),
|
||||||
ok = rpc:call(Node, emqx_ct_helpers, start_apps, [StartApps, EnvHandler]),
|
ok = rpc:call(Node, emqx_ct_helpers, start_apps, [StartApps, EnvHandler]),
|
||||||
|
|
||||||
case maps:get(no_join, Opts, false) of
|
case maps:get(no_join, Opts, false) of
|
||||||
|
|
|
@ -25,13 +25,24 @@
|
||||||
|
|
||||||
-define(SUITE, ?MODULE).
|
-define(SUITE, ?MODULE).
|
||||||
|
|
||||||
-define(wait(For, Timeout),
|
|
||||||
emqx_ct_helpers:wait_for(
|
|
||||||
?FUNCTION_NAME, ?LINE, fun() -> For end, Timeout)).
|
|
||||||
|
|
||||||
-define(ack, shared_sub_ack).
|
-define(ack, shared_sub_ack).
|
||||||
-define(no_ack, no_ack).
|
-define(no_ack, no_ack).
|
||||||
|
|
||||||
|
-define(WAIT(TIMEOUT, PATTERN, Res),
|
||||||
|
(fun() ->
|
||||||
|
receive
|
||||||
|
PATTERN ->
|
||||||
|
Res;
|
||||||
|
Other ->
|
||||||
|
ct:fail(#{expected => ??PATTERN,
|
||||||
|
got => Other
|
||||||
|
})
|
||||||
|
after
|
||||||
|
TIMEOUT ->
|
||||||
|
ct:fail({timeout, ??PATTERN})
|
||||||
|
end
|
||||||
|
end)()).
|
||||||
|
|
||||||
all() -> emqx_ct:all(?SUITE).
|
all() -> emqx_ct:all(?SUITE).
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
|
@ -151,40 +162,7 @@ t_no_connection_nack(Config) when is_list(Config) ->
|
||||||
SendF(1),
|
SendF(1),
|
||||||
ct:sleep(200),
|
ct:sleep(200),
|
||||||
%% This is the connection which was picked by broker to dispatch (sticky) for 1st message
|
%% This is the connection which was picked by broker to dispatch (sticky) for 1st message
|
||||||
|
|
||||||
?assertMatch([#{packet_id := 1}], recv_msgs(1)),
|
?assertMatch([#{packet_id := 1}], recv_msgs(1)),
|
||||||
%% Now kill the connection, expect all following messages to be delivered to the other
|
|
||||||
%% subscriber.
|
|
||||||
%emqx_mock_client:stop(ConnPid),
|
|
||||||
%% sleep then make synced calls to session processes to ensure that
|
|
||||||
%% the connection pid's 'EXIT' message is propagated to the session process
|
|
||||||
%% also to be sure sessions are still alive
|
|
||||||
% timer:sleep(2),
|
|
||||||
% _ = emqx_session:info(SPid1),
|
|
||||||
% _ = emqx_session:info(SPid2),
|
|
||||||
% %% Now we know what is the other still alive connection
|
|
||||||
% [TheOtherConnPid] = [SubConnPid1, SubConnPid2] -- [ConnPid],
|
|
||||||
% %% Send some more messages
|
|
||||||
% PacketIdList = lists:seq(2, 10),
|
|
||||||
% lists:foreach(fun(Id) ->
|
|
||||||
% SendF(Id),
|
|
||||||
% ?wait(Received(Id, TheOtherConnPid), 1000)
|
|
||||||
% end, PacketIdList),
|
|
||||||
% %% Now close the 2nd (last connection)
|
|
||||||
% emqx_mock_client:stop(TheOtherConnPid),
|
|
||||||
% timer:sleep(2),
|
|
||||||
% %% both sessions should have conn_pid = undefined
|
|
||||||
% ?assertEqual({conn_pid, undefined}, lists:keyfind(conn_pid, 1, emqx_session:info(SPid1))),
|
|
||||||
% ?assertEqual({conn_pid, undefined}, lists:keyfind(conn_pid, 1, emqx_session:info(SPid2))),
|
|
||||||
% %% send more messages, but all should be queued in session state
|
|
||||||
% lists:foreach(fun(Id) -> SendF(Id) end, PacketIdList),
|
|
||||||
% {_, L1} = lists:keyfind(mqueue_len, 1, emqx_session:info(SPid1)),
|
|
||||||
% {_, L2} = lists:keyfind(mqueue_len, 1, emqx_session:info(SPid2)),
|
|
||||||
% ?assertEqual(length(PacketIdList), L1 + L2),
|
|
||||||
% %% clean up
|
|
||||||
% emqx_mock_client:close_session(PubConnPid),
|
|
||||||
% emqx_sm:close_session(SPid1),
|
|
||||||
% emqx_sm:close_session(SPid2),
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_random(Config) when is_list(Config) ->
|
t_random(Config) when is_list(Config) ->
|
||||||
|
@ -443,8 +421,14 @@ t_local_fallback(Config) when is_list(Config) ->
|
||||||
|
|
||||||
%% This one tests that broker tries to select another shared subscriber
|
%% This one tests that broker tries to select another shared subscriber
|
||||||
%% If the first one doesn't return an ACK
|
%% If the first one doesn't return an ACK
|
||||||
t_redispatch(Config) when is_list(Config) ->
|
t_redispatch_with_ack(Config) when is_list(Config) ->
|
||||||
ok = ensure_config(sticky, true),
|
test_redispatch(Config, true).
|
||||||
|
|
||||||
|
t_redispatch_no_ack(Config) when is_list(Config) ->
|
||||||
|
test_redispatch(Config, false).
|
||||||
|
|
||||||
|
test_redispatch(_Config, AckEnabled) ->
|
||||||
|
ok = ensure_config(sticky, AckEnabled),
|
||||||
application:set_env(emqx, shared_dispatch_ack_enabled, true),
|
application:set_env(emqx, shared_dispatch_ack_enabled, true),
|
||||||
|
|
||||||
Group = <<"group1">>,
|
Group = <<"group1">>,
|
||||||
|
@ -474,15 +458,59 @@ t_redispatch(Config) when is_list(Config) ->
|
||||||
emqtt:stop(UsedSubPid2),
|
emqtt:stop(UsedSubPid2),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_redispatch_wildcard_with_ack(Config) when is_list(Config)->
|
||||||
|
redispatch_wildcard(Config, true).
|
||||||
|
|
||||||
|
t_redispatch_wildcard_no_ack(Config) when is_list(Config) ->
|
||||||
|
redispatch_wildcard(Config, false).
|
||||||
|
|
||||||
|
%% This one tests that broker tries to redispatch to another member in the group
|
||||||
|
%% if the first one disconnected before acking (auto_ack set to false)
|
||||||
|
redispatch_wildcard(_Config, AckEnabled) ->
|
||||||
|
ok = ensure_config(sticky, AckEnabled),
|
||||||
|
|
||||||
|
Group = <<"group1">>,
|
||||||
|
|
||||||
|
Topic = <<"foo/bar/1">>,
|
||||||
|
ClientId1 = <<"ClientId1">>,
|
||||||
|
ClientId2 = <<"ClientId2">>,
|
||||||
|
{ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]),
|
||||||
|
{ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, false}]),
|
||||||
|
{ok, _} = emqtt:connect(ConnPid1),
|
||||||
|
{ok, _} = emqtt:connect(ConnPid2),
|
||||||
|
|
||||||
|
emqtt:subscribe(ConnPid1, {<<"$share/", Group/binary, "/foo/bar/#">>, 1}),
|
||||||
|
emqtt:subscribe(ConnPid2, {<<"$share/", Group/binary, "/foo/bar/#">>, 1}),
|
||||||
|
|
||||||
|
Message = emqx_message:make(ClientId1, 1, Topic, <<"hello1">>),
|
||||||
|
|
||||||
|
emqx:publish(Message),
|
||||||
|
|
||||||
|
{true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1, ConnPid2]),
|
||||||
|
ok = emqtt:stop(UsedSubPid1),
|
||||||
|
|
||||||
|
Res = last_message(<<"hello1">>, [ConnPid1, ConnPid2], 6000),
|
||||||
|
?assertMatch({true, Pid} when Pid =/= UsedSubPid1, Res),
|
||||||
|
|
||||||
|
{true, UsedSubPid2} = Res,
|
||||||
|
emqtt:stop(UsedSubPid2),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_dispatch_when_inflights_are_full({init, Config}) ->
|
||||||
|
%% make sure broker does not push more than one inflight
|
||||||
|
meck:new(emqx_zone, [passthrough, no_history]),
|
||||||
|
meck:expect(emqx_zone, max_inflight, fun(_Zone) -> 1 end),
|
||||||
|
Config;
|
||||||
|
t_dispatch_when_inflights_are_full({'end', _Config}) ->
|
||||||
|
meck:unload(emqx_zone);
|
||||||
t_dispatch_when_inflights_are_full(Config) when is_list(Config) ->
|
t_dispatch_when_inflights_are_full(Config) when is_list(Config) ->
|
||||||
ok = ensure_config(round_robin, true),
|
ok = ensure_config(round_robin, _AckEnabled = true),
|
||||||
Topic = <<"foo/bar">>,
|
Topic = <<"foo/bar">>,
|
||||||
ClientId1 = <<"ClientId1">>,
|
ClientId1 = <<"ClientId1">>,
|
||||||
ClientId2 = <<"ClientId2">>,
|
ClientId2 = <<"ClientId2">>,
|
||||||
|
|
||||||
%% Note that max_inflight is 1
|
{ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]),
|
||||||
{ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {max_inflight, 1}]),
|
{ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}]),
|
||||||
{ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {max_inflight, 1}]),
|
|
||||||
{ok, _} = emqtt:connect(ConnPid1),
|
{ok, _} = emqtt:connect(ConnPid1),
|
||||||
{ok, _} = emqtt:connect(ConnPid2),
|
{ok, _} = emqtt:connect(ConnPid2),
|
||||||
|
|
||||||
|
@ -505,8 +533,7 @@ t_dispatch_when_inflights_are_full(Config) when is_list(Config) ->
|
||||||
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message2)),
|
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message2)),
|
||||||
|
|
||||||
%% Now kill any client
|
%% Now kill any client
|
||||||
erlang:exit(ConnPid1, normal),
|
ok = kill_process(ConnPid1),
|
||||||
ct:sleep(100),
|
|
||||||
|
|
||||||
%% And try to send the message
|
%% And try to send the message
|
||||||
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message3)),
|
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message3)),
|
||||||
|
@ -521,10 +548,137 @@ t_dispatch_when_inflights_are_full(Config) when is_list(Config) ->
|
||||||
emqtt:stop(ConnPid2),
|
emqtt:stop(ConnPid2),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
%% No ack, QoS 2 subscriptions,
|
||||||
|
%% client1 receives one message, send pubrec, then suspend
|
||||||
|
%% client2 acts normal (auto_ack=true)
|
||||||
|
%% Expected behaviour:
|
||||||
|
%% the messages sent to client1's inflight and mq are re-dispatched after client1 is down
|
||||||
|
t_dispatch_qos2({init, Config}) when is_list(Config) ->
|
||||||
|
meck:new(emqx_zone, [passthrough, no_history]),
|
||||||
|
meck:expect(emqx_zone, max_inflight, fun(_Zone) -> 1 end),
|
||||||
|
Config;
|
||||||
|
t_dispatch_qos2({'end', Config}) when is_list(Config) ->
|
||||||
|
meck:unload(emqx_zone);
|
||||||
|
t_dispatch_qos2(Config) when is_list(Config) ->
|
||||||
|
ok = ensure_config(round_robin, _AckEnabled = false),
|
||||||
|
Topic = <<"foo/bar/1">>,
|
||||||
|
ClientId1 = <<"ClientId1">>,
|
||||||
|
ClientId2 = <<"ClientId2">>,
|
||||||
|
|
||||||
|
{ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]),
|
||||||
|
{ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, true}]),
|
||||||
|
{ok, _} = emqtt:connect(ConnPid1),
|
||||||
|
{ok, _} = emqtt:connect(ConnPid2),
|
||||||
|
|
||||||
|
emqtt:subscribe(ConnPid1, {<<"$share/group/foo/bar/#">>, 2}),
|
||||||
|
emqtt:subscribe(ConnPid2, {<<"$share/group/foo/bar/#">>, 2}),
|
||||||
|
|
||||||
|
Message1 = emqx_message:make(ClientId1, 2, Topic, <<"hello1">>),
|
||||||
|
Message2 = emqx_message:make(ClientId1, 2, Topic, <<"hello2">>),
|
||||||
|
Message3 = emqx_message:make(ClientId1, 2, Topic, <<"hello3">>),
|
||||||
|
Message4 = emqx_message:make(ClientId1, 2, Topic, <<"hello4">>),
|
||||||
|
ct:sleep(100),
|
||||||
|
|
||||||
|
ok = sys:suspend(ConnPid1),
|
||||||
|
|
||||||
|
%% One message is inflight
|
||||||
|
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message1)),
|
||||||
|
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message2)),
|
||||||
|
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message3)),
|
||||||
|
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message4)),
|
||||||
|
|
||||||
|
MsgRec1 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P1}}, P1),
|
||||||
|
MsgRec2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P2}}, P2),
|
||||||
|
%% assert hello2 > hello1 or hello4 > hello3
|
||||||
|
?assert(MsgRec2 > MsgRec1),
|
||||||
|
|
||||||
|
sys:resume(ConnPid1),
|
||||||
|
%% emqtt subscriber automatically sends PUBREC, but since auto_ack is set to false
|
||||||
|
%% so it will never send PUBCOMP, hence EMQX should not attempt to send
|
||||||
|
%% the 4th message yet since max_inflight is 1.
|
||||||
|
MsgRec3 = ?WAIT(2000, {publish, #{client_pid := ConnPid1, payload := P3}}, P3),
|
||||||
|
ct:sleep(100),
|
||||||
|
%% no message expected
|
||||||
|
?assertEqual([], collect_msgs(0)),
|
||||||
|
%% now kill client 1
|
||||||
|
kill_process(ConnPid1),
|
||||||
|
%% client 2 should receive the message
|
||||||
|
MsgRec4 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P4}}, P4),
|
||||||
|
%% assert hello2 > hello1 or hello4 > hello3
|
||||||
|
?assert(MsgRec4 > MsgRec3),
|
||||||
|
emqtt:stop(ConnPid2),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_dispatch_qos0({init, Config}) when is_list(Config) ->
|
||||||
|
Config;
|
||||||
|
t_dispatch_qos0({'end', Config}) when is_list(Config) ->
|
||||||
|
ok;
|
||||||
|
t_dispatch_qos0(Config) when is_list(Config) ->
|
||||||
|
ok = ensure_config(round_robin, _AckEnabled = false),
|
||||||
|
Topic = <<"foo/bar/1">>,
|
||||||
|
ClientId1 = <<"ClientId1">>,
|
||||||
|
ClientId2 = <<"ClientId2">>,
|
||||||
|
|
||||||
|
{ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]),
|
||||||
|
{ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, true}]),
|
||||||
|
{ok, _} = emqtt:connect(ConnPid1),
|
||||||
|
{ok, _} = emqtt:connect(ConnPid2),
|
||||||
|
|
||||||
|
%% subscribe with QoS 0
|
||||||
|
emqtt:subscribe(ConnPid1, {<<"$share/group/foo/bar/#">>, 0}),
|
||||||
|
emqtt:subscribe(ConnPid2, {<<"$share/group/foo/bar/#">>, 0}),
|
||||||
|
|
||||||
|
%% publish with QoS 2, but should be downgraded to 0 as the subscribers
|
||||||
|
%% subscribe with QoS 0
|
||||||
|
Message1 = emqx_message:make(ClientId1, 2, Topic, <<"hello1">>),
|
||||||
|
Message2 = emqx_message:make(ClientId1, 2, Topic, <<"hello2">>),
|
||||||
|
Message3 = emqx_message:make(ClientId1, 2, Topic, <<"hello3">>),
|
||||||
|
Message4 = emqx_message:make(ClientId1, 2, Topic, <<"hello4">>),
|
||||||
|
ct:sleep(100),
|
||||||
|
|
||||||
|
ok = sys:suspend(ConnPid1),
|
||||||
|
|
||||||
|
?assertMatch([_], emqx:publish(Message1)),
|
||||||
|
?assertMatch([_], emqx:publish(Message2)),
|
||||||
|
?assertMatch([_], emqx:publish(Message3)),
|
||||||
|
?assertMatch([_], emqx:publish(Message4)),
|
||||||
|
|
||||||
|
MsgRec1 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P1}}, P1),
|
||||||
|
MsgRec2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P2}}, P2),
|
||||||
|
%% assert hello2 > hello1 or hello4 > hello3
|
||||||
|
?assert(MsgRec2 > MsgRec1),
|
||||||
|
|
||||||
|
kill_process(ConnPid1),
|
||||||
|
%% expect no redispatch
|
||||||
|
?assertEqual([], collect_msgs(timer:seconds(2))),
|
||||||
|
emqtt:stop(ConnPid2),
|
||||||
|
ok.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% help functions
|
%% help functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
kill_process(Pid) ->
|
||||||
|
_ = unlink(Pid),
|
||||||
|
_ = monitor(process, Pid),
|
||||||
|
erlang:exit(Pid, kill),
|
||||||
|
receive
|
||||||
|
{'DOWN', _, process, Pid, _} ->
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
|
collect_msgs(Timeout) ->
|
||||||
|
collect_msgs([], Timeout).
|
||||||
|
|
||||||
|
collect_msgs(Acc, Timeout) ->
|
||||||
|
receive
|
||||||
|
Msg ->
|
||||||
|
collect_msgs([Msg | Acc], Timeout)
|
||||||
|
after
|
||||||
|
Timeout ->
|
||||||
|
lists:reverse(Acc)
|
||||||
|
end.
|
||||||
|
|
||||||
ensure_config(Strategy) ->
|
ensure_config(Strategy) ->
|
||||||
ensure_config(Strategy, _AckEnabled = true).
|
ensure_config(Strategy, _AckEnabled = true).
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue