Merge pull request #9097 from zmstone/1005-sync-v44-upstreams

1005 sync v44 upstreams
This commit is contained in:
Zaiming (Stone) Shi 2022-10-05 16:49:19 +02:00 committed by GitHub
commit 1270dbff24
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 611 additions and 203 deletions

View File

@ -1,8 +1,5 @@
name: 'Create MacOS package' name: 'Create MacOS package'
inputs: inputs:
profile: # emqx, emqx-enterprise
required: true
type: string
otp: # 24.2.1-1, 23.3.4.9-3 otp: # 24.2.1-1, 23.3.4.9-3
required: true required: true
type: string type: string
@ -49,7 +46,7 @@ runs:
kerl update releases kerl update releases
kerl build ${{ inputs.otp }} kerl build ${{ inputs.otp }}
kerl install ${{ inputs.otp }} $HOME/.kerl/${{ inputs.otp }} kerl install ${{ inputs.otp }} $HOME/.kerl/${{ inputs.otp }}
- name: build ${{ inputs.profile }} - name: build
env: env:
AUTO_INSTALL_BUILD_DEPS: 1 AUTO_INSTALL_BUILD_DEPS: 1
APPLE_SIGN_BINARIES: 1 APPLE_SIGN_BINARIES: 1
@ -64,14 +61,17 @@ runs:
. $HOME/.kerl/${{ inputs.otp }}/activate . $HOME/.kerl/${{ inputs.otp }}/activate
make ensure-rebar3 make ensure-rebar3
sudo cp rebar3 /usr/local/bin/rebar3 sudo cp rebar3 /usr/local/bin/rebar3
make ${{ inputs.profile }}-zip make ${EMQX_NAME}-zip
- name: test ${{ inputs.profile }} - name: test
shell: bash shell: bash
run: | run: |
pkg_name=$(basename _packages/${{ inputs.profile }}/${{ inputs.profile }}-*.zip) pkg_name=$(basename _packages/${EMQX_NAME}/${EMQX_NAME}-*.zip)
unzip -q _packages/${{ inputs.profile }}/$pkg_name unzip -q _packages/${EMQX_NAME}/$pkg_name
gsed -i '/emqx_telemetry/d' ./emqx/data/loaded_plugins # test with a spaces in path
./emqx/bin/emqx start || cat emqx/log/erlang.log.1 mv ./emqx "./emqx home/"
cd "./emqx home/"
gsed -i '/emqx_telemetry/d' data/loaded_plugins
./bin/emqx start || cat log/erlang.log.1
ready='no' ready='no'
for i in {1..10}; do for i in {1..10}; do
if curl -fs 127.0.0.1:18083 > /dev/null; then if curl -fs 127.0.0.1:18083 > /dev/null; then
@ -82,14 +82,15 @@ runs:
done done
if [ "$ready" != "yes" ]; then if [ "$ready" != "yes" ]; then
echo "Timed out waiting for emqx to be ready" echo "Timed out waiting for emqx to be ready"
cat emqx/log/erlang.log.1 cat log/erlang.log.1
exit 1 exit 1
fi fi
./emqx/bin/emqx_ctl status ./bin/emqx_ctl status
if ! ./emqx/bin/emqx stop; then if ! ./bin/emqx stop; then
cat emqx/log/erlang.log.1 || true cat log/erlang.log.1 || true
cat emqx/log/emqx.log.1 || true cat log/emqx.log.1 || true
echo "failed to stop emqx" echo "failed to stop emqx"
exit 1 exit 1
fi fi
rm -rf emqx cd ..
rm -rf "emqx home"

View File

@ -108,11 +108,8 @@ jobs:
strategy: strategy:
fail-fast: false fail-fast: false
matrix: matrix:
profile: ${{fromJSON(needs.prepare.outputs.profiles)}}
otp: otp:
- 24.1.5-3 - 24.1.5-3
exclude:
- profile: emqx-edge
os: os:
- macos-11 - macos-11
runs-on: ${{ matrix.os }} runs-on: ${{ matrix.os }}
@ -127,9 +124,12 @@ jobs:
ln -s . source ln -s . source
unzip -q source.zip unzip -q source.zip
rm source source.zip rm source source.zip
- id: detect-profiles
uses: ./.github/actions/detect-profiles
with:
ci_git_token: ${{ secrets.CI_GIT_TOKEN }}
- uses: ./.github/actions/package-macos - uses: ./.github/actions/package-macos
with: with:
profile: ${{ matrix.profile }}
otp: ${{ matrix.otp }} otp: ${{ matrix.otp }}
os: ${{ matrix.os }} os: ${{ matrix.os }}
apple_id_password: ${{ secrets.APPLE_ID_PASSWORD }} apple_id_password: ${{ secrets.APPLE_ID_PASSWORD }}
@ -138,8 +138,8 @@ jobs:
apple_developer_id_bundle_password: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE_PASSWORD }} apple_developer_id_bundle_password: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE_PASSWORD }}
- uses: actions/upload-artifact@v1 - uses: actions/upload-artifact@v1
with: with:
name: ${{ matrix.profile }}-${{ matrix.otp }} name: ${EMQX_NAME}-${{ matrix.otp }}
path: _packages/${{ matrix.profile }}/. path: _packages/${EMQX_NAME}/.
linux: linux:
runs-on: ubuntu-20.04 runs-on: ubuntu-20.04

View File

@ -110,8 +110,6 @@ jobs:
strategy: strategy:
fail-fast: false fail-fast: false
matrix: matrix:
profile:
- emqx
otp: otp:
- 24.1.5-3 - 24.1.5-3
os: os:
@ -124,35 +122,18 @@ jobs:
ci_git_token: ${{ secrets.CI_GIT_TOKEN }} ci_git_token: ${{ secrets.CI_GIT_TOKEN }}
- uses: ./.github/actions/package-macos - uses: ./.github/actions/package-macos
with: with:
profile: ${{ matrix.profile }}
otp: ${{ matrix.otp }} otp: ${{ matrix.otp }}
os: ${{ matrix.os }} os: ${{ matrix.os }}
apple_id_password: ${{ secrets.APPLE_ID_PASSWORD }} apple_id_password: ${{ secrets.APPLE_ID_PASSWORD }}
apple_developer_identity: ${{ secrets.APPLE_DEVELOPER_IDENTITY }} apple_developer_identity: ${{ secrets.APPLE_DEVELOPER_IDENTITY }}
apple_developer_id_bundle: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE }} apple_developer_id_bundle: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE }}
apple_developer_id_bundle_password: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE_PASSWORD }} apple_developer_id_bundle_password: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE_PASSWORD }}
- name: test - uses: actions/upload-artifact@v1
run: | if: failure()
pkg_name=$(find _packages/${EMQX_NAME} -mindepth 1 -maxdepth 1 -iname \*.zip) with:
unzip -q $pkg_name name: rebar3.crashdump
gsed -i '/emqx_telemetry/d' ./emqx/data/loaded_plugins path: ./rebar3.crashdump
./emqx/bin/emqx start || cat emqx/log/erlang.log.1
ready='no'
for i in {1..10}; do
if curl -fs 127.0.0.1:18083 > /dev/null; then
ready='yes'
break
fi
sleep 1
done
if [ "$ready" != "yes" ]; then
echo "Timed out waiting for emqx to be ready"
cat emqx/log/erlang.log.1
exit 1
fi
./emqx/bin/emqx_ctl status
./emqx/bin/emqx stop
rm -rf emqx
- uses: actions/upload-artifact@v2 - uses: actions/upload-artifact@v2
with: with:
name: macos name: macos

View File

