Merge pull request #9070 from thalesmg/merge-rv43-into-rv44-a

Merge rv43 into rv44 a
This commit is contained in:
Thales Macedo Garitezi 2022-09-28 11:17:01 -03:00 committed by GitHub
commit 61af0831f8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 349 additions and 82 deletions

View File

@ -181,7 +181,10 @@ jobs:
pkg_name=$(find _packages/${EMQX_NAME} -mindepth 1 -maxdepth 1 -iname \*.zip)
unzip -q $pkg_name
gsed -i '/emqx_telemetry/d' ./emqx/data/loaded_plugins
./emqx/bin/emqx start || cat emqx/log/erlang.log.1
# test with a spaces in path
mv ./emqx "./emqx home/"
cd "./emqx home/"
./bin/emqx start || cat log/erlang.log.1
ready='no'
for i in {1..10}; do
if curl -fs 127.0.0.1:18083 > /dev/null; then
@ -192,12 +195,13 @@ jobs:
done
if [ "$ready" != "yes" ]; then
echo "Timed out waiting for emqx to be ready"
cat emqx/log/erlang.log.1
cat log/erlang.log.1
exit 1
fi
./emqx/bin/emqx_ctl status
./emqx/bin/emqx stop
rm -rf emqx
./bin/emqx_ctl status
./bin/emqx stop
cd ..
rm -rf "emqx home"
- uses: actions/upload-artifact@v2
with:
name: macos

View File