@ -12,6 +12,8 @@ File format:
## v4.3.22 ## v4.3.22
### Minor changes
## v4.3.21 ## v4.3.21
### Enhancements ### Enhancements
@ -25,13 +27,27 @@ File format:
- TLS listener default buffer size to 4KB [#9007](https://github.com/emqx/emqx/pull/9007) - TLS listener default buffer size to 4KB [#9007](https://github.com/emqx/emqx/pull/9007)
Eliminate uncertainty that the buffer size is set by OS default. Eliminate uncertainty that the buffer size is set by OS default.
- Fix delayed publish inaccurate caused by os time change. [#8908](https://github.com/emqx/emqx/pull/8908)
- Disable authorization for `api/v4/emqx_prometheus` endpoint. [8955](https://github.com/emqx/emqx/pull/8955) - Disable authorization for `api/v4/emqx_prometheus` endpoint. [8955](https://github.com/emqx/emqx/pull/8955)
- Added a test to prevent a last will testament message to be - Added a test to prevent a last will testament message to be
published when a client is denied connection. [#8894](https://github.com/emqx/emqx/pull/8894) published when a client is denied connection. [#8894](https://github.com/emqx/emqx/pull/8894)
### Bug fixes
- Fix delayed publish inaccurate caused by os time change. [#8908](https://github.com/emqx/emqx/pull/8908)
- Hide redis password in error logs [#9071](https://github.com/emqx/emqx/pull/9071)
In this change, it also included more changes in redis client:
- Improve redis connection error logging [eredis:19](https://github.com/emqx/eredis/pull/19).
Also added support for eredis to accept an anonymous function as password instead of
passing around plaintext args which may get dumpped to crash logs (hard to predict where).
This change also added `format_status` callback for `gen_server` states which hold plaintext
password so the process termination log and `sys:get_status` will print '******' instead of
the password to console.
- Avoid pool name clashing [eredis_cluster#22](https://github.com/emqx/eredis_cluster/pull/22)
Same `format_status` callback is added here too for `gen_server`s which hold password in
their state.
## v4.3.20 ## v4.3.20
### Bug fixes ### Bug fixes

View File

@ -1,5 +1,11 @@
# EMQX 4.4 Changes # EMQX 4.4 Changes
## v4.4.10
### Bug fixes
- Fix the latency statistics error of the slow subscription module when `stats_type` is `internal` or `response`. [#8981](https://github.com/emqx/emqx/pull/8981)
## v4.4.9 ## v4.4.9
### Bug fixes (synced from v4.3.20) ### Bug fixes (synced from v4.3.20)

View File

@ -20,6 +20,7 @@
-compile(export_all). -compile(export_all).
-include("emqx_auth_mnesia.hrl"). -include("emqx_auth_mnesia.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
@ -78,15 +79,37 @@ init_per_testcase_migration(_, Config) ->
emqx_acl_mnesia_migrator:migrate_records(), emqx_acl_mnesia_migrator:migrate_records(),
Config. Config.
init_per_testcase_other(t_last_will_testament_message_check_acl, Config) ->
OriginalACLNoMatch = application:get_env(emqx, acl_nomatch),
application:set_env(emqx, acl_nomatch, deny),
emqx_mod_acl_internal:unload([]),
%% deny all for this client
ClientID = <<"lwt_client">>,
ok = emqx_acl_mnesia_db:add_acl({clientid, ClientID}, <<"#">>, pubsub, deny),
[ {original_acl_nomatch, OriginalACLNoMatch}
, {clientid, ClientID}
| Config];
init_per_testcase_other(_TestCase, Config) ->
Config.
init_per_testcase(Case, Config) -> init_per_testcase(Case, Config) ->
PerTestInitializers = [ PerTestInitializers = [
fun init_per_testcase_clean/2, fun init_per_testcase_clean/2,
fun init_per_testcase_migration/2, fun init_per_testcase_migration/2,
fun init_per_testcase_emqx_hook/2 fun init_per_testcase_emqx_hook/2,
fun init_per_testcase_other/2
], ],
lists:foldl(fun(Init, Conf) -> Init(Case, Conf) end, Config, PerTestInitializers). lists:foldl(fun(Init, Conf) -> Init(Case, Conf) end, Config, PerTestInitializers).
end_per_testcase(_, Config) -> end_per_testcase(t_last_will_testament_message_check_acl, Config) ->
emqx:unhook('client.check_acl', fun emqx_acl_mnesia:check_acl/5),
case ?config(original_acl_nomatch, Config) of
{ok, Original} -> application:set_env(emqx, acl_nomatch, Original);
_ -> ok
end,
emqx_mod_acl_internal:load([]),
ok;
end_per_testcase(_TestCase, Config) ->
emqx:unhook('client.check_acl', fun emqx_acl_mnesia:check_acl/5), emqx:unhook('client.check_acl', fun emqx_acl_mnesia:check_acl/5),
Config. Config.
@ -465,6 +488,35 @@ t_rest_api(_Config) ->
{ok, Res3} = request_http_rest_list(["$all"]), {ok, Res3} = request_http_rest_list(["$all"]),
?assertMatch([], get_http_data(Res3)). ?assertMatch([], get_http_data(Res3)).
%% asserts that we check ACL for the LWT topic before publishing the
%% LWT.
t_last_will_testament_message_check_acl(Config) ->
ClientID = ?config(clientid, Config),
{ok, C} = emqtt:start_link([
{clientid, ClientID},
{will_topic, <<"$SYS/lwt">>},
{will_payload, <<"should not be published">>}
]),
{ok, _} = emqtt:connect(C),
ok = emqx:subscribe(<<"$SYS/lwt">>),
unlink(C),
ok = snabbkaffe:start_trace(),
{true, {ok, _}} =
?wait_async_action(
exit(C, kill),
#{?snk_kind := last_will_testament_publish_denied},
1_000
),
ok = snabbkaffe:stop(),
receive
{deliver, <<"$SYS/lwt">>, #message{payload = <<"should not be published">>}} ->
error(lwt_should_not_be_published_to_forbidden_topic)
after 1_000 ->
ok
end,
ok.
create_conflicting_records() -> create_conflicting_records() ->
Records = [ Records = [

View File

@ -115,7 +115,7 @@ init_per_suite(Config) ->
end_per_suite(_Cfg) -> end_per_suite(_Cfg) ->
deinit_mongo_data(), deinit_mongo_data(),
%% avoid inter-suite flakiness %% avoid inter-suite flakiness
ok = emqx_mod_acl_internal:load([]), emqx_mod_acl_internal:load([]),
emqx_ct_helpers:stop_apps([emqx_auth_mongo]). emqx_ct_helpers:stop_apps([emqx_auth_mongo]).
set_special_confs(emqx) -> set_special_confs(emqx) ->
@ -186,6 +186,8 @@ end_per_testcase(TestCase, Config)
when TestCase =:= t_available_acl_query_timeout; when TestCase =:= t_available_acl_query_timeout;
TestCase =:= t_acl_superuser_timeout; TestCase =:= t_acl_superuser_timeout;
TestCase =:= t_authn_no_connection; TestCase =:= t_authn_no_connection;
TestCase =:= t_available_authn_query_timeout;
TestCase =:= t_authn_timeout;
TestCase =:= t_available_acl_query_no_connection -> TestCase =:= t_available_acl_query_no_connection ->
ProxyHost = ?config(proxy_host, Config), ProxyHost = ?config(proxy_host, Config),
ProxyPort = ?config(proxy_port, Config), ProxyPort = ?config(proxy_port, Config),

View File

@ -0,0 +1,25 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-ifndef(EMQX_PSK_FILE).
-define(EMQX_PSK_FILE, true).
-define(PSK_FILE_TAB, emqx_psk_file).
-record(psk_entry, {psk_id :: binary(),
psk_str :: binary()}).
-endif.

View File

@ -1,6 +1,6 @@
{application, emqx_psk_file, {application, emqx_psk_file,
[{description,"EMQX PSK Plugin from File"}, [{description,"EMQX PSK Plugin from File"},
{vsn, "4.3.0"}, % strict semver, bump manually! {vsn, "4.3.1"}, % strict semver, bump manually!
{modules,[]}, {modules,[]},
{registered,[emqx_psk_file_sup]}, {registered,[emqx_psk_file_sup]},
{applications,[kernel,stdlib]}, {applications,[kernel,stdlib]},

View File

@ -0,0 +1,10 @@
%% -*- mode: erlang -*-
{VSN,
[{"4.3.0",
[{load_module,emqx_psk_file,brutal_purge,soft_purge,[]},
{load_module,emqx_psk_file_sup,brutal_purge,soft_purge,[]}]}
],
[{"4.3.0",
[{load_module,emqx_psk_file,brutal_purge,soft_purge,[]},
{load_module,emqx_psk_file_sup,brutal_purge,soft_purge,[]}]}
]}.

View File

@ -16,6 +16,7 @@
-module(emqx_psk_file). -module(emqx_psk_file).
-include("emqx_psk_file.hrl").
-include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
@ -26,15 +27,10 @@
%% Hooks functions %% Hooks functions
-export([on_psk_lookup/2]). -export([on_psk_lookup/2]).
-define(TAB, ?MODULE).
-define(LF, 10). -define(LF, 10).
-record(psk_entry, {psk_id :: binary(),
psk_str :: binary()}).
%% Called when the plugin application start %% Called when the plugin application start
load(Env) -> load(Env) ->
_ = ets:new(?TAB, [set, named_table, {keypos, #psk_entry.psk_id}]),
{ok, PskFile} = file:open(get_value(path, Env), [read, raw, binary, read_ahead]), {ok, PskFile} = file:open(get_value(path, Env), [read, raw, binary, read_ahead]),
preload_psks(PskFile, bin(get_value(delimiter, Env))), preload_psks(PskFile, bin(get_value(delimiter, Env))),
_ = file:close(PskFile), _ = file:close(PskFile),
@ -45,7 +41,7 @@ unload() ->
emqx:unhook('tls_handshake.psk_lookup', fun ?MODULE:on_psk_lookup/2). emqx:unhook('tls_handshake.psk_lookup', fun ?MODULE:on_psk_lookup/2).
on_psk_lookup(ClientPSKID, UserState) -> on_psk_lookup(ClientPSKID, UserState) ->
case ets:lookup(?TAB, ClientPSKID) of case ets:lookup(?PSK_FILE_TAB, ClientPSKID) of
[#psk_entry{psk_str = PskStr}] -> [#psk_entry{psk_str = PskStr}] ->
{stop, PskStr}; {stop, PskStr};
[] -> [] ->
@ -57,7 +53,9 @@ preload_psks(FileHandler, Delimiter) ->
{ok, Line} -> {ok, Line} ->
case binary:split(Line, Delimiter) of case binary:split(Line, Delimiter) of
[Key, Rem] -> [Key, Rem] ->
ets:insert(?TAB, #psk_entry{psk_id = Key, psk_str = trim_lf(Rem)}), ets:insert(
?PSK_FILE_TAB,
#psk_entry{psk_id = Key, psk_str = trim_lf(Rem)}),
preload_psks(FileHandler, Delimiter); preload_psks(FileHandler, Delimiter);
[Line] -> [Line] ->
?LOG(warning, "[~p] - Invalid line: ~p, delimiter: ~p", [?MODULE, Line, Delimiter]) ?LOG(warning, "[~p] - Invalid line: ~p, delimiter: ~p", [?MODULE, Line, Delimiter])

View File

@ -16,6 +16,8 @@
-module(emqx_psk_file_sup). -module(emqx_psk_file_sup).
-include("emqx_psk_file.hrl").
-behaviour(supervisor). -behaviour(supervisor).
%% API %% API
@ -25,8 +27,11 @@
-export([init/1]). -export([init/1]).
start_link() -> start_link() ->
_ = ets:new(
?PSK_FILE_TAB,
[set, named_table, public, {keypos, #psk_entry.psk_id}]
),
supervisor:start_link({local, ?MODULE}, ?MODULE, []). supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) -> init([]) ->
{ok, { {one_for_one, 0, 1}, []} }. {ok, { {one_for_one, 0, 1}, []} }.

View File

@ -1,6 +1,6 @@
{application, emqx_retainer, {application, emqx_retainer,
[{description, "EMQ X Retainer"}, [{description, "EMQ X Retainer"},
{vsn, "4.4.2"}, % strict semver, bump manually! {vsn, "4.4.3"}, % strict semver, bump manually!
{modules, []}, {modules, []},
{registered, [emqx_retainer_sup]}, {registered, [emqx_retainer_sup]},
{applications, [kernel,stdlib]}, {applications, [kernel,stdlib]},

View File

@ -34,6 +34,23 @@ init([Env]) ->
type => worker, type => worker,
modules => [emqx_retainer]} || not is_managed_by_modules()]}}. modules => [emqx_retainer]} || not is_managed_by_modules()]}}.
-ifdef(EMQX_ENTERPRISE).
is_managed_by_modules() ->
try
case supervisor:get_childspec(emqx_modules_sup, emqx_retainer) of
{ok, _} -> true;
_ -> false
end
catch
exit : {noproc, _} ->
false
end.
-else.
is_managed_by_modules() -> is_managed_by_modules() ->
%% always false for opensource edition %% always false for opensource edition
false. false.
-endif.

View File

@ -20,7 +20,9 @@
-export([ensure_start/0, ensure_stop/0]). -export([ensure_start/0, ensure_stop/0]).
-ifdef(EMQX_ENTERPRISE). -ifdef(EMQX_ENTERPRISE).
ensure_start() -> ensure_start() ->
%% for enterprise edition, retainer is started by modules
application:stop(emqx_modules), application:stop(emqx_modules),
ensure_stop(),
init_conf(), init_conf(),
emqx_ct_helpers:start_apps([emqx_retainer]), emqx_ct_helpers:start_apps([emqx_retainer]),
ok. ok.
@ -29,6 +31,7 @@ ensure_start() ->
ensure_start() -> ensure_start() ->
init_conf(), init_conf(),
ensure_stop(),
emqx_ct_helpers:start_apps([emqx_retainer]), emqx_ct_helpers:start_apps([emqx_retainer]),
ok. ok.

View File

@ -523,6 +523,16 @@ case "$1" in
;; ;;
esac esac
if [ "$IS_BOOT_COMMAND" = 'no' ]; then
# for non-boot commands, inspect vm.<time>.args for node name
# shellcheck disable=SC2012,SC2086
LATEST_VM_ARGS_FILE="$(ls -t "$RUNNER_DATA_DIR"/configs/vm.*.args 2>/dev/null | head -1)"
if [ -z "$LATEST_VM_ARGS_FILE" ]; then
echoerr "There is no vm.*.args config file found in '$RUNNER_DATA_DIR/configs/'"
echoerr "Please make sure the node is initialized (started for at least once)"
exit 1
fi
fi
if [ -z "$NAME_ARG" ]; then if [ -z "$NAME_ARG" ]; then
NODENAME="${EMQX_NODE_NAME:-}" NODENAME="${EMQX_NODE_NAME:-}"
@ -530,15 +540,7 @@ if [ -z "$NAME_ARG" ]; then
[ -z "$NODENAME" ] && [ -n "$EMQX_NAME" ] && [ -n "$EMQX_HOST" ] && NODENAME="${EMQX_NAME}@${EMQX_HOST}" [ -z "$NODENAME" ] && [ -n "$EMQX_NAME" ] && [ -n "$EMQX_HOST" ] && NODENAME="${EMQX_NAME}@${EMQX_HOST}"
if [ -z "$NODENAME" ]; then if [ -z "$NODENAME" ]; then
if [ "$IS_BOOT_COMMAND" = 'no' ]; then if [ "$IS_BOOT_COMMAND" = 'no' ]; then
# for non-boot commands, inspect vm.<time>.args for node name NODENAME="$(grep -E '^-name' "$LATEST_VM_ARGS_FILE" | awk '{print $2}')"
# shellcheck disable=SC2012,SC2086
LATEST_VM_ARGS="$(ls -t $RUNNER_DATA_DIR/configs/vm.*.args 2>/dev/null | head -1)"
if [ -z "$LATEST_VM_ARGS" ]; then
echoerr "For command $1, there is no vm.*.args config file found in $RUNNER_DATA_DIR/configs/"
echoerr "Please make sure the node is initialized (started for at least once)"
exit 1
fi
NODENAME="$(grep -E '^-name' "$LATEST_VM_ARGS" | awk '{print $2}')"
else else
# for boot commands, inspect emqx.conf for node name # for boot commands, inspect emqx.conf for node name
NODENAME=$("$ERTS_PATH"/escript "$RUNNER_ROOT_DIR"/bin/cuttlefish -i "$REL_DIR"/emqx.schema -c "$RUNNER_ETC_DIR"/emqx.conf get node.name) NODENAME=$("$ERTS_PATH"/escript "$RUNNER_ROOT_DIR"/bin/cuttlefish -i "$REL_DIR"/emqx.schema -c "$RUNNER_ETC_DIR"/emqx.conf get node.name)
@ -573,13 +575,7 @@ if [ -z "$COOKIE" ]; then
if [ "$IS_BOOT_COMMAND" = 'yes' ]; then if [ "$IS_BOOT_COMMAND" = 'yes' ]; then
COOKIE=$("$ERTS_PATH"/escript "$RUNNER_ROOT_DIR"/bin/cuttlefish -i "$REL_DIR"/emqx.schema -c "$RUNNER_ETC_DIR"/emqx.conf get node.cookie) COOKIE=$("$ERTS_PATH"/escript "$RUNNER_ROOT_DIR"/bin/cuttlefish -i "$REL_DIR"/emqx.schema -c "$RUNNER_ETC_DIR"/emqx.conf get node.cookie)
else else
# shellcheck disable=SC2012,SC2086 COOKIE="$(grep -E '^-setcookie' "$LATEST_VM_ARGS_FILE" | awk '{print $2}')"
LATEST_VM_ARGS="$(ls -t $RUNNER_DATA_DIR/configs/vm.*.args | head -1)"
if [ -z "$LATEST_VM_ARGS" ]; then
echo "For command $1, there is no vm.*.args config file found in $RUNNER_DATA_DIR/configs/"
exit 1
fi
COOKIE="$(grep -E '^-setcookie' "$LATEST_VM_ARGS" | awk '{print $2}')"
fi fi
fi fi

View File

@ -22,13 +22,25 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include("include/emqx_mqtt.hrl"). -include("include/emqx_mqtt.hrl").
-include_lib("include/emqx.hrl"). -include_lib("include/emqx.hrl").
-define(LANTENCY, 101).
%-define(LOGT(Format, Args), ct:pal(Format, Args)). %-define(LOGT(Format, Args), ct:pal(Format, Args)).
-define(TOPK_TAB, emqx_slow_subs_topk). -define(TOPK_TAB, emqx_slow_subs_topk).
-define(NOW, erlang:system_time(millisecond)). -define(NOW, erlang:system_time(millisecond)).
all() -> emqx_ct:all(?MODULE). all() ->
[ {group, whole}
, {group, internal}
, {group, response}
].
groups() ->
Cases = emqx_ct:all(?MODULE),
[ {whole, [], Cases}
, {internal, [], Cases}
, {response, [], Cases}
].
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_ct_helpers:start_apps([emqx]), emqx_ct_helpers:start_apps([emqx]),
@ -39,7 +51,8 @@ end_per_suite(Config) ->
Config. Config.
init_per_testcase(_, Config) -> init_per_testcase(_, Config) ->
emqx_mod_slow_subs:load(base_conf()), Group = proplists:get_value(name, proplists:get_value(tc_group_properties, Config)),
emqx_mod_slow_subs:load(base_conf(Group)),
Config. Config.
end_per_testcase(_, _) -> end_per_testcase(_, _) ->
@ -49,10 +62,10 @@ end_per_testcase(_, _) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Test Cases %% Test Cases
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
t_log_and_pub(_) -> t_log_and_pub(Config) ->
%% Sub topic first %% Sub topic first
Subs = [{<<"/test1/+">>, ?QOS_1}, {<<"/test2/+">>, ?QOS_2}], Subs = [{<<"/test1/+">>, ?QOS_1}, {<<"/test2/+">>, ?QOS_2}],
Clients = start_client(Subs), Clients = start_client(Subs, Config),
timer:sleep(1500), timer:sleep(1500),
Now = ?NOW, Now = ?NOW,
@ -60,14 +73,14 @@ t_log_and_pub(_) ->
lists:foreach(fun(I) -> lists:foreach(fun(I) ->
Topic = list_to_binary(io_lib:format("/test1/~p", [I])), Topic = list_to_binary(io_lib:format("/test1/~p", [I])),
Msg = emqx_message:make(undefined, ?QOS_1, Topic, <<"Hello">>), Msg = emqx_message:make(undefined, ?QOS_1, Topic, <<"Hello">>),
emqx:publish(Msg#message{timestamp = Now - 500}) emqx:publish(Msg#message{timestamp = Now - ?LANTENCY})
end, end,
lists:seq(1, 10)), lists:seq(1, 10)),
lists:foreach(fun(I) -> lists:foreach(fun(I) ->
Topic = list_to_binary(io_lib:format("/test2/~p", [I])), Topic = list_to_binary(io_lib:format("/test2/~p", [I])),
Msg = emqx_message:make(undefined, ?QOS_2, Topic, <<"Hello">>), Msg = emqx_message:make(undefined, ?QOS_2, Topic, <<"Hello">>),
emqx:publish(Msg#message{timestamp = Now - 500}) emqx:publish(Msg#message{timestamp = Now - ?LANTENCY})
end, end,
lists:seq(1, 10)), lists:seq(1, 10)),
@ -77,25 +90,33 @@ t_log_and_pub(_) ->
?assert(Size =< 8 andalso Size >= 3, ?assert(Size =< 8 andalso Size >= 3,
unicode:characters_to_binary(io_lib:format("size is :~p~n", [Size]))), unicode:characters_to_binary(io_lib:format("size is :~p~n", [Size]))),
?assert(
lists:all(
fun(#{timespan := Ts}) ->
Ts >= 101 andalso Ts < ?NOW - Now
end,
emqx_slow_subs_api:get_history()
)
),
timer:sleep(3000), timer:sleep(3000),
?assert(ets:info(?TOPK_TAB, size) =:= 0), ?assert(ets:info(?TOPK_TAB, size) =:= 0),
[Client ! stop || Client <- Clients], [Client ! stop || Client <- Clients],
ok. ok.
base_conf() -> base_conf(Type) ->
[ {threshold, 300} [ {threshold, 100}
, {top_k_num, 5} , {top_k_num, 5}
, {expire_interval, timer:seconds(3)} , {expire_interval, timer:seconds(3)}
, {stats_type, whole} , {stats_type, Type}
]. ].
start_client(Subs) -> start_client(Subs, Config) ->
[spawn(fun() -> client(I, Subs) end) || I <- lists:seq(1, 10)]. [spawn(fun() -> client(I, Subs, Config) end) || I <- lists:seq(1, 10)].
client(I, Subs) -> client(I, Subs, Config) ->
{ok, C} = emqtt:start_link([{host, "localhost"}, Group = proplists:get_value(name, proplists:get_value(tc_group_properties, Config)),
{clientid, io_lib:format("slow_subs_~p", [I])}, ConnOptions = make_conn_options(Group, I),
{username, <<"plain">>}, {ok, C} = emqtt:start_link(ConnOptions),
{password, <<"plain">>}]),
{ok, _} = emqtt:connect(C), {ok, _} = emqtt:connect(C),
Len = erlang:length(Subs), Len = erlang:length(Subs),
@ -115,3 +136,14 @@ try_receive(Acc) ->
after 500 -> after 500 ->
Acc Acc
end. end.
make_conn_options(response, I) ->
[ {msg_handler,
#{publish => fun(_) -> timer:sleep(50) end,
disconnected => fun(_) -> ok end}}
| make_conn_options(whole, I)];
make_conn_options(_, I) ->
[{host, "localhost"},
{clientid, io_lib:format("slow_subs_~p", [I])},
{username, <<"plain">>},
{password, <<"plain">>}].

View File

@ -42,7 +42,7 @@
, {redbug, "2.0.7"} , {redbug, "2.0.7"}
, {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.2.0"}}} , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.2.0"}}}
, {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.7"}}} , {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.7"}}}
, {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.7.3"}}} , {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.7.4"}}}
, {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.9.0"}}} , {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.9.0"}}}
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}}

View File

@ -170,27 +170,29 @@ remote_refs() {
upstream_branches() { upstream_branches() {
local base="$1" local base="$1"
case "$base" in case "$base" in
release-v43|main-v4.3) release-v43)
## no upstream for 4.3 opensource
remote_ref "$base" remote_ref "$base"
;; ;;
release-v44) release-v44)
remote_refs "$base" 'release-v43' remote_refs "$base" 'release-v43'
;; ;;
main-v4.4)
remote_refs "$base" 'main-v4.3'
;;
release-e43) release-e43)
remote_refs "$base" 'release-v43' remote_refs "$base" 'release-v43'
;; ;;
main-v4.3-enterprise)
remote_refs "$base" 'main-v4.3'
;;
release-e44) release-e44)
remote_refs "$base" 'release-v44' 'release-e43' 'release-v43' remote_refs "$base" 'release-v44' 'release-e43' 'release-v43'
;; ;;
main-v4.3)
remote_refs "$base" 'release-v43'
;;
main-v4.4)
remote_refs "$base" 'release-v44' 'main-v4.3'
;;
main-v4.3-enterprise)
remote_refs "$base" 'release-e43' 'main-v4.3'
;;
main-v4.4-enterprise) main-v4.4-enterprise)
remote_refs "$base" 'main-v4.4' 'main-v4.3-enterprise' 'main-v4.3' remote_refs "$base" 'release-e44' 'main-v4.4' 'main-v4.3-enterprise' 'main-v4.3'
;; ;;
esac esac
} }

View File

@ -8,12 +8,20 @@
set -euo pipefail set -euo pipefail
set -x set -x
usage() {
echo "$0 PROFILE"
}
# ensure dir # ensure dir
cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")/.." cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")/.."
usage() {
echo "$0 PROFILE [options]"
echo "options:"
echo "--skip-build: Skip building the profile only to re-generate the appup files."
echo "--skip-build-base: This script by default forces a git clean before rebuilding on the base version "
echo " this option is useful when you are sure the past builds can be trusted,"
echo " that is, there were no re-tags or anything."
echo "--check: Exit with non-zero code if there is git diff after the execution."
echo " Mostly used in CI."
}
PROFILE="${1:-}" PROFILE="${1:-}"
case "$PROFILE" in case "$PROFILE" in
emqx-ee) emqx-ee)
@ -49,7 +57,7 @@ ESCRIPT_ARGS=( '' )
while [ "$#" -gt 0 ]; do while [ "$#" -gt 0 ]; do
case $1 in case $1 in
-h|--help) -h|--help)
help usage
exit 0 exit 0
;; ;;
--skip-build) --skip-build)
@ -101,7 +109,7 @@ else
pushd "${PREV_DIR_BASE}/${PREV_TAG}" pushd "${PREV_DIR_BASE}/${PREV_TAG}"
if [ "$NEW_COPY" = 'no' ]; then if [ "$NEW_COPY" = 'no' ]; then
REMOTE="$(git remote -v | grep "${GIT_REPO}" | head -1 | awk '{print $1}')" REMOTE="$(git remote -v | grep "${GIT_REPO}" | head -1 | awk '{print $1}')"
git fetch "$REMOTE" git fetch "$REMOTE" --tags -f
fi fi
git reset --hard git reset --hard
git clean -ffdx git clean -ffdx