@ -10,6 +10,10 @@ File format:
- One list item per change topic
Change log ends with a list of GitHub PRs
## v4.3.22
### Minor changes
## v4.3.21
### Enhancements
@ -23,13 +27,15 @@ File format:
- TLS listener default buffer size to 4KB [#9007](https://github.com/emqx/emqx/pull/9007)
Eliminate uncertainty that the buffer size is set by OS default.
- Fix delayed publish inaccurate caused by os time change. [#8908](https://github.com/emqx/emqx/pull/8908)
- Disable authorization for `api/v4/emqx_prometheus` endpoint. [8955](https://github.com/emqx/emqx/pull/8955)
- Added a test to prevent a last will testament message to be
published when a client is denied connection. [#8894](https://github.com/emqx/emqx/pull/8894)
### Bug fixes
- Fix delayed publish inaccurate caused by os time change. [#8908](https://github.com/emqx/emqx/pull/8908)
## v4.3.20
### Bug fixes

View File

@ -20,6 +20,7 @@
-compile(export_all).
-include("emqx_auth_mnesia.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
@ -78,15 +79,37 @@ init_per_testcase_migration(_, Config) ->
emqx_acl_mnesia_migrator:migrate_records(),
Config.
init_per_testcase_other(t_last_will_testament_message_check_acl, Config) ->
OriginalACLNoMatch = application:get_env(emqx, acl_nomatch),
application:set_env(emqx, acl_nomatch, deny),
emqx_mod_acl_internal:unload([]),
%% deny all for this client
ClientID = <<"lwt_client">>,
ok = emqx_acl_mnesia_db:add_acl({clientid, ClientID}, <<"#">>, pubsub, deny),
[ {original_acl_nomatch, OriginalACLNoMatch}
, {clientid, ClientID}
| Config];
init_per_testcase_other(_TestCase, Config) ->
Config.
init_per_testcase(Case, Config) ->
PerTestInitializers = [
fun init_per_testcase_clean/2,
fun init_per_testcase_migration/2,
fun init_per_testcase_emqx_hook/2
fun init_per_testcase_emqx_hook/2,
fun init_per_testcase_other/2
],
lists:foldl(fun(Init, Conf) -> Init(Case, Conf) end, Config, PerTestInitializers).
end_per_testcase(_, Config) ->
end_per_testcase(t_last_will_testament_message_check_acl, Config) ->
emqx:unhook('client.check_acl', fun emqx_acl_mnesia:check_acl/5),
case ?config(original_acl_nomatch, Config) of
{ok, Original} -> application:set_env(emqx, acl_nomatch, Original);
_ -> ok
end,
emqx_mod_acl_internal:load([]),
ok;
end_per_testcase(_TestCase, Config) ->
emqx:unhook('client.check_acl', fun emqx_acl_mnesia:check_acl/5),
Config.
@ -465,6 +488,35 @@ t_rest_api(_Config) ->
{ok, Res3} = request_http_rest_list(["$all"]),
?assertMatch([], get_http_data(Res3)).
%% asserts that we check ACL for the LWT topic before publishing the
%% LWT.
t_last_will_testament_message_check_acl(Config) ->
ClientID = ?config(clientid, Config),
{ok, C} = emqtt:start_link([
{clientid, ClientID},
{will_topic, <<"$SYS/lwt">>},
{will_payload, <<"should not be published">>}
]),
{ok, _} = emqtt:connect(C),
ok = emqx:subscribe(<<"$SYS/lwt">>),
unlink(C),
ok = snabbkaffe:start_trace(),
{true, {ok, _}} =
?wait_async_action(
exit(C, kill),
#{?snk_kind := last_will_testament_publish_denied},
1_000
),
ok = snabbkaffe:stop(),
receive
{deliver, <<"$SYS/lwt">>, #message{payload = <<"should not be published">>}} ->
error(lwt_should_not_be_published_to_forbidden_topic)
after 1_000 ->
ok
end,
ok.
create_conflicting_records() ->
Records = [

View File

@ -0,0 +1,25 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-ifndef(EMQX_PSK_FILE).
-define(EMQX_PSK_FILE, true).
-define(PSK_FILE_TAB, emqx_psk_file).
-record(psk_entry, {psk_id :: binary(),
psk_str :: binary()}).
-endif.

View File

@ -1,6 +1,6 @@
{application, emqx_psk_file,
[{description,"EMQX PSK Plugin from File"},
{vsn, "4.3.0"}, % strict semver, bump manually!
{vsn, "4.3.1"}, % strict semver, bump manually!
{modules,[]},
{registered,[emqx_psk_file_sup]},
{applications,[kernel,stdlib]},

View File

@ -0,0 +1,10 @@
%% -*- mode: erlang -*-
{VSN,
[{"4.3.0",
[{load_module,emqx_psk_file,brutal_purge,soft_purge,[]},
{load_module,emqx_psk_file_sup,brutal_purge,soft_purge,[]}]}
],
[{"4.3.0",
[{load_module,emqx_psk_file,brutal_purge,soft_purge,[]},
{load_module,emqx_psk_file_sup,brutal_purge,soft_purge,[]}]}
]}.

View File

@ -16,6 +16,7 @@
-module(emqx_psk_file).
-include("emqx_psk_file.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
@ -26,15 +27,10 @@
%% Hooks functions
-export([on_psk_lookup/2]).
-define(TAB, ?MODULE).
-define(LF, 10).
-record(psk_entry, {psk_id :: binary(),
psk_str :: binary()}).
%% Called when the plugin application start
load(Env) ->
_ = ets:new(?TAB, [set, named_table, {keypos, #psk_entry.psk_id}]),
{ok, PskFile} = file:open(get_value(path, Env), [read, raw, binary, read_ahead]),
preload_psks(PskFile, bin(get_value(delimiter, Env))),
_ = file:close(PskFile),
@ -45,7 +41,7 @@ unload() ->
emqx:unhook('tls_handshake.psk_lookup', fun ?MODULE:on_psk_lookup/2).
on_psk_lookup(ClientPSKID, UserState) ->
case ets:lookup(?TAB, ClientPSKID) of
case ets:lookup(?PSK_FILE_TAB, ClientPSKID) of
[#psk_entry{psk_str = PskStr}] ->
{stop, PskStr};
[] ->
@ -57,7 +53,9 @@ preload_psks(FileHandler, Delimiter) ->
{ok, Line} ->
case binary:split(Line, Delimiter) of
[Key, Rem] ->
ets:insert(?TAB, #psk_entry{psk_id = Key, psk_str = trim_lf(Rem)}),
ets:insert(
?PSK_FILE_TAB,
#psk_entry{psk_id = Key, psk_str = trim_lf(Rem)}),
preload_psks(FileHandler, Delimiter);
[Line] ->
?LOG(warning, "[~p] - Invalid line: ~p, delimiter: ~p", [?MODULE, Line, Delimiter])

View File

@ -16,6 +16,8 @@
-module(emqx_psk_file_sup).
-include("emqx_psk_file.hrl").
-behaviour(supervisor).
%% API
@ -25,8 +27,11 @@
-export([init/1]).
start_link() ->
_ = ets:new(
?PSK_FILE_TAB,
[set, named_table, public, {keypos, #psk_entry.psk_id}]
),
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
{ok, { {one_for_one, 0, 1}, []} }.

View File

@ -1,6 +1,6 @@
{application, emqx_retainer,
[{description, "EMQ X Retainer"},
{vsn, "4.4.2"}, % strict semver, bump manually!
{vsn, "4.4.3"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_retainer_sup]},
{applications, [kernel,stdlib]},

View File

@ -34,6 +34,23 @@ init([Env]) ->
type => worker,
modules => [emqx_retainer]} || not is_managed_by_modules()]}}.
-ifdef(EMQX_ENTERPRISE).
is_managed_by_modules() ->
try
case supervisor:get_childspec(emqx_modules_sup, emqx_retainer) of
{ok, _} -> true;
_ -> false
end
catch
exit : {noproc, _} ->
false
end.
-else.
is_managed_by_modules() ->
%% always false for opensource edition
false.
-endif.

View File

@ -20,7 +20,9 @@
-export([ensure_start/0, ensure_stop/0]).
-ifdef(EMQX_ENTERPRISE).
ensure_start() ->
%% for enterprise edition, retainer is started by modules
application:stop(emqx_modules),
ensure_stop(),
init_conf(),
emqx_ct_helpers:start_apps([emqx_retainer]),
ok.
@ -29,6 +31,7 @@ ensure_start() ->
ensure_start() ->
init_conf(),
ensure_stop(),
emqx_ct_helpers:start_apps([emqx_retainer]),
ok.

View File

@ -523,6 +523,16 @@ case "$1" in
;;
esac
if [ "$IS_BOOT_COMMAND" = 'no' ]; then
# for non-boot commands, inspect vm.<time>.args for node name
# shellcheck disable=SC2012,SC2086
LATEST_VM_ARGS_FILE="$(ls -t "$RUNNER_DATA_DIR"/configs/vm.*.args 2>/dev/null | head -1)"
if [ -z "$LATEST_VM_ARGS_FILE" ]; then
echoerr "There is no vm.*.args config file found in '$RUNNER_DATA_DIR/configs/'"
echoerr "Please make sure the node is initialized (started for at least once)"
exit 1
fi
fi
if [ -z "$NAME_ARG" ]; then
NODENAME="${EMQX_NODE_NAME:-}"
@ -530,15 +540,7 @@ if [ -z "$NAME_ARG" ]; then
[ -z "$NODENAME" ] && [ -n "$EMQX_NAME" ] && [ -n "$EMQX_HOST" ] && NODENAME="${EMQX_NAME}@${EMQX_HOST}"
if [ -z "$NODENAME" ]; then
if [ "$IS_BOOT_COMMAND" = 'no' ]; then
# for non-boot commands, inspect vm.<time>.args for node name
# shellcheck disable=SC2012,SC2086
LATEST_VM_ARGS="$(ls -t $RUNNER_DATA_DIR/configs/vm.*.args 2>/dev/null | head -1)"
if [ -z "$LATEST_VM_ARGS" ]; then
echoerr "For command $1, there is no vm.*.args config file found in $RUNNER_DATA_DIR/configs/"
echoerr "Please make sure the node is initialized (started for at least once)"
exit 1
fi
NODENAME="$(grep -E '^-name' "$LATEST_VM_ARGS" | awk '{print $2}')"
NODENAME="$(grep -E '^-name' "$LATEST_VM_ARGS_FILE" | awk '{print $2}')"
else
# for boot commands, inspect emqx.conf for node name
NODENAME=$("$ERTS_PATH"/escript "$RUNNER_ROOT_DIR"/bin/cuttlefish -i "$REL_DIR"/emqx.schema -c "$RUNNER_ETC_DIR"/emqx.conf get node.name)
@ -573,13 +575,7 @@ if [ -z "$COOKIE" ]; then
if [ "$IS_BOOT_COMMAND" = 'yes' ]; then
COOKIE=$("$ERTS_PATH"/escript "$RUNNER_ROOT_DIR"/bin/cuttlefish -i "$REL_DIR"/emqx.schema -c "$RUNNER_ETC_DIR"/emqx.conf get node.cookie)
else
# shellcheck disable=SC2012,SC2086
LATEST_VM_ARGS="$(ls -t $RUNNER_DATA_DIR/configs/vm.*.args | head -1)"
if [ -z "$LATEST_VM_ARGS" ]; then
echo "For command $1, there is no vm.*.args config file found in $RUNNER_DATA_DIR/configs/"
exit 1
fi
COOKIE="$(grep -E '^-setcookie' "$LATEST_VM_ARGS" | awk '{print $2}')"
COOKIE="$(grep -E '^-setcookie' "$LATEST_VM_ARGS_FILE" | awk '{print $2}')"
fi
fi

View File

@ -62,7 +62,8 @@ app_specific_actions(_) ->
ignored_apps() ->
[gpb, %% only a build tool
emqx_dashboard, %% generic appup file for all versions
emqx_management %% generic appup file for all versions
emqx_management, %% generic appup file for all versions
emqx_modules_spec %% generic appup file for all versions
] ++ otp_standard_apps().
main(Args) ->
@ -284,9 +285,9 @@ merge_update_actions(App, Changes, Vsns, PrevVersion) ->
%% but there is a 1.1.2 in appup we may skip merging instructions for
%% 1.1.2 because it's not used and no way to know what has been changed
is_skipped_version(App, Vsn, PrevVersion) when is_list(Vsn) andalso is_list(PrevVersion) ->
case is_app_external(App) andalso parse_version_number(Vsn) of
case is_app_external(App) andalso parse_version(Vsn, non_strict_semver) of
{ok, VsnTuple} ->
case parse_version_number(PrevVersion) of
case parse_version(PrevVersion, non_strict_semver) of
{ok, PrevVsnTuple} ->
VsnTuple > PrevVsnTuple;
_ ->
@ -312,7 +313,7 @@ do_merge_update_actions(App, {New0, Changed0, Deleted0}, OldActions) ->
true ->
[];
false ->
[{load_module, M, brutal_purge, soft_purge, []} || M <- Changed] ++
[{load_module, M, brutal_purge, soft_purge, []} || M <- Changed, not is_secret_module(M)] ++
[{add_module, M} || M <- New]
end,
{OldActionsWithStop, OldActionsAfterStop} =
@ -324,10 +325,18 @@ do_merge_update_actions(App, {New0, Changed0, Deleted0}, OldActions) ->
true ->
[];
false ->
[{delete_module, M} || M <- Deleted]
[{delete_module, M} || M <- Deleted, not is_secret_module(M)]
end ++
AppSpecific.
%% Do not reload or delet _secret modules
is_secret_module(Module) ->
Suffix = "_secret",
case string:right(atom_to_list(Module), length(Suffix)) of
Suffix -> true;
_ -> false
end.
%% If an entry restarts an application, there's no need to use
%% `load_module' instructions.
contains_restart_application(Application, Actions) ->
@ -397,7 +406,7 @@ contains_version(Needle, Haystack) when is_list(Needle) ->
%% past versions that should be covered by regexes in .appup file
%% instructions.
enumerate_past_versions(Vsn) when is_list(Vsn) ->
case parse_version_number(Vsn) of
case parse_version(Vsn) of
{ok, ParsedVsn} ->
{ok, enumerate_past_versions(ParsedVsn)};
Error ->
@ -406,14 +415,39 @@ enumerate_past_versions(Vsn) when is_list(Vsn) ->
enumerate_past_versions({Major, Minor, Patch}) ->
[{Major, Minor, P} || P <- lists:seq(Patch - 1, 0, -1)].
parse_version_number(Vsn) when is_list(Vsn) ->
Nums = string:split(Vsn, ".", all),
Results = lists:map(fun string:to_integer/1, Nums),
case Results of
[{Major, []}, {Minor, []}, {Patch, []}] ->
{ok, {Major, Minor, Patch}};
_ ->
{error, bad_version}
parse_version(Vsn) ->
parse_version(Vsn, strict_semver).
parse_version(Vsn, MaybeSemver) when is_list(Vsn) ->
case parse_dot_separated_numbers(Vsn) of
{ok, {_Major, _Minor, _Patch}} = Res ->
Res;
{ok, Nums} ->
case MaybeSemver of
strict_semver ->
{error, {bad_semver, Vsn}};
non_strict_semver ->
{ok, Nums}
end;
{error, Reason} ->
{error, {Reason, Vsn}}
end.
parse_dot_separated_numbers(Str) when is_list(Str) ->
try
Split = string:split(Str, ".", all),
IntL = lists:map(fun(SubStr) ->
case string:to_integer(SubStr) of
{Int, []} when is_integer(Int) ->
Int;
_ ->
throw(no_integer)
end
end, Split),
{ok, list_to_tuple(IntL)}
catch
_ : _ ->
{error, bad_version_string}
end.
vsn_number_to_string({Major, Minor, Patch}) ->

View File

@ -2,7 +2,10 @@
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN,
[{"4.4.9",
[{load_module,emqx_session,brutal_purge,soft_purge,[]},
[{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx,brutal_purge,soft_purge,[]},
{add_module,emqx_secret},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_relup,brutal_purge,soft_purge,[]},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
@ -14,7 +17,10 @@
{load_module,emqx_router,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
{"4.4.8",
[{load_module,emqx_session,brutal_purge,soft_purge,[]},
[{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx,brutal_purge,soft_purge,[]},
{add_module,emqx_secret},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
@ -28,7 +34,8 @@
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]}]},
{"4.4.7",
[{load_module,emqx_session,brutal_purge,soft_purge,[]},
[{add_module,emqx_secret},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
@ -44,7 +51,8 @@
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
{"4.4.6",
[{load_module,emqx_session,brutal_purge,soft_purge,[]},
[{add_module,emqx_secret},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
@ -60,7 +68,8 @@
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx,brutal_purge,soft_purge,[]}]},
{"4.4.5",
[{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
[{add_module,emqx_secret},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
@ -78,7 +87,8 @@
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]}]},
{"4.4.4",
[{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
[{add_module,emqx_secret},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
@ -104,7 +114,8 @@
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]}]},
{"4.4.3",
[{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
[{add_module,emqx_secret},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_router,brutal_purge,soft_purge,[]},
@ -137,7 +148,8 @@
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_relup}]},
{"4.4.2",
[{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
[{add_module,emqx_secret},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_router,brutal_purge,soft_purge,[]},
@ -172,7 +184,8 @@
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_relup}]},
{"4.4.1",
[{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
[{add_module,emqx_secret},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_router,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]},
@ -213,7 +226,8 @@
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
{add_module,emqx_relup}]},
{"4.4.0",
[{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
[{add_module,emqx_secret},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
{load_module,emqx_router,brutal_purge,soft_purge,[]},
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
{update,emqx_broker_sup,supervisor},
@ -257,7 +271,9 @@
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}],
[{"4.4.9",
[{load_module,emqx_session,brutal_purge,soft_purge,[]},
[{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_relup,brutal_purge,soft_purge,[]},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
@ -269,7 +285,9 @@
{load_module,emqx_router,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
{"4.4.8",
[{load_module,emqx_session,brutal_purge,soft_purge,[]},
[{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},

View File

@ -228,6 +228,7 @@ shutdown() ->
shutdown(Reason) ->
?LOG(critical, "emqx shutdown for ~s", [Reason]),
on_shutdown(Reason),
_ = emqx_plugins:unload(),
lists:foreach(fun application:stop/1
, lists:reverse(default_started_applications())
@ -238,10 +239,12 @@ reboot() ->
true ->
_ = application:stop(emqx_dashboard), %% dashboard must be started after mnesia
lists:foreach(fun application:start/1 , default_started_applications()),
application:start(emqx_dashboard);
_ = application:start(emqx_dashboard),
on_reboot();
false ->
lists:foreach(fun application:start/1 , default_started_applications())
lists:foreach(fun application:start/1 , default_started_applications()),
on_reboot()
end.
is_application_running(App) ->
@ -256,6 +259,32 @@ default_started_applications() ->
[gproc, esockd, ranch, cowboy, ekka, emqx, emqx_modules].
-endif.
-ifdef(EMQX_ENTERPRISE).
on_reboot() ->
try
_ = emqx_license_api:bootstrap_license(),
ok
catch
Kind:Reason:Stack ->
?LOG(critical, "~p while rebooting: ~p, ~p", [Kind, Reason, Stack]),
ok
end,
ok.
on_shutdown(join) ->
emqx_modules:sync_load_modules_file(),
ok;
on_shutdown(_) ->
ok.
-else.
on_reboot() ->
ok.
on_shutdown(_) ->
ok.
-endif.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------

View File

@ -976,9 +976,10 @@ return_sub_unsub_ack(Packet, Channel) ->
handle_call(kick, Channel = #channel{
conn_state = ConnState,
will_msg = WillMsg,
clientinfo = ClientInfo,
conninfo = #{proto_ver := ProtoVer}
}) ->
(WillMsg =/= undefined) andalso publish_will_msg(WillMsg),
(WillMsg =/= undefined) andalso publish_will_msg(ClientInfo, WillMsg),
Channel1 = case ConnState of
connected -> ensure_disconnected(kicked, Channel);
_ -> Channel
@ -1140,8 +1141,9 @@ handle_timeout(_TRef, expire_awaiting_rel,
handle_timeout(_TRef, expire_session, Channel) ->
shutdown(expired, Channel);
handle_timeout(_TRef, will_message, Channel = #channel{will_msg = WillMsg}) ->
(WillMsg =/= undefined) andalso publish_will_msg(WillMsg),
handle_timeout(_TRef, will_message, Channel = #channel{will_msg = WillMsg,
clientinfo = ClientInfo}) ->
(WillMsg =/= undefined) andalso publish_will_msg(ClientInfo, WillMsg),
{ok, clean_timer(will_timer, Channel#channel{will_msg = undefined})};
handle_timeout(_TRef, expire_quota_limit, Channel) ->
@ -1197,9 +1199,10 @@ terminate(_Reason, #channel{conn_state = idle} = _Channel) ->
ok;
terminate(normal, Channel) ->
run_terminate_hook(normal, Channel);
terminate(Reason, Channel = #channel{will_msg = WillMsg}) ->
terminate(Reason, Channel = #channel{will_msg = WillMsg,
clientinfo = ClientInfo}) ->
should_publish_will_message(Reason, Channel)
andalso publish_will_msg(WillMsg),
andalso publish_will_msg(ClientInfo, WillMsg),
run_terminate_hook(Reason, Channel).
run_terminate_hook(_Reason, #channel{session = undefined} = _Channel) ->
@ -1741,10 +1744,11 @@ ensure_disconnected(Reason, Channel = #channel{conninfo = ConnInfo,
maybe_publish_will_msg(Channel = #channel{will_msg = undefined}) ->
Channel;
maybe_publish_will_msg(Channel = #channel{will_msg = WillMsg}) ->
maybe_publish_will_msg(Channel = #channel{will_msg = WillMsg,
clientinfo = ClientInfo}) ->
case will_delay_interval(WillMsg) of
0 ->
ok = publish_will_msg(WillMsg),
ok = publish_will_msg(ClientInfo, WillMsg),
Channel#channel{will_msg = undefined};
I ->
ensure_timer(will_timer, timer:seconds(I), Channel)
@ -1754,9 +1758,19 @@ will_delay_interval(WillMsg) ->
maps:get('Will-Delay-Interval',
emqx_message:get_header(properties, WillMsg, #{}), 0).
publish_will_msg(Msg) ->
_ = emqx_broker:publish(Msg),
ok.
publish_will_msg(ClientInfo, Msg = #message{topic = Topic}) ->
case emqx_access_control:check_acl(ClientInfo, publish, Topic) of
allow ->
_ = emqx_broker:publish(Msg),
ok;
deny ->
?tp(
warning,
last_will_testament_publish_denied,
#{topic => Topic}
),
ok
end.
%%--------------------------------------------------------------------
%% Disconnect Reason

View File

@ -96,8 +96,8 @@ do_parse(URI) ->
%% underscores replaced with hyphens
%% NOTE: assuming the input Headers list is a proplists,
%% that is, when a key is duplicated, list header overrides tail
%% e.g. [{"Content_Type", "applicaiton/binary"}, {<<"content-type">>, "applicaiton/json"}]
%% results in: [{"content-type", "applicaiton/binary"}]
%% e.g. [{"Content_Type", "applicaiton/binary"}, {"content-type", "applicaiton/json"}]
%% results in: [{<<"content-type">>, "applicaiton/binary"}]
normalise_headers(Headers0) ->
F = fun({K0, V}) ->
K = re:replace(K0, "_", "-", [{return,binary}]),

View File

@ -47,6 +47,7 @@
, index_of/2
, maybe_parse_ip/1
, ipv6_probe/1
, ipv6_probe/2
]).
-export([ bin2hexstr_a_f_upper/1
@ -86,12 +87,15 @@ maybe_parse_ip(Host) ->
%% @doc Add `ipv6_probe' socket option if it's supported.
ipv6_probe(Opts) ->
ipv6_probe(Opts, true).
ipv6_probe(Opts, Ipv6Probe) when is_boolean(Ipv6Probe) orelse is_integer(Ipv6Probe) ->
Bool = try gen_tcp:ipv6_probe()
catch _ : _ -> false end,
ipv6_probe(Bool, Opts).
ipv6_probe(Bool, Opts, Ipv6Probe).
ipv6_probe(false, Opts) -> Opts;
ipv6_probe(true, Opts) -> [{ipv6_probe, true} | Opts].
ipv6_probe(false, Opts, _) -> Opts;
ipv6_probe(true, Opts, Ipv6Probe) -> [{ipv6_probe, Ipv6Probe} | Opts].
%% @doc Merge options
-spec(merge_opts(Opts, Opts) -> Opts when Opts :: proplists:proplist()).

View File

@ -24,6 +24,7 @@
-export([init/0]).
-export([ load/0
, force_load/0
, load/1
, unload/0
, unload/1
@ -59,12 +60,17 @@ init() ->
%% @doc Load all plugins when the broker started.
-spec(load() -> ok | ignore | {error, term()}).
load() ->
do_load(#{force_load => false}).
force_load() ->
do_load(#{force_load => true}).
do_load(Options) ->
ok = load_ext_plugins(emqx:get_env(expand_plugins_dir)),
case emqx:get_env(plugins_loaded_file) of
undefined -> ignore; %% No plugins available
File ->
_ = ensure_file(File),
with_loaded_file(File, fun(Names) -> load_plugins(Names, false) end)
with_loaded_file(File, fun(Names) -> load_plugins(Names, Options, false) end)
end.
%% @doc Load a Plugin
@ -282,18 +288,23 @@ filter_plugins([{Name, Load} | Names], Plugins) ->
filter_plugins([Name | Names], Plugins) when is_atom(Name) ->
filter_plugins([{Name, true} | Names], Plugins).
load_plugins(Names, Persistent) ->
load_plugins(Names, Options, Persistent) ->
Plugins = list(),
NotFound = Names -- names(Plugins),
case NotFound of
[] -> ok;
NotFound -> ?LOG(alert, "cannot_find_plugins: ~p", [NotFound])
end,
NeedToLoad = (Names -- NotFound) -- names(started_app),
NeedToLoad0 = Names -- NotFound,
NeedToLoad1 =
case Options of
#{force_load := true} -> NeedToLoad0;
_ -> NeedToLoad0 -- names(started_app)
end,
lists:foreach(fun(Name) ->
Plugin = find_plugin(Name, Plugins),
load_plugin(Plugin#plugin.name, Persistent)
end, NeedToLoad).
end, NeedToLoad1).
generate_configs(App) ->
ConfigFile = filename:join([emqx:get_env(plugins_etc_dir), App]) ++ ".config",

41
src/emqx_secret.erl Normal file
View File

@ -0,0 +1,41 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% Note: this module CAN'T be hot-patched to avoid invalidating the
%% closures, so it must not be changed.
-module(emqx_secret).
%% API:
-export([wrap/1, unwrap/1]).
%%================================================================================
%% API funcions
%%================================================================================
wrap(undefined) ->
undefined;
wrap(Func) when is_function(Func) ->
Func;
wrap(Term) ->
fun() ->
Term
end.
unwrap(Term) when is_function(Term, 0) ->
%% Handle potentially nested funs
unwrap(Term());
unwrap(Term) ->
Term.