View File

@ -62,7 +62,8 @@ app_specific_actions(_) ->
ignored_apps() -> ignored_apps() ->
[gpb, %% only a build tool [gpb, %% only a build tool
emqx_dashboard, %% generic appup file for all versions emqx_dashboard, %% generic appup file for all versions
emqx_management %% generic appup file for all versions emqx_management, %% generic appup file for all versions
emqx_modules_spec %% generic appup file for all versions
] ++ otp_standard_apps(). ] ++ otp_standard_apps().
main(Args) -> main(Args) ->
@ -284,9 +285,9 @@ merge_update_actions(App, Changes, Vsns, PrevVersion) ->
%% but there is a 1.1.2 in appup we may skip merging instructions for %% but there is a 1.1.2 in appup we may skip merging instructions for
%% 1.1.2 because it's not used and no way to know what has been changed %% 1.1.2 because it's not used and no way to know what has been changed
is_skipped_version(App, Vsn, PrevVersion) when is_list(Vsn) andalso is_list(PrevVersion) -> is_skipped_version(App, Vsn, PrevVersion) when is_list(Vsn) andalso is_list(PrevVersion) ->
case is_app_external(App) andalso parse_version_number(Vsn) of case is_app_external(App) andalso parse_version(Vsn, non_strict_semver) of
{ok, VsnTuple} -> {ok, VsnTuple} ->
case parse_version_number(PrevVersion) of case parse_version(PrevVersion, non_strict_semver) of
{ok, PrevVsnTuple} -> {ok, PrevVsnTuple} ->
VsnTuple > PrevVsnTuple; VsnTuple > PrevVsnTuple;
_ -> _ ->
@ -312,7 +313,7 @@ do_merge_update_actions(App, {New0, Changed0, Deleted0}, OldActions) ->
true -> true ->
[]; [];
false -> false ->
[{load_module, M, brutal_purge, soft_purge, []} || M <- Changed] ++ [{load_module, M, brutal_purge, soft_purge, []} || M <- Changed, not is_secret_module(M)] ++
[{add_module, M} || M <- New] [{add_module, M} || M <- New]
end, end,
{OldActionsWithStop, OldActionsAfterStop} = {OldActionsWithStop, OldActionsAfterStop} =
@ -324,10 +325,18 @@ do_merge_update_actions(App, {New0, Changed0, Deleted0}, OldActions) ->
true -> true ->
[]; [];
false -> false ->
[{delete_module, M} || M <- Deleted] [{delete_module, M} || M <- Deleted, not is_secret_module(M)]
end ++ end ++
AppSpecific. AppSpecific.
%% Do not reload or delet _secret modules
is_secret_module(Module) ->
Suffix = "_secret",
case string:right(atom_to_list(Module), length(Suffix)) of
Suffix -> true;
_ -> false
end.
%% If an entry restarts an application, there's no need to use %% If an entry restarts an application, there's no need to use
%% `load_module' instructions. %% `load_module' instructions.
contains_restart_application(Application, Actions) -> contains_restart_application(Application, Actions) ->
@ -397,7 +406,7 @@ contains_version(Needle, Haystack) when is_list(Needle) ->
%% past versions that should be covered by regexes in .appup file %% past versions that should be covered by regexes in .appup file
%% instructions. %% instructions.
enumerate_past_versions(Vsn) when is_list(Vsn) -> enumerate_past_versions(Vsn) when is_list(Vsn) ->
case parse_version_number(Vsn) of case parse_version(Vsn) of
{ok, ParsedVsn} -> {ok, ParsedVsn} ->
{ok, enumerate_past_versions(ParsedVsn)}; {ok, enumerate_past_versions(ParsedVsn)};
Error -> Error ->
@ -406,14 +415,39 @@ enumerate_past_versions(Vsn) when is_list(Vsn) ->
enumerate_past_versions({Major, Minor, Patch}) -> enumerate_past_versions({Major, Minor, Patch}) ->
[{Major, Minor, P} || P <- lists:seq(Patch - 1, 0, -1)]. [{Major, Minor, P} || P <- lists:seq(Patch - 1, 0, -1)].
parse_version_number(Vsn) when is_list(Vsn) -> parse_version(Vsn) ->
Nums = string:split(Vsn, ".", all), parse_version(Vsn, strict_semver).
Results = lists:map(fun string:to_integer/1, Nums),
case Results of parse_version(Vsn, MaybeSemver) when is_list(Vsn) ->
[{Major, []}, {Minor, []}, {Patch, []}] -> case parse_dot_separated_numbers(Vsn) of
{ok, {Major, Minor, Patch}}; {ok, {_Major, _Minor, _Patch}} = Res ->
Res;
{ok, Nums} ->
case MaybeSemver of
strict_semver ->
{error, {bad_semver, Vsn}};
non_strict_semver ->
{ok, Nums}
end;
{error, Reason} ->
{error, {Reason, Vsn}}
end.
parse_dot_separated_numbers(Str) when is_list(Str) ->
try
Split = string:split(Str, ".", all),
IntL = lists:map(fun(SubStr) ->
case string:to_integer(SubStr) of
{Int, []} when is_integer(Int) ->
Int;
_ -> _ ->
{error, bad_version} throw(no_integer)
end
end, Split),
{ok, list_to_tuple(IntL)}
catch
_ : _ ->
{error, bad_version_string}
end. end.
vsn_number_to_string({Major, Minor, Patch}) -> vsn_number_to_string({Major, Minor, Patch}) ->

View File

@ -3,6 +3,9 @@
{VSN, {VSN,
[{"4.4.9", [{"4.4.9",
[{load_module,emqx_misc,brutal_purge,soft_purge,[]}, [{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx,brutal_purge,soft_purge,[]},
{add_module,emqx_secret},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_relup,brutal_purge,soft_purge,[]}, {load_module,emqx_relup,brutal_purge,soft_purge,[]},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]}, {load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
@ -15,6 +18,9 @@
{load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
{"4.4.8", {"4.4.8",
[{load_module,emqx_misc,brutal_purge,soft_purge,[]}, [{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx,brutal_purge,soft_purge,[]},
{add_module,emqx_secret},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]}, {load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm,brutal_purge,soft_purge,[]},
@ -28,7 +34,9 @@
{load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]}]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}]},
{"4.4.7", {"4.4.7",
[{load_module,emqx_router_helper,brutal_purge,soft_purge,[]}, [{add_module,emqx_secret},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]},
@ -43,7 +51,9 @@
{load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
{"4.4.6", {"4.4.6",
[{load_module,emqx_router_helper,brutal_purge,soft_purge,[]}, [{add_module,emqx_secret},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]},
@ -58,7 +68,8 @@
{load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx,brutal_purge,soft_purge,[]}]}, {load_module,emqx,brutal_purge,soft_purge,[]}]},
{"4.4.5", {"4.4.5",
[{load_module,emqx_router_helper,brutal_purge,soft_purge,[]}, [{add_module,emqx_secret},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]},
@ -76,7 +87,8 @@
{load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]}]}, {load_module,emqx_session,brutal_purge,soft_purge,[]}]},
{"4.4.4", {"4.4.4",
[{load_module,emqx_router_helper,brutal_purge,soft_purge,[]}, [{add_module,emqx_secret},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]},
@ -102,7 +114,8 @@
{load_module,emqx_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]}]}, {load_module,emqx_session,brutal_purge,soft_purge,[]}]},
{"4.4.3", {"4.4.3",
[{load_module,emqx_router_helper,brutal_purge,soft_purge,[]}, [{add_module,emqx_secret},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_router,brutal_purge,soft_purge,[]}, {load_module,emqx_router,brutal_purge,soft_purge,[]},
@ -135,7 +148,8 @@
{load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_relup}]}, {load_module,emqx_relup}]},
{"4.4.2", {"4.4.2",
[{load_module,emqx_router_helper,brutal_purge,soft_purge,[]}, [{add_module,emqx_secret},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_router,brutal_purge,soft_purge,[]}, {load_module,emqx_router,brutal_purge,soft_purge,[]},
@ -170,7 +184,8 @@
{load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_relup}]}, {load_module,emqx_relup}]},
{"4.4.1", {"4.4.1",
[{load_module,emqx_router_helper,brutal_purge,soft_purge,[]}, [{add_module,emqx_secret},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_router,brutal_purge,soft_purge,[]}, {load_module,emqx_router,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]},
@ -211,7 +226,8 @@
{load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]},
{add_module,emqx_relup}]}, {add_module,emqx_relup}]},
{"4.4.0", {"4.4.0",
[{load_module,emqx_router_helper,brutal_purge,soft_purge,[]}, [{add_module,emqx_secret},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
{load_module,emqx_router,brutal_purge,soft_purge,[]}, {load_module,emqx_router,brutal_purge,soft_purge,[]},
{load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]},
{update,emqx_broker_sup,supervisor}, {update,emqx_broker_sup,supervisor},
@ -256,6 +272,8 @@
{<<".*">>,[]}], {<<".*">>,[]}],
[{"4.4.9", [{"4.4.9",
[{load_module,emqx_misc,brutal_purge,soft_purge,[]}, [{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_relup,brutal_purge,soft_purge,[]}, {load_module,emqx_relup,brutal_purge,soft_purge,[]},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]}, {load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
@ -268,6 +286,8 @@
{load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
{"4.4.8", {"4.4.8",
[{load_module,emqx_misc,brutal_purge,soft_purge,[]}, [{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]}, {load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm,brutal_purge,soft_purge,[]},
@ -281,7 +301,8 @@
{load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]}]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}]},
{"4.4.7", {"4.4.7",
[{load_module,emqx_router_helper,brutal_purge,soft_purge,[]}, [{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]},
@ -296,7 +317,8 @@
{load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
{"4.4.6", {"4.4.6",
[{load_module,emqx_router_helper,brutal_purge,soft_purge,[]}, [{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]},

View File

@ -228,6 +228,7 @@ shutdown() ->
shutdown(Reason) -> shutdown(Reason) ->
?LOG(critical, "emqx shutdown for ~s", [Reason]), ?LOG(critical, "emqx shutdown for ~s", [Reason]),
on_shutdown(Reason),
_ = emqx_plugins:unload(), _ = emqx_plugins:unload(),
lists:foreach(fun application:stop/1 lists:foreach(fun application:stop/1
, lists:reverse(default_started_applications()) , lists:reverse(default_started_applications())
@ -238,10 +239,12 @@ reboot() ->
true -> true ->
_ = application:stop(emqx_dashboard), %% dashboard must be started after mnesia _ = application:stop(emqx_dashboard), %% dashboard must be started after mnesia
lists:foreach(fun application:start/1 , default_started_applications()), lists:foreach(fun application:start/1 , default_started_applications()),
application:start(emqx_dashboard); _ = application:start(emqx_dashboard),
on_reboot();
false -> false ->
lists:foreach(fun application:start/1 , default_started_applications()) lists:foreach(fun application:start/1 , default_started_applications()),
on_reboot()
end. end.
is_application_running(App) -> is_application_running(App) ->
@ -256,6 +259,32 @@ default_started_applications() ->
[gproc, esockd, ranch, cowboy, ekka, emqx, emqx_modules]. [gproc, esockd, ranch, cowboy, ekka, emqx, emqx_modules].
-endif. -endif.
-ifdef(EMQX_ENTERPRISE).
on_reboot() ->
try
_ = emqx_license_api:bootstrap_license(),
ok
catch
Kind:Reason:Stack ->
?LOG(critical, "~p while rebooting: ~p, ~p", [Kind, Reason, Stack]),
ok
end,
ok.
on_shutdown(join) ->
emqx_modules:sync_load_modules_file(),
ok;
on_shutdown(_) ->
ok.
-else.
on_reboot() ->
ok.
on_shutdown(_) ->
ok.
-endif.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Internal functions %% Internal functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -976,9 +976,10 @@ return_sub_unsub_ack(Packet, Channel) ->
handle_call(kick, Channel = #channel{ handle_call(kick, Channel = #channel{
conn_state = ConnState, conn_state = ConnState,
will_msg = WillMsg, will_msg = WillMsg,
clientinfo = ClientInfo,
conninfo = #{proto_ver := ProtoVer} conninfo = #{proto_ver := ProtoVer}
}) -> }) ->
(WillMsg =/= undefined) andalso publish_will_msg(WillMsg), (WillMsg =/= undefined) andalso publish_will_msg(ClientInfo, WillMsg),
Channel1 = case ConnState of Channel1 = case ConnState of
connected -> ensure_disconnected(kicked, Channel); connected -> ensure_disconnected(kicked, Channel);
_ -> Channel _ -> Channel
@ -1140,8 +1141,9 @@ handle_timeout(_TRef, expire_awaiting_rel,
handle_timeout(_TRef, expire_session, Channel) -> handle_timeout(_TRef, expire_session, Channel) ->
shutdown(expired, Channel); shutdown(expired, Channel);
handle_timeout(_TRef, will_message, Channel = #channel{will_msg = WillMsg}) -> handle_timeout(_TRef, will_message, Channel = #channel{will_msg = WillMsg,
(WillMsg =/= undefined) andalso publish_will_msg(WillMsg), clientinfo = ClientInfo}) ->
(WillMsg =/= undefined) andalso publish_will_msg(ClientInfo, WillMsg),
{ok, clean_timer(will_timer, Channel#channel{will_msg = undefined})}; {ok, clean_timer(will_timer, Channel#channel{will_msg = undefined})};
handle_timeout(_TRef, expire_quota_limit, Channel) -> handle_timeout(_TRef, expire_quota_limit, Channel) ->
@ -1197,9 +1199,10 @@ terminate(_Reason, #channel{conn_state = idle} = _Channel) ->
ok; ok;
terminate(normal, Channel) -> terminate(normal, Channel) ->
run_terminate_hook(normal, Channel); run_terminate_hook(normal, Channel);
terminate(Reason, Channel = #channel{will_msg = WillMsg}) -> terminate(Reason, Channel = #channel{will_msg = WillMsg,
clientinfo = ClientInfo}) ->
should_publish_will_message(Reason, Channel) should_publish_will_message(Reason, Channel)
andalso publish_will_msg(WillMsg), andalso publish_will_msg(ClientInfo, WillMsg),
run_terminate_hook(Reason, Channel). run_terminate_hook(Reason, Channel).
run_terminate_hook(_Reason, #channel{session = undefined} = _Channel) -> run_terminate_hook(_Reason, #channel{session = undefined} = _Channel) ->
@ -1741,10 +1744,11 @@ ensure_disconnected(Reason, Channel = #channel{conninfo = ConnInfo,
maybe_publish_will_msg(Channel = #channel{will_msg = undefined}) -> maybe_publish_will_msg(Channel = #channel{will_msg = undefined}) ->
Channel; Channel;
maybe_publish_will_msg(Channel = #channel{will_msg = WillMsg}) -> maybe_publish_will_msg(Channel = #channel{will_msg = WillMsg,
clientinfo = ClientInfo}) ->
case will_delay_interval(WillMsg) of case will_delay_interval(WillMsg) of
0 -> 0 ->
ok = publish_will_msg(WillMsg), ok = publish_will_msg(ClientInfo, WillMsg),
Channel#channel{will_msg = undefined}; Channel#channel{will_msg = undefined};
I -> I ->
ensure_timer(will_timer, timer:seconds(I), Channel) ensure_timer(will_timer, timer:seconds(I), Channel)
@ -1754,9 +1758,19 @@ will_delay_interval(WillMsg) ->
maps:get('Will-Delay-Interval', maps:get('Will-Delay-Interval',
emqx_message:get_header(properties, WillMsg, #{}), 0). emqx_message:get_header(properties, WillMsg, #{}), 0).
publish_will_msg(Msg) -> publish_will_msg(ClientInfo, Msg = #message{topic = Topic}) ->
case emqx_access_control:check_acl(ClientInfo, publish, Topic) of
allow ->
_ = emqx_broker:publish(Msg), _ = emqx_broker:publish(Msg),
ok. ok;
deny ->
?tp(
warning,
last_will_testament_publish_denied,
#{topic => Topic}
),
ok
end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Disconnect Reason %% Disconnect Reason

View File

@ -96,8 +96,8 @@ do_parse(URI) ->
%% underscores replaced with hyphens %% underscores replaced with hyphens
%% NOTE: assuming the input Headers list is a proplists, %% NOTE: assuming the input Headers list is a proplists,
%% that is, when a key is duplicated, list header overrides tail %% that is, when a key is duplicated, list header overrides tail
%% e.g. [{"Content_Type", "applicaiton/binary"}, {<<"content-type">>, "applicaiton/json"}] %% e.g. [{"Content_Type", "applicaiton/binary"}, {"content-type", "applicaiton/json"}]
%% results in: [{"content-type", "applicaiton/binary"}] %% results in: [{<<"content-type">>, "applicaiton/binary"}]
normalise_headers(Headers0) -> normalise_headers(Headers0) ->
F = fun({K0, V}) -> F = fun({K0, V}) ->
K = re:replace(K0, "_", "-", [{return,binary}]), K = re:replace(K0, "_", "-", [{return,binary}]),

View File

@ -47,6 +47,7 @@
, index_of/2 , index_of/2
, maybe_parse_ip/1 , maybe_parse_ip/1
, ipv6_probe/1 , ipv6_probe/1
, ipv6_probe/2
, pmap/2 , pmap/2
, pmap/3 , pmap/3
]). ]).
@ -94,12 +95,15 @@ maybe_parse_ip(Host) ->
%% @doc Add `ipv6_probe' socket option if it's supported. %% @doc Add `ipv6_probe' socket option if it's supported.
ipv6_probe(Opts) -> ipv6_probe(Opts) ->
ipv6_probe(Opts, true).
ipv6_probe(Opts, Ipv6Probe) when is_boolean(Ipv6Probe) orelse is_integer(Ipv6Probe) ->
Bool = try gen_tcp:ipv6_probe() Bool = try gen_tcp:ipv6_probe()
catch _ : _ -> false end, catch _ : _ -> false end,
ipv6_probe(Bool, Opts). ipv6_probe(Bool, Opts, Ipv6Probe).
ipv6_probe(false, Opts) -> Opts; ipv6_probe(false, Opts, _) -> Opts;
ipv6_probe(true, Opts) -> [{ipv6_probe, true} | Opts]. ipv6_probe(true, Opts, Ipv6Probe) -> [{ipv6_probe, Ipv6Probe} | Opts].
%% @doc Merge options %% @doc Merge options
-spec(merge_opts(Opts, Opts) -> Opts when Opts :: proplists:proplist()). -spec(merge_opts(Opts, Opts) -> Opts when Opts :: proplists:proplist()).

View File

@ -24,6 +24,7 @@
-export([init/0]). -export([init/0]).
-export([ load/0 -export([ load/0
, force_load/0
, load/1 , load/1
, unload/0 , unload/0
, unload/1 , unload/1
@ -59,12 +60,17 @@ init() ->
%% @doc Load all plugins when the broker started. %% @doc Load all plugins when the broker started.
-spec(load() -> ok | ignore | {error, term()}). -spec(load() -> ok | ignore | {error, term()}).
load() -> load() ->
do_load(#{force_load => false}).
force_load() ->
do_load(#{force_load => true}).
do_load(Options) ->
ok = load_ext_plugins(emqx:get_env(expand_plugins_dir)), ok = load_ext_plugins(emqx:get_env(expand_plugins_dir)),
case emqx:get_env(plugins_loaded_file) of case emqx:get_env(plugins_loaded_file) of
undefined -> ignore; %% No plugins available undefined -> ignore; %% No plugins available
File -> File ->
_ = ensure_file(File), _ = ensure_file(File),
with_loaded_file(File, fun(Names) -> load_plugins(Names, false) end) with_loaded_file(File, fun(Names) -> load_plugins(Names, Options, false) end)
end. end.
%% @doc Load a Plugin %% @doc Load a Plugin
@ -282,18 +288,23 @@ filter_plugins([{Name, Load} | Names], Plugins) ->
filter_plugins([Name | Names], Plugins) when is_atom(Name) -> filter_plugins([Name | Names], Plugins) when is_atom(Name) ->
filter_plugins([{Name, true} | Names], Plugins). filter_plugins([{Name, true} | Names], Plugins).
load_plugins(Names, Persistent) -> load_plugins(Names, Options, Persistent) ->
Plugins = list(), Plugins = list(),
NotFound = Names -- names(Plugins), NotFound = Names -- names(Plugins),
case NotFound of case NotFound of
[] -> ok; [] -> ok;
NotFound -> ?LOG(alert, "cannot_find_plugins: ~p", [NotFound]) NotFound -> ?LOG(alert, "cannot_find_plugins: ~p", [NotFound])
end, end,
NeedToLoad = (Names -- NotFound) -- names(started_app), NeedToLoad0 = Names -- NotFound,
NeedToLoad1 =
case Options of
#{force_load := true} -> NeedToLoad0;
_ -> NeedToLoad0 -- names(started_app)
end,
lists:foreach(fun(Name) -> lists:foreach(fun(Name) ->
Plugin = find_plugin(Name, Plugins), Plugin = find_plugin(Name, Plugins),
load_plugin(Plugin#plugin.name, Persistent) load_plugin(Plugin#plugin.name, Persistent)
end, NeedToLoad). end, NeedToLoad1).
generate_configs(App) -> generate_configs(App) ->
ConfigFile = filename:join([emqx:get_env(plugins_etc_dir), App]) ++ ".config", ConfigFile = filename:join([emqx:get_env(plugins_etc_dir), App]) ++ ".config",

41
src/emqx_secret.erl Normal file
View File

@ -0,0 +1,41 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% Note: this module CAN'T be hot-patched to avoid invalidating the
%% closures, so it must not be changed.
-module(emqx_secret).
%% API:
-export([wrap/1, unwrap/1]).
%%================================================================================
%% API funcions
%%================================================================================
wrap(undefined) ->
undefined;
wrap(Func) when is_function(Func) ->
Func;
wrap(Term) ->
fun() ->
Term
end.
unwrap(Term) when is_function(Term, 0) ->
%% Handle potentially nested funs
unwrap(Term());
unwrap(Term) ->
Term.

View File

@ -761,7 +761,7 @@ on_delivery_completed(_ClientInfo, _Ts, _Session) ->
ok. ok.
mark_begin_deliver(Msg) -> mark_begin_deliver(Msg) ->
emqx_message:set_header(deliver_begin_at, erlang:system_time(second), Msg). emqx_message:set_header(deliver_begin_at, erlang:system_time(millisecond), Msg).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Helper functions %% Helper functions

View File

@ -18,33 +18,57 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-define(SLAVE_START_APPS, [emqx]). %% modules is included because code is called before cluster join
-define(SLAVE_START_APPS, [emqx, emqx_modules]).
-export([start_slave/1, -export([start_slave/1,
start_slave/2, start_slave/2,
stop_slave/1]). stop_slave/1,
wait_for_synced_routes/3
]).
start_slave(Name) -> start_slave(Name) ->
start_slave(Name, #{}). start_slave(Name, #{}).
start_slave(Name, Opts) -> start_slave(Name, Opts) ->
{ok, Node} = ct_slave:start(list_to_atom(atom_to_list(Name) ++ "@" ++ host()), Node = make_node_name(Name),
[{kill_if_fail, true}, case ct_slave:start(Node, [{kill_if_fail, true},
{monitor_master, true}, {monitor_master, true},
{init_timeout, 10000}, {init_timeout, 10000},
{startup_timeout, 10000}, {startup_timeout, 10000},
{erl_flags, ebin_path()}]), {erl_flags, ebin_path()}]) of
{ok, _} ->
ok;
{error, started_not_connected, _} ->
ok
end,
pong = net_adm:ping(Node), pong = net_adm:ping(Node),
setup_node(Node, Opts), setup_node(Node, Opts),
Node. Node.
stop_slave(Node) -> make_node_name(Name) ->
rpc:call(Node, ekka, leave, []), case string:tokens(atom_to_list(Name), "@") of
ct_slave:stop(Node). [_Name, _Host] ->
%% the name already has a @
Name;
_ ->
list_to_atom(atom_to_list(Name) ++ "@" ++ host())
end.
stop_slave(Node0) ->
Node = make_node_name(Node0),
case rpc:call(Node, ekka, leave, []) of
ok -> ok;
{badrpc, nodedown} -> ok
end,
case ct_slave:stop(Node) of
{ok, _} -> ok;
{error, not_started, _} -> ok
end.
host() -> host() ->
[_, Host] = string:tokens(atom_to_list(node()), "@"), Host. [_, Host] = string:tokens(atom_to_list(node()), "@"),
Host.
ebin_path() -> ebin_path() ->
string:join(["-pa" | lists:filter(fun is_lib/1, code:get_path())], " "). string:join(["-pa" | lists:filter(fun is_lib/1, code:get_path())], " ").
@ -71,7 +95,12 @@ setup_node(Node, #{} = Opts) ->
[ok = rpc:call(Node, application, load, [App]) || App <- [gen_rpc, emqx]], [ok = rpc:call(Node, application, load, [App]) || App <- [gen_rpc, emqx]],
ok = rpc:call(Node, emqx_ct_helpers, start_apps, [StartApps, EnvHandler]), ok = rpc:call(Node, emqx_ct_helpers, start_apps, [StartApps, EnvHandler]),
rpc:call(Node, ekka, join, [node()]), case maps:get(no_join, Opts, false) of
true ->
ok;
false ->
ok = rpc:call(Node, ekka, join, [node()])
end,
%% Sanity check. Assert that `gen_rpc' is set up correctly: %% Sanity check. Assert that `gen_rpc' is set up correctly:
?assertEqual( Node ?assertEqual( Node
@ -81,3 +110,40 @@ setup_node(Node, #{} = Opts) ->
, gen_rpc:call(Node, gen_rpc, call, [node(), erlang, node, []]) , gen_rpc:call(Node, gen_rpc, call, [node(), erlang, node, []])
), ),
ok. ok.
%% Routes are replicated async.
%% Call this function to wait for nodes in the cluster to have the same view
%% for a given topic.
wait_for_synced_routes(Nodes, Topic, Timeout) ->
F = fun() -> do_wait_for_synced_routes(Nodes, Topic) end,
emqx_misc:nolink_apply(F, Timeout).
do_wait_for_synced_routes(Nodes, Topic) ->
PerNodeView0 =
lists:map(
fun(Node) ->
{rpc:call(Node, emqx_router, match_routes, [Topic]), Node}
end, Nodes),
PerNodeView = lists:keysort(1, PerNodeView0),
case check_consistent_view(PerNodeView) of
{ok, OneView} ->
ct:pal("consistent_routes_view~n~p", [OneView]),
ok;
{error, Reason}->
ct:pal("inconsistent_routes_view~n~p", [Reason]),
timer:sleep(10),
do_wait_for_synced_routes(Nodes, Topic)
end.
check_consistent_view(PerNodeView) ->
check_consistent_view(PerNodeView, []).
check_consistent_view([], [OneView]) -> {ok, OneView};
check_consistent_view([], MoreThanOneView) -> {error, MoreThanOneView};
check_consistent_view([{View, Node} | Rest], [{View, Nodes} | Acc]) ->
check_consistent_view(Rest, [{View, add_to_list(Node, Nodes)} | Acc]);
check_consistent_view([{View, Node} | Rest], Acc) ->
check_consistent_view(Rest, [{View, Node} | Acc]).
add_to_list(Node, Nodes) when is_list(Nodes) -> [Node | Nodes];
add_to_list(Node, Node1) -> [Node, Node1].

View File

@ -40,6 +40,9 @@ init_per_suite(Config) ->
PortDiscovery = application:get_env(gen_rpc, port_discovery), PortDiscovery = application:get_env(gen_rpc, port_discovery),
application:set_env(gen_rpc, port_discovery, stateless), application:set_env(gen_rpc, port_discovery, stateless),
application:ensure_all_started(gen_rpc), application:ensure_all_started(gen_rpc),
%% ensure emqx_moduels' app modules are loaded
%% so the mnesia tables are created
ok = load_app(emqx_modules),
emqx_ct_helpers:start_apps([]), emqx_ct_helpers:start_apps([]),
[{port_discovery, PortDiscovery} | Config]. [{port_discovery, PortDiscovery} | Config].
@ -50,32 +53,45 @@ end_per_suite(Config) ->
_ -> ok _ -> ok
end. end.
t_is_ack_required(_) -> init_per_testcase(Case, Config) ->
try
?MODULE:Case({'init', Config})
catch
error : function_clause ->
Config
end.
end_per_testcase(Case, Config) ->
try
?MODULE:Case({'end', Config})
catch
error : function_clause ->
ok
end.
t_is_ack_required(Config) when is_list(Config) ->
?assertEqual(false, emqx_shared_sub:is_ack_required(#message{headers = #{}})). ?assertEqual(false, emqx_shared_sub:is_ack_required(#message{headers = #{}})).
t_maybe_nack_dropped(_) -> t_maybe_nack_dropped(Config) when is_list(Config) ->
?assertEqual(store, emqx_shared_sub:maybe_nack_dropped(#message{headers = #{}})), ?assertEqual(store, emqx_shared_sub:maybe_nack_dropped(#message{headers = #{}})),
Msg = #message{headers = #{shared_dispatch_ack => {self(), {fresh, <<"group">>, for_test}}}}, Msg = #message{headers = #{shared_dispatch_ack => {self(), {fresh, <<"group">>, for_test}}}},
?assertEqual(drop, emqx_shared_sub:maybe_nack_dropped(Msg)), ?assertEqual(drop, emqx_shared_sub:maybe_nack_dropped(Msg)),
?assertEqual(ok,receive {for_test, {shared_sub_nack, dropped}} -> ok after 100 -> timeout end). ?assertEqual(ok,receive {for_test, {shared_sub_nack, dropped}} -> ok after 100 -> timeout end).
t_nack_no_connection(_) -> t_nack_no_connection(Config) when is_list(Config) ->
Msg = #message{headers = #{shared_dispatch_ack => {self(), {fresh, <<"group">>, for_test}}}}, Msg = #message{headers = #{shared_dispatch_ack => {self(), {fresh, <<"group">>, for_test}}}},
?assertEqual(ok, emqx_shared_sub:nack_no_connection(Msg)), ?assertEqual(ok, emqx_shared_sub:nack_no_connection(Msg)),
?assertEqual(ok,receive {for_test, {shared_sub_nack, no_connection}} -> ok ?assertEqual(ok,receive {for_test, {shared_sub_nack, no_connection}} -> ok
after 100 -> timeout end). after 100 -> timeout end).
t_maybe_ack(_) -> t_maybe_ack(Config) when is_list(Config) ->
?assertEqual(#message{headers = #{}}, emqx_shared_sub:maybe_ack(#message{headers = #{}})), ?assertEqual(#message{headers = #{}}, emqx_shared_sub:maybe_ack(#message{headers = #{}})),
Msg = #message{headers = #{shared_dispatch_ack => {self(), {fresh, <<"group">>, for_test}}}}, Msg = #message{headers = #{shared_dispatch_ack => {self(), {fresh, <<"group">>, for_test}}}},
?assertEqual(#message{headers = #{shared_dispatch_ack => ?no_ack}}, ?assertEqual(#message{headers = #{shared_dispatch_ack => ?no_ack}},
emqx_shared_sub:maybe_ack(Msg)), emqx_shared_sub:maybe_ack(Msg)),
?assertEqual(ok,receive {for_test, ?ack} -> ok after 100 -> timeout end). ?assertEqual(ok,receive {for_test, ?ack} -> ok after 100 -> timeout end).
% t_subscribers(_) -> t_random_basic(Config) when is_list(Config) ->
% error('TODO').
t_random_basic(_) ->
ok = ensure_config(random), ok = ensure_config(random),
ClientId = <<"ClientId">>, ClientId = <<"ClientId">>,
Topic = <<"foo">>, Topic = <<"foo">>,
@ -105,7 +121,7 @@ t_random_basic(_) ->
%% After the connection for the 2nd session is also closed, %% After the connection for the 2nd session is also closed,
%% i.e. when all clients are offline, the following message(s) %% i.e. when all clients are offline, the following message(s)
%% should be delivered randomly. %% should be delivered randomly.
t_no_connection_nack(_) -> t_no_connection_nack(Config) when is_list(Config) ->
ok = ensure_config(sticky), ok = ensure_config(sticky),
Publisher = <<"publisher">>, Publisher = <<"publisher">>,
Subscriber1 = <<"Subscriber1">>, Subscriber1 = <<"Subscriber1">>,
@ -171,27 +187,27 @@ t_no_connection_nack(_) ->
% emqx_sm:close_session(SPid2), % emqx_sm:close_session(SPid2),
ok. ok.
t_random(_) -> t_random(Config) when is_list(Config) ->
ok = ensure_config(random, true), ok = ensure_config(random, true),
test_two_messages(random). test_two_messages(random).
t_round_robin(_) -> t_round_robin(Config) when is_list(Config) ->
ok = ensure_config(round_robin, true), ok = ensure_config(round_robin, true),
test_two_messages(round_robin). test_two_messages(round_robin).
t_sticky(_) -> t_sticky(Config) when is_list(Config) ->
ok = ensure_config(sticky, true), ok = ensure_config(sticky, true),
test_two_messages(sticky). test_two_messages(sticky).
t_hash(_) -> t_hash(Config) when is_list(Config) ->
ok = ensure_config(hash, false), ok = ensure_config(hash, false),
test_two_messages(hash). test_two_messages(hash).
t_hash_clinetid(_) -> t_hash_clinetid(Config) when is_list(Config) ->
ok = ensure_config(hash_clientid, false), ok = ensure_config(hash_clientid, false),
test_two_messages(hash_clientid). test_two_messages(hash_clientid).
t_hash_topic(_) -> t_hash_topic(Config) when is_list(Config) ->
ok = ensure_config(hash_topic, false), ok = ensure_config(hash_topic, false),
ClientId1 = <<"ClientId1">>, ClientId1 = <<"ClientId1">>,
ClientId2 = <<"ClientId2">>, ClientId2 = <<"ClientId2">>,
@ -230,7 +246,7 @@ t_hash_topic(_) ->
ok. ok.
%% if the original subscriber dies, change to another one alive %% if the original subscriber dies, change to another one alive
t_not_so_sticky(_) -> t_not_so_sticky(Config) when is_list(Config) ->
ok = ensure_config(sticky), ok = ensure_config(sticky),
ClientId1 = <<"ClientId1">>, ClientId1 = <<"ClientId1">>,
ClientId2 = <<"ClientId2">>, ClientId2 = <<"ClientId2">>,
@ -303,7 +319,7 @@ last_message(ExpectedPayload, Pids, Timeout) ->
<<"not yet?">> <<"not yet?">>
end. end.
t_dispatch(_) -> t_dispatch(Config) when is_list(Config) ->
ok = ensure_config(random), ok = ensure_config(random),
Topic = <<"foo">>, Topic = <<"foo">>,
?assertEqual({error, no_subscribers}, ?assertEqual({error, no_subscribers},
@ -312,18 +328,13 @@ t_dispatch(_) ->
?assertEqual({ok, 1}, ?assertEqual({ok, 1},
emqx_shared_sub:dispatch(<<"group1">>, Topic, #delivery{message = #message{}})). emqx_shared_sub:dispatch(<<"group1">>, Topic, #delivery{message = #message{}})).
% t_unsubscribe(_) -> t_uncovered_func(Config) when is_list(Config) ->
% error('TODO').
% t_subscribe(_) ->
% error('TODO').
t_uncovered_func(_) ->
ignored = gen_server:call(emqx_shared_sub, ignored), ignored = gen_server:call(emqx_shared_sub, ignored),
ok = gen_server:cast(emqx_shared_sub, ignored), ok = gen_server:cast(emqx_shared_sub, ignored),
ignored = emqx_shared_sub ! ignored, ignored = emqx_shared_sub ! ignored,
{mnesia_table_event, []} = emqx_shared_sub ! {mnesia_table_event, []}. {mnesia_table_event, []} = emqx_shared_sub ! {mnesia_table_event, []}.
t_per_group_config(_) -> t_per_group_config(Config) when is_list(Config) ->
ok = ensure_group_config(#{ ok = ensure_group_config(#{
<<"local_group_fallback">> => local, <<"local_group_fallback">> => local,
<<"local_group">> => local, <<"local_group">> => local,
@ -342,8 +353,8 @@ t_per_group_config(_) ->
test_two_messages(round_robin, <<"round_robin_group">>), test_two_messages(round_robin, <<"round_robin_group">>),
test_two_messages(round_robin, <<"round_robin_group">>). test_two_messages(round_robin, <<"round_robin_group">>).
t_local(_) -> t_local({'init', Config}) ->
Node = start_slave('local_shared_sub_test19', 21884), Node = start_slave(local_shared_sub_test19, 21884),
GroupConfig = #{ GroupConfig = #{
<<"local_group_fallback">> => local, <<"local_group_fallback">> => local,
<<"local_group">> => local, <<"local_group">> => local,
@ -352,7 +363,11 @@ t_local(_) ->
}, },
ok = ensure_group_config(Node, GroupConfig), ok = ensure_group_config(Node, GroupConfig),
ok = ensure_group_config(GroupConfig), ok = ensure_group_config(GroupConfig),
[{slave_node, Node} | Config];
t_local({'end', _Config}) ->
ok = stop_slave(local_shared_sub_test19);
t_local(Config) when is_list(Config) ->
Node = proplists:get_value(slave_node, Config),
Topic = <<"local_foo1/bar">>, Topic = <<"local_foo1/bar">>,
ClientId1 = <<"ClientId1">>, ClientId1 = <<"ClientId1">>,
ClientId2 = <<"ClientId2">>, ClientId2 = <<"ClientId2">>,
@ -388,7 +403,7 @@ t_local(_) ->
?assertNotEqual(UsedSubPid1, UsedSubPid2), ?assertNotEqual(UsedSubPid1, UsedSubPid2),
ok. ok.
t_local_fallback(_) -> t_local_fallback({'init', Config}) ->
ok = ensure_group_config(#{ ok = ensure_group_config(#{
<<"local_group_fallback">> => local, <<"local_group_fallback">> => local,
<<"local_group">> => local, <<"local_group">> => local,
@ -396,10 +411,15 @@ t_local_fallback(_) ->
<<"sticky_group">> => sticky <<"sticky_group">> => sticky
}), }),
Node = start_slave(local_fallback_shared_sub_test19, 11885),
[{slave_node, Node} | Config];
t_local_fallback({'end', _}) ->
ok = stop_slave(local_fallback_shared_sub_test19);
t_local_fallback(Config) when is_list(Config) ->
Topic = <<"local_foo2/bar">>, Topic = <<"local_foo2/bar">>,
ClientId1 = <<"ClientId1">>, ClientId1 = <<"ClientId1">>,
ClientId2 = <<"ClientId2">>, ClientId2 = <<"ClientId2">>,
Node = start_slave('local_fallback_shared_sub_test19', 11885), Node = proplists:get_value(slave_node, Config),
{ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]), {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]),
{ok, _} = emqtt:connect(ConnPid1), {ok, _} = emqtt:connect(ConnPid1),
@ -407,6 +427,7 @@ t_local_fallback(_) ->
Message2 = emqx_message:make(ClientId2, 0, Topic, <<"hello2">>), Message2 = emqx_message:make(ClientId2, 0, Topic, <<"hello2">>),
emqtt:subscribe(ConnPid1, {<<"$share/local_group_fallback/", Topic/binary>>, 0}), emqtt:subscribe(ConnPid1, {<<"$share/local_group_fallback/", Topic/binary>>, 0}),
ok = emqx_node_helpers:wait_for_synced_routes([node(), Node], Topic, timer:seconds(10)),
[{share, Topic, {ok, 1}}] = emqx:publish(Message1), [{share, Topic, {ok, 1}}] = emqx:publish(Message1),
{true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1]), {true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1]),
@ -422,7 +443,7 @@ t_local_fallback(_) ->
%% This one tests that broker tries to select another shared subscriber %% This one tests that broker tries to select another shared subscriber
%% If the first one doesn't return an ACK %% If the first one doesn't return an ACK
t_redispatch(_) -> t_redispatch(Config) when is_list(Config) ->
ok = ensure_config(sticky, true), ok = ensure_config(sticky, true),
application:set_env(emqx, shared_dispatch_ack_enabled, true), application:set_env(emqx, shared_dispatch_ack_enabled, true),
@ -453,7 +474,7 @@ t_redispatch(_) ->
emqtt:stop(UsedSubPid2), emqtt:stop(UsedSubPid2),
ok. ok.
t_dispatch_when_inflights_are_full(_) -> t_dispatch_when_inflights_are_full(Config) when is_list(Config) ->
ok = ensure_config(round_robin, true), ok = ensure_config(round_robin, true),
Topic = <<"foo/bar">>, Topic = <<"foo/bar">>,
ClientId1 = <<"ClientId1">>, ClientId1 = <<"ClientId1">>,
@ -536,8 +557,20 @@ recv_msgs(Count, Msgs) ->
end. end.
start_slave(Name, Port) -> start_slave(Name, Port) ->
ok = emqx_ct_helpers:start_apps([emqx_modules]),
Listeners = [#{listen_on => {{127,0,0,1}, Port}, Listeners = [#{listen_on => {{127,0,0,1}, Port},
start_apps => [emqx, emqx_modules],
name => "internal", name => "internal",
opts => [{zone,internal}], opts => [{zone,internal}],
proto => tcp}], proto => tcp}],
emqx_node_helpers:start_slave(Name, #{listeners => Listeners}). emqx_node_helpers:start_slave(Name, #{listeners => Listeners}).
stop_slave(Name) ->
emqx_node_helpers:stop_slave(Name).
load_app(App) ->
case application:load(App) of
ok -> ok;
{error, {already_loaded, _}} -> ok;
{error, Reason} -> error({failed_to_load_app, App, Reason})
end.