Merge remote-tracking branch 'origin/master' into 0529-donot-copy-cluster-conf-from-newer-version

This commit is contained in:
Zaiming (Stone) Shi 2023-06-07 09:14:04 +02:00
commit cb3e787cb1
33 changed files with 1277 additions and 228 deletions

View File

@ -43,8 +43,18 @@ runs:
echo "OTP_SOURCE_PATH=$OTP_SOURCE_PATH" >> $GITHUB_OUTPUT
echo "OTP_INSTALL_PATH=$OTP_INSTALL_PATH" >> $GITHUB_OUTPUT
mkdir -p "$OTP_SOURCE_PATH" "$OTP_INSTALL_PATH"
# we need this to skip using cache for self-hosted runners
case ${{ inputs.os }} in
*arm64)
echo "SELF_HOSTED=true" >> $GITHUB_OUTPUT
;;
*)
echo "SELF_HOSTED=false" >> $GITHUB_OUTPUT
;;
esac
- uses: actions/cache@v3
id: cache
if: steps.prepare.outputs.SELF_HOSTED != 'true'
with:
path: ${{ steps.prepare.outputs.OTP_INSTALL_PATH }}
key: otp-install-${{ inputs.otp }}-${{ inputs.os }}-static-ssl-disable-hipe-disable-jit
@ -54,22 +64,36 @@ runs:
run: |
OTP_SOURCE_PATH="${{ steps.prepare.outputs.OTP_SOURCE_PATH }}"
OTP_INSTALL_PATH="${{ steps.prepare.outputs.OTP_INSTALL_PATH }}"
SELF_HOSTED="${{ steps.prepare.outputs.SELF_HOSTED }}"
# when it's self-hosted, it never hits the cache,
# skip rebuild if it's self-hosted and the install path already has a 'bin'
if [ "${SELF_HOSTED:-false}" = 'true' ]; then
if [ -n "$OTP_INSTALL_PATH" ] && [ -d "$OTP_INSTALL_PATH/bin" ]; then
echo "Skip rebuilding OTP, found $OTP_INSTALL_PATH"
exit 0
fi
fi
## when it's not self-hosted, or the install path is not found,
## build otp from source code.
if [ -d "$OTP_SOURCE_PATH" ]; then
rm -rf "$OTP_SOURCE_PATH"
fi
git clone --depth 1 --branch OTP-${{ inputs.otp }} https://github.com/emqx/otp.git "$OTP_SOURCE_PATH"
cd "$OTP_SOURCE_PATH"
if [ "$(arch)" = arm64 ]; then
export CFLAGS="-O2 -g -I$(brew --prefix unixodbc)/include"
export LDFLAGS="-L$(brew --prefix unixodbc)/lib"
export CC="/usr/bin/gcc -I$(brew --prefix unixodbc)/include"
WITH_ODBC="--with-odbc=$(brew --prefix unixodbc)"
else
WITH_ODBC=""
fi
./configure --disable-dynamic-ssl-lib --with-ssl=$(brew --prefix openssl@1.1) --disable-hipe --disable-jit --prefix="$OTP_INSTALL_PATH"
./configure --disable-dynamic-ssl-lib --with-ssl=$(brew --prefix openssl@1.1) ${WITH_ODBC} --disable-hipe --disable-jit --prefix="$OTP_INSTALL_PATH"
make -j$(nproc)
rm -rf "$OTP_INSTALL_PATH"
make install
if [ "$(arch)" = arm64 ]; then
unset CFLAGS
unset LDFLAGS
unset CC
fi
- name: build
env:
@ -87,6 +111,10 @@ runs:
shell: bash
run: |
export PATH="${{ steps.prepare.outputs.OTP_INSTALL_PATH }}/bin:$PATH"
# inspec erl in PATH
which erl
# inspec erl command banner
erl -s init stop
make ensure-rebar3
mkdir -p $HOME/bin
cp rebar3 $HOME/bin/rebar3

View File

@ -32,10 +32,10 @@
%% `apps/emqx/src/bpapi/README.md'
%% Opensource edition
-define(EMQX_RELEASE_CE, "5.1.0-alpha.2").
-define(EMQX_RELEASE_CE, "5.1.0-alpha.3").
%% Enterprise edition
-define(EMQX_RELEASE_EE, "5.1.0-alpha.2").
-define(EMQX_RELEASE_EE, "5.1.0-alpha.3").
%% The HTTP API version
-define(EMQX_API_VERSION, "5.0").

View File

@ -57,6 +57,7 @@
]).
-export([pre_config_update/3, post_config_update/5]).
-export([create_listener/3, remove_listener/3, update_listener/3]).
-export([format_bind/1]).
@ -65,8 +66,8 @@
-endif.
-type listener_id() :: atom() | binary().
-define(CONF_KEY_PATH, [listeners, '?', '?']).
-define(ROOT_KEY, listeners).
-define(CONF_KEY_PATH, [?ROOT_KEY, '?', '?']).
-define(TYPES_STRING, ["tcp", "ssl", "ws", "wss", "quic"]).
-define(MARK_DEL, ?TOMBSTONE_CONFIG_CHANGE_REQ).
@ -212,7 +213,10 @@ shutdown_count(_, _, _) ->
start() ->
%% The ?MODULE:start/0 will be called by emqx_app when emqx get started,
%% so we install the config handler here.
%% callback when http api request
ok = emqx_config_handler:add_handler(?CONF_KEY_PATH, ?MODULE),
%% callback when reload from config file
ok = emqx_config_handler:add_handler([?ROOT_KEY], ?MODULE),
foreach_listeners(fun start_listener/3).
-spec start_listener(listener_id()) -> ok | {error, term()}.
@ -287,7 +291,8 @@ restart_listener(Type, ListenerName, OldConf, NewConf) ->
stop() ->
%% The ?MODULE:stop/0 will be called by emqx_app when emqx is going to shutdown,
%% so we uninstall the config handler here.
_ = emqx_config_handler:remove_handler(?CONF_KEY_PATH),
ok = emqx_config_handler:remove_handler(?CONF_KEY_PATH),
ok = emqx_config_handler:remove_handler([?ROOT_KEY]),
foreach_listeners(fun stop_listener/3).
-spec stop_listener(listener_id()) -> ok | {error, term()}.
@ -463,50 +468,34 @@ do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) ->
end.
%% Update the listeners at runtime
pre_config_update([listeners, Type, Name], {create, NewConf}, V) when
pre_config_update([?ROOT_KEY, Type, Name], {create, NewConf}, V) when
V =:= undefined orelse V =:= ?TOMBSTONE_VALUE
->
CertsDir = certs_dir(Type, Name),
{ok, convert_certs(CertsDir, NewConf)};
pre_config_update([listeners, _Type, _Name], {create, _NewConf}, _RawConf) ->
{ok, convert_certs(Type, Name, NewConf)};
pre_config_update([?ROOT_KEY, _Type, _Name], {create, _NewConf}, _RawConf) ->
{error, already_exist};
pre_config_update([listeners, _Type, _Name], {update, _Request}, undefined) ->
pre_config_update([?ROOT_KEY, _Type, _Name], {update, _Request}, undefined) ->
{error, not_found};
pre_config_update([listeners, Type, Name], {update, Request}, RawConf) ->
NewConfT = emqx_utils_maps:deep_merge(RawConf, Request),
NewConf = ensure_override_limiter_conf(NewConfT, Request),
CertsDir = certs_dir(Type, Name),
{ok, convert_certs(CertsDir, NewConf)};
pre_config_update([listeners, _Type, _Name], {action, _Action, Updated}, RawConf) ->
NewConf = emqx_utils_maps:deep_merge(RawConf, Updated),
{ok, NewConf};
pre_config_update([listeners, _Type, _Name], ?MARK_DEL, _RawConf) ->
pre_config_update([?ROOT_KEY, Type, Name], {update, Request}, RawConf) ->
RawConf1 = emqx_utils_maps:deep_merge(RawConf, Request),
RawConf2 = ensure_override_limiter_conf(RawConf1, Request),
{ok, convert_certs(Type, Name, RawConf2)};
pre_config_update([?ROOT_KEY, _Type, _Name], {action, _Action, Updated}, RawConf) ->
{ok, emqx_utils_maps:deep_merge(RawConf, Updated)};
pre_config_update([?ROOT_KEY, _Type, _Name], ?MARK_DEL, _RawConf) ->
{ok, ?TOMBSTONE_VALUE};
pre_config_update(_Path, _Request, RawConf) ->
{ok, RawConf}.
pre_config_update([?ROOT_KEY], RawConf, RawConf) ->
{ok, RawConf};
pre_config_update([?ROOT_KEY], NewConf, _RawConf) ->
{ok, convert_certs(NewConf)}.
post_config_update([listeners, Type, Name], {create, _Request}, NewConf, undefined, _AppEnvs) ->
start_listener(Type, Name, NewConf);
post_config_update([listeners, Type, Name], {update, _Request}, NewConf, OldConf, _AppEnvs) ->
try_clear_ssl_files(certs_dir(Type, Name), NewConf, OldConf),
ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf),
case NewConf of
#{enabled := true} -> restart_listener(Type, Name, {OldConf, NewConf});
_ -> ok
end;
post_config_update([listeners, Type, Name], Op, _, OldConf, _AppEnvs) when
Op =:= ?MARK_DEL andalso is_map(OldConf)
->
ok = unregister_ocsp_stapling_refresh(Type, Name),
case stop_listener(Type, Name, OldConf) of
ok ->
_ = emqx_authentication:delete_chain(listener_id(Type, Name)),
CertsDir = certs_dir(Type, Name),
clear_certs(CertsDir, OldConf);
Err ->
Err
end;
post_config_update([listeners, Type, Name], {action, _Action, _}, NewConf, OldConf, _AppEnvs) ->
post_config_update([?ROOT_KEY, Type, Name], {create, _Request}, NewConf, undefined, _AppEnvs) ->
create_listener(Type, Name, NewConf);
post_config_update([?ROOT_KEY, Type, Name], {update, _Request}, NewConf, OldConf, _AppEnvs) ->
update_listener(Type, Name, {OldConf, NewConf});
post_config_update([?ROOT_KEY, Type, Name], ?MARK_DEL, _, OldConf = #{}, _AppEnvs) ->
remove_listener(Type, Name, OldConf);
post_config_update([?ROOT_KEY, Type, Name], {action, _Action, _}, NewConf, OldConf, _AppEnvs) ->
#{enabled := NewEnabled} = NewConf,
#{enabled := OldEnabled} = OldConf,
case {NewEnabled, OldEnabled} of
@ -523,9 +512,65 @@ post_config_update([listeners, Type, Name], {action, _Action, _}, NewConf, OldCo
ok = unregister_ocsp_stapling_refresh(Type, Name),
stop_listener(Type, Name, OldConf)
end;
post_config_update([?ROOT_KEY], _Request, OldConf, OldConf, _AppEnvs) ->
ok;
post_config_update([?ROOT_KEY], _Request, NewConf, OldConf, _AppEnvs) ->
#{added := Added, removed := Removed, changed := Changed} = diff_confs(NewConf, OldConf),
Updated = lists:map(fun({{{T, N}, Old}, {_, New}}) -> {{T, N}, {Old, New}} end, Changed),
perform_listener_changes([
{fun ?MODULE:remove_listener/3, Removed},
{fun ?MODULE:update_listener/3, Updated},
{fun ?MODULE:create_listener/3, Added}
]);
post_config_update(_Path, _Request, _NewConf, _OldConf, _AppEnvs) ->
ok.
create_listener(Type, Name, NewConf) ->
Res = start_listener(Type, Name, NewConf),
recreate_authenticator(Res, Type, Name, NewConf).
recreate_authenticator(ok, Type, Name, Conf) ->
Chain = listener_id(Type, Name),
_ = emqx_authentication:delete_chain(Chain),
case maps:get(authentication, Conf, []) of
[] -> ok;
AuthN -> emqx_authentication:create_authenticator(Chain, AuthN)
end;
recreate_authenticator(Error, _Type, _Name, _NewConf) ->
Error.
remove_listener(Type, Name, OldConf) ->
ok = unregister_ocsp_stapling_refresh(Type, Name),
case stop_listener(Type, Name, OldConf) of
ok ->
_ = emqx_authentication:delete_chain(listener_id(Type, Name)),
clear_certs(certs_dir(Type, Name), OldConf);
Err ->
Err
end.
update_listener(Type, Name, {OldConf, NewConf}) ->
try_clear_ssl_files(certs_dir(Type, Name), NewConf, OldConf),
ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf),
Res = restart_listener(Type, Name, {OldConf, NewConf}),
recreate_authenticator(Res, Type, Name, NewConf).
perform_listener_changes([]) ->
ok;
perform_listener_changes([{Action, ConfL} | Tasks]) ->
case perform_listener_changes(Action, ConfL) of
ok -> perform_listener_changes(Tasks);
{error, Reason} -> {error, Reason}
end.
perform_listener_changes(_Action, []) ->
ok;
perform_listener_changes(Action, [{{Type, Name}, Diff} | MapConf]) ->
case Action(Type, Name, Diff) of
ok -> perform_listener_changes(Action, MapConf);
{error, Reason} -> {error, Reason}
end.
esockd_opts(ListenerId, Type, Opts0) ->
Opts1 = maps:with([acceptors, max_connections, proxy_protocol, proxy_protocol_timeout], Opts0),
Limiter = limiter(Opts0),
@ -701,6 +746,29 @@ del_limiter_bucket(Id, Conf) ->
)
end.
diff_confs(NewConfs, OldConfs) ->
emqx_utils:diff_lists(
flatten_confs(NewConfs),
flatten_confs(OldConfs),
fun({Key, _}) -> Key end
).
flatten_confs(Conf0) ->
lists:flatmap(
fun({Type, Conf}) ->
do_flatten_confs(Type, Conf)
end,
maps:to_list(Conf0)
).
do_flatten_confs(Type, Conf0) ->
FilterFun =
fun
({_Name, ?TOMBSTONE_TYPE}) -> false;
({Name, Conf}) -> {true, {{Type, Name}, Conf}}
end,
lists:filtermap(FilterFun, maps:to_list(Conf0)).
enable_authn(Opts) ->
maps:get(enable_authn, Opts, true).
@ -762,14 +830,32 @@ parse_bind(#{<<"bind">> := Bind}) ->
certs_dir(Type, Name) ->
iolist_to_binary(filename:join(["listeners", Type, Name])).
convert_certs(CertsDir, Conf) ->
convert_certs(ListenerConf) ->
maps:fold(
fun(Type, Listeners0, Acc) ->
Listeners1 =
maps:fold(
fun(Name, Conf, Acc1) ->
Acc1#{Name => convert_certs(Type, Name, Conf)}
end,
#{},
Listeners0
),
Acc#{Type => Listeners1}
end,
#{},
ListenerConf
).
convert_certs(Type, Name, Conf) ->
CertsDir = certs_dir(Type, Name),
case emqx_tls_lib:ensure_ssl_files(CertsDir, get_ssl_options(Conf)) of
{ok, undefined} ->
Conf;
{ok, SSL} ->
Conf#{<<"ssl_options">> => SSL};
{error, Reason} ->
?SLOG(error, Reason#{msg => "bad_ssl_config"}),
?SLOG(error, Reason#{msg => "bad_ssl_config", type => Type, name => Name}),
throw({bad_ssl_config, Reason})
end.
@ -791,13 +877,15 @@ try_clear_ssl_files(CertsDir, NewConf, OldConf) ->
OldSSL = get_ssl_options(OldConf),
emqx_tls_lib:delete_ssl_files(CertsDir, NewSSL, OldSSL).
get_ssl_options(Conf) ->
get_ssl_options(Conf = #{}) ->
case maps:find(ssl_options, Conf) of
{ok, SSL} ->
SSL;
error ->
maps:get(<<"ssl_options">>, Conf, undefined)
end.
end;
get_ssl_options(_) ->
undefined.
%% @doc Get QUIC optional settings for low level tunings.
%% @see quicer:quic_settings()
@ -889,8 +977,5 @@ unregister_ocsp_stapling_refresh(Type, Name) ->
emqx_ocsp_cache:unregister_listener(ListenerId),
ok.
%% There is currently an issue with frontend
%% infinity is not a good value for it, so we use 5m for now
default_max_conn() ->
%% TODO: <<"infinity">>
5_000_000.
<<"infinity">>.

View File

@ -94,7 +94,8 @@
validate_keepalive_multiplier/1,
non_empty_string/1,
validations/0,
naive_env_interpolation/1
naive_env_interpolation/1,
validate_server_ssl_opts/1
]).
-export([qos/0]).
@ -958,7 +959,7 @@ fields("mqtt_wss_listener") ->
{"ssl_options",
sc(
ref("listener_wss_opts"),
#{}
#{validator => fun validate_server_ssl_opts/1}
)},
{"websocket",
sc(
@ -2426,8 +2427,21 @@ server_ssl_opts_schema(Defaults, IsRanchListener) ->
]
].
validate_server_ssl_opts(#{<<"fail_if_no_peer_cert">> := true, <<"verify">> := Verify}) ->
validate_verify(Verify);
validate_server_ssl_opts(#{fail_if_no_peer_cert := true, verify := Verify}) ->
validate_verify(Verify);
validate_server_ssl_opts(_SSLOpts) ->
ok.
validate_verify(verify_peer) ->
ok;
validate_verify(_) ->
{error, "verify must be verify_peer when fail_if_no_peer_cert is true"}.
mqtt_ssl_listener_ssl_options_validator(Conf) ->
Checks = [
fun validate_server_ssl_opts/1,
fun ocsp_outer_validator/1,
fun crl_outer_validator/1
],

View File

@ -0,0 +1,154 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_listeners_update_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_schema.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-import(emqx_listeners, [current_conns/2, is_running/1]).
-define(LISTENERS, [listeners]).
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
emqx_common_test_helpers:boot_modules(all),
emqx_common_test_helpers:start_apps([]),
Config.
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([]).
init_per_testcase(_TestCase, Config) ->
Init = emqx:get_raw_config(?LISTENERS),
[{init_conf, Init} | Config].
end_per_testcase(_TestCase, Config) ->
Conf = ?config(init_conf, Config),
{ok, _} = emqx:update_config(?LISTENERS, Conf),
ok.
t_default_conf(_Config) ->
?assertMatch(
#{
<<"tcp">> := #{<<"default">> := #{<<"bind">> := <<"0.0.0.0:1883">>}},
<<"ssl">> := #{<<"default">> := #{<<"bind">> := <<"0.0.0.0:8883">>}},
<<"ws">> := #{<<"default">> := #{<<"bind">> := <<"0.0.0.0:8083">>}},
<<"wss">> := #{<<"default">> := #{<<"bind">> := <<"0.0.0.0:8084">>}}
},
emqx:get_raw_config(?LISTENERS)
),
?assertMatch(
#{
tcp := #{default := #{bind := {{0, 0, 0, 0}, 1883}}},
ssl := #{default := #{bind := {{0, 0, 0, 0}, 8883}}},
ws := #{default := #{bind := {{0, 0, 0, 0}, 8083}}},
wss := #{default := #{bind := {{0, 0, 0, 0}, 8084}}}
},
emqx:get_config(?LISTENERS)
),
ok.
t_update_conf(_Conf) ->
Raw = emqx:get_raw_config(?LISTENERS),
Raw1 = emqx_utils_maps:deep_put(
[<<"tcp">>, <<"default">>, <<"bind">>], Raw, <<"127.0.0.1:1883">>
),
Raw2 = emqx_utils_maps:deep_put(
[<<"ssl">>, <<"default">>, <<"bind">>], Raw1, <<"127.0.0.1:8883">>
),
Raw3 = emqx_utils_maps:deep_put(
[<<"ws">>, <<"default">>, <<"bind">>], Raw2, <<"0.0.0.0:8083">>
),
Raw4 = emqx_utils_maps:deep_put(
[<<"wss">>, <<"default">>, <<"bind">>], Raw3, <<"127.0.0.1:8084">>
),
?assertMatch({ok, _}, emqx:update_config(?LISTENERS, Raw4)),
?assertMatch(
#{
<<"tcp">> := #{<<"default">> := #{<<"bind">> := <<"127.0.0.1:1883">>}},
<<"ssl">> := #{<<"default">> := #{<<"bind">> := <<"127.0.0.1:8883">>}},
<<"ws">> := #{<<"default">> := #{<<"bind">> := <<"0.0.0.0:8083">>}},
<<"wss">> := #{<<"default">> := #{<<"bind">> := <<"127.0.0.1:8084">>}}
},
emqx:get_raw_config(?LISTENERS)
),
BindTcp = {{127, 0, 0, 1}, 1883},
BindSsl = {{127, 0, 0, 1}, 8883},
BindWs = {{0, 0, 0, 0}, 8083},
BindWss = {{127, 0, 0, 1}, 8084},
?assertMatch(
#{
tcp := #{default := #{bind := BindTcp}},
ssl := #{default := #{bind := BindSsl}},
ws := #{default := #{bind := BindWs}},
wss := #{default := #{bind := BindWss}}
},
emqx:get_config(?LISTENERS)
),
?assertError(not_found, current_conns(<<"tcp:default">>, {{0, 0, 0, 0}, 1883})),
?assertError(not_found, current_conns(<<"ssl:default">>, {{0, 0, 0, 0}, 8883})),
?assertEqual(0, current_conns(<<"tcp:default">>, BindTcp)),
?assertEqual(0, current_conns(<<"ssl:default">>, BindSsl)),
?assertEqual({0, 0, 0, 0}, proplists:get_value(ip, ranch:info('ws:default'))),
?assertEqual({127, 0, 0, 1}, proplists:get_value(ip, ranch:info('wss:default'))),
?assert(is_running('ws:default')),
?assert(is_running('wss:default')),
ok.
t_add_delete_conf(_Conf) ->
Raw = emqx:get_raw_config(?LISTENERS),
%% add
#{<<"tcp">> := #{<<"default">> := Tcp}} = Raw,
NewBind = <<"127.0.0.1:1987">>,
Raw1 = emqx_utils_maps:deep_put([<<"tcp">>, <<"new">>], Raw, Tcp#{<<"bind">> => NewBind}),
Raw2 = emqx_utils_maps:deep_put([<<"ssl">>, <<"default">>], Raw1, ?TOMBSTONE_VALUE),
?assertMatch({ok, _}, emqx:update_config(?LISTENERS, Raw2)),
?assertEqual(0, current_conns(<<"tcp:new">>, {{127, 0, 0, 1}, 1987})),
?assertError(not_found, current_conns(<<"ssl:default">>, {{0, 0, 0, 0}, 8883})),
%% deleted
?assertMatch({ok, _}, emqx:update_config(?LISTENERS, Raw)),
?assertError(not_found, current_conns(<<"tcp:new">>, {{127, 0, 0, 1}, 1987})),
?assertEqual(0, current_conns(<<"ssl:default">>, {{0, 0, 0, 0}, 8883})),
ok.
t_delete_default_conf(_Conf) ->
Raw = emqx:get_raw_config(?LISTENERS),
%% delete default listeners
Raw1 = emqx_utils_maps:deep_put([<<"tcp">>, <<"default">>], Raw, ?TOMBSTONE_VALUE),
Raw2 = emqx_utils_maps:deep_put([<<"ssl">>, <<"default">>], Raw1, ?TOMBSTONE_VALUE),
Raw3 = emqx_utils_maps:deep_put([<<"ws">>, <<"default">>], Raw2, ?TOMBSTONE_VALUE),
Raw4 = emqx_utils_maps:deep_put([<<"wss">>, <<"default">>], Raw3, ?TOMBSTONE_VALUE),
?assertMatch({ok, _}, emqx:update_config(?LISTENERS, Raw4)),
?assertError(not_found, current_conns(<<"tcp:default">>, {{0, 0, 0, 0}, 1883})),
?assertError(not_found, current_conns(<<"ssl:default">>, {{0, 0, 0, 0}, 8883})),
?assertMatch({error, not_found}, is_running('ws:default')),
?assertMatch({error, not_found}, is_running('wss:default')),
%% reset
?assertMatch({ok, _}, emqx:update_config(?LISTENERS, Raw)),
?assertEqual(0, current_conns(<<"tcp:default">>, {{0, 0, 0, 0}, 1883})),
?assertEqual(0, current_conns(<<"ssl:default">>, {{0, 0, 0, 0}, 8883})),
?assert(is_running('ws:default')),
?assert(is_running('wss:default')),
ok.

View File

@ -106,6 +106,67 @@ bad_cipher_test() ->
),
ok.
fail_if_no_peer_cert_test_() ->
Sc = #{
roots => [mqtt_ssl_listener],
fields => #{mqtt_ssl_listener => emqx_schema:fields("mqtt_ssl_listener")}
},
Opts = #{atom_key => false, required => false},
OptsAtomKey = #{atom_key => true, required => false},
InvalidConf = #{
<<"bind">> => <<"0.0.0.0:9883">>,
<<"ssl_options">> => #{
<<"fail_if_no_peer_cert">> => true,
<<"verify">> => <<"verify_none">>
}
},
InvalidListener = #{<<"mqtt_ssl_listener">> => InvalidConf},
ValidListener = #{
<<"mqtt_ssl_listener">> => InvalidConf#{
<<"ssl_options">> =>
#{
<<"fail_if_no_peer_cert">> => true,
<<"verify">> => <<"verify_peer">>
}
}
},
ValidListener1 = #{
<<"mqtt_ssl_listener">> => InvalidConf#{
<<"ssl_options">> =>
#{
<<"fail_if_no_peer_cert">> => false,
<<"verify">> => <<"verify_none">>
}
}
},
Reason = "verify must be verify_peer when fail_if_no_peer_cert is true",
[
?_assertThrow(
{_Sc, [#{kind := validation_error, reason := Reason}]},
hocon_tconf:check_plain(Sc, InvalidListener, Opts)
),
?_assertThrow(
{_Sc, [#{kind := validation_error, reason := Reason}]},
hocon_tconf:check_plain(Sc, InvalidListener, OptsAtomKey)
),
?_assertMatch(
#{mqtt_ssl_listener := #{}},
hocon_tconf:check_plain(Sc, ValidListener, OptsAtomKey)
),
?_assertMatch(
#{mqtt_ssl_listener := #{}},
hocon_tconf:check_plain(Sc, ValidListener1, OptsAtomKey)
),
?_assertMatch(
#{<<"mqtt_ssl_listener">> := #{}},
hocon_tconf:check_plain(Sc, ValidListener, Opts)
),
?_assertMatch(
#{<<"mqtt_ssl_listener">> := #{}},
hocon_tconf:check_plain(Sc, ValidListener1, Opts)
)
].
validate(Schema, Data0) ->
Sc = #{
roots => [ssl_opts],

View File

@ -99,7 +99,7 @@ choose_ingress_pool_size(
{_Filter, #{share := _Name}} ->
% NOTE: this is shared subscription, many workers may subscribe
PoolSize;
{_Filter, #{}} ->
{_Filter, #{}} when PoolSize > 1 ->
% NOTE: this is regular subscription, only one worker should subscribe
?SLOG(warning, #{
msg => "mqtt_bridge_ingress_pool_size_ignored",
@ -110,6 +110,8 @@ choose_ingress_pool_size(
config_pool_size => PoolSize,
pool_size => 1
}),
1;
{_Filter, #{}} when PoolSize == 1 ->
1
end.

View File

@ -143,7 +143,7 @@ fields("cluster") ->
)},
{"discovery_strategy",
sc(
hoconsc:enum([manual, static, mcast, dns, etcd, k8s]),
hoconsc:enum([manual, static, dns, etcd, k8s, mcast]),
#{
default => manual,
desc => ?DESC(cluster_discovery_strategy),
@ -198,7 +198,7 @@ fields("cluster") ->
{"mcast",
sc(
?R_REF(cluster_mcast),
#{}
#{importance => ?IMPORTANCE_HIDDEN}
)},
{"dns",
sc(

View File

@ -22,8 +22,7 @@
-export([
start/2,
stop/1,
prep_stop/1
stop/1
]).
%%--------------------------------------------------------------------
@ -34,10 +33,6 @@ start(_StartType, _StartArgs) ->
{ok, Sup} = emqx_exhook_sup:start_link(),
{ok, Sup}.
prep_stop(State) ->
emqx_ctl:unregister_command(exhook),
State.
stop(_State) ->
ok.

View File

@ -23,6 +23,9 @@
-include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(SERVERS, [exhook, servers]).
-define(EXHOOK, [exhook]).
%% APIs
-export([start_link/0]).
@ -148,7 +151,7 @@ update_config(KeyPath, UpdateReq) ->
Error
end.
pre_config_update(_, {add, #{<<"name">> := Name} = Conf}, OldConf) ->
pre_config_update(?SERVERS, {add, #{<<"name">> := Name} = Conf}, OldConf) ->
case lists:any(fun(#{<<"name">> := ExistedName}) -> ExistedName =:= Name end, OldConf) of
true ->
throw(already_exists);
@ -156,48 +159,36 @@ pre_config_update(_, {add, #{<<"name">> := Name} = Conf}, OldConf) ->
NConf = maybe_write_certs(Conf),
{ok, OldConf ++ [NConf]}
end;
pre_config_update(_, {update, Name, Conf}, OldConf) ->
case replace_conf(Name, fun(_) -> Conf end, OldConf) of
not_found -> throw(not_found);
NewConf -> {ok, lists:map(fun maybe_write_certs/1, NewConf)}
end;
pre_config_update(_, {delete, ToDelete}, OldConf) ->
case do_delete(ToDelete, OldConf) of
not_found -> throw(not_found);
NewConf -> {ok, NewConf}
end;
pre_config_update(_, {move, Name, Position}, OldConf) ->
case do_move(Name, Position, OldConf) of
not_found -> throw(not_found);
NewConf -> {ok, NewConf}
end;
pre_config_update(_, {enable, Name, Enable}, OldConf) ->
case
replace_conf(
Name,
fun(Conf) -> Conf#{<<"enable">> => Enable} end,
OldConf
)
of
not_found -> throw(not_found);
NewConf -> {ok, lists:map(fun maybe_write_certs/1, NewConf)}
end.
pre_config_update(?SERVERS, {update, Name, Conf}, OldConf) ->
NewConf = replace_conf(Name, fun(_) -> Conf end, OldConf),
{ok, lists:map(fun maybe_write_certs/1, NewConf)};
pre_config_update(?SERVERS, {delete, ToDelete}, OldConf) ->
{ok, do_delete(ToDelete, OldConf)};
pre_config_update(?SERVERS, {move, Name, Position}, OldConf) ->
{ok, do_move(Name, Position, OldConf)};
pre_config_update(?SERVERS, {enable, Name, Enable}, OldConf) ->
ReplaceFun = fun(Conf) -> Conf#{<<"enable">> => Enable} end,
NewConf = replace_conf(Name, ReplaceFun, OldConf),
{ok, lists:map(fun maybe_write_certs/1, NewConf)};
pre_config_update(?EXHOOK, NewConf, _OldConf) when NewConf =:= #{} ->
{ok, NewConf#{<<"servers">> => []}};
pre_config_update(?EXHOOK, NewConf = #{<<"servers">> := Servers}, _OldConf) ->
{ok, NewConf#{<<"servers">> => lists:map(fun maybe_write_certs/1, Servers)}}.
post_config_update(_KeyPath, UpdateReq, NewConf, OldConf, _AppEnvs) ->
Result = call({update_config, UpdateReq, NewConf}),
Result = call({update_config, UpdateReq, NewConf, OldConf}),
try_clear_ssl_files(UpdateReq, NewConf, OldConf),
{ok, Result}.
%%=====================================================================
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
init([]) ->
process_flag(trap_exit, true),
emqx_conf:add_handler([exhook, servers], ?MODULE),
ServerL = emqx:get_config([exhook, servers]),
emqx_conf:add_handler(?EXHOOK, ?MODULE),
emqx_conf:add_handler(?SERVERS, ?MODULE),
ServerL = emqx:get_config(?SERVERS),
Servers = load_all_servers(ServerL),
Servers2 = reorder(ServerL, Servers),
refresh_tick(),
@ -222,22 +213,16 @@ handle_call(
OrderServers = sort_name_by_order(Infos, Servers),
{reply, OrderServers, State};
handle_call(
{update_config, {move, _Name, _Position}, NewConfL},
{update_config, {move, _Name, _Position}, NewConfL, _},
_From,
#{servers := Servers} = State
) ->
Servers2 = reorder(NewConfL, Servers),
{reply, ok, State#{servers := Servers2}};
handle_call({update_config, {delete, ToDelete}, _}, _From, State) ->
emqx_exhook_metrics:on_server_deleted(ToDelete),
#{servers := Servers} = State2 = do_unload_server(ToDelete, State),
Servers2 = maps:remove(ToDelete, Servers),
{reply, ok, update_servers(Servers2, State2)};
handle_call({update_config, {delete, ToDelete}, _, _}, _From, State) ->
{reply, ok, remove_server(ToDelete, State)};
handle_call(
{update_config, {add, RawConf}, NewConfL},
{update_config, {add, RawConf}, NewConfL, _},
_From,
#{servers := Servers} = State
) ->
@ -246,14 +231,30 @@ handle_call(
Servers2 = Servers#{Name => Server},
Servers3 = reorder(NewConfL, Servers2),
{reply, Result, State#{servers := Servers3}};
handle_call({update_config, {update, Name, _Conf}, NewConfL, _}, _From, State) ->
{Result, State2} = restart_server(Name, NewConfL, State),
{reply, Result, State2};
handle_call({update_config, {enable, Name, _Enable}, NewConfL, _}, _From, State) ->
{Result, State2} = restart_server(Name, NewConfL, State),
{reply, Result, State2};
handle_call({update_config, _, ConfL, ConfL}, _From, State) ->
{reply, ok, State};
handle_call({update_config, _, #{servers := NewConfL}, #{servers := OldConfL}}, _From, State) ->
#{
removed := Removed,
added := Added,
changed := Updated
} = emqx_utils:diff_lists(NewConfL, OldConfL, fun(#{name := Name}) -> Name end),
State2 = remove_servers(Removed, State),
{UpdateRes, State3} = restart_servers(Updated, NewConfL, State2),
{AddRes, State4 = #{servers := Servers4}} = add_servers(Added, State3),
State5 = State4#{servers => reorder(NewConfL, Servers4)},
case UpdateRes =:= [] andalso AddRes =:= [] of
true -> {reply, ok, State5};
false -> {reply, {error, #{added => AddRes, updated => UpdateRes}}, State5}
end;
handle_call({lookup, Name}, _From, State) ->
{reply, where_is_server(Name, State), State};
handle_call({update_config, {update, Name, _Conf}, NewConfL}, _From, State) ->
{Result, State2} = restart_server(Name, NewConfL, State),
{reply, Result, State2};
handle_call({update_config, {enable, Name, _Enable}, NewConfL}, _From, State) ->
{Result, State2} = restart_server(Name, NewConfL, State),
{reply, Result, State2};
handle_call({server_info, Name}, _From, State) ->
case where_is_server(Name, State) of
not_found ->
@ -287,6 +288,22 @@ handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.
remove_servers(Removes, State) ->
lists:foldl(
fun(Conf, Acc) ->
ToDelete = maps:get(name, Conf),
remove_server(ToDelete, Acc)
end,
State,
Removes
).
remove_server(ToDelete, State) ->
emqx_exhook_metrics:on_server_deleted(ToDelete),
#{servers := Servers} = State2 = do_unload_server(ToDelete, State),
Servers2 = maps:remove(ToDelete, Servers),
update_servers(Servers2, State2).
handle_cast(_Msg, State) ->
{noreply, State}.
@ -310,6 +327,8 @@ terminate(Reason, State = #{servers := Servers}) ->
Servers
),
?tp(info, exhook_mgr_terminated, #{reason => Reason, servers => Servers}),
emqx_conf:remove_handler(?SERVERS),
emqx_conf:remove_handler(?EXHOOK),
ok.
code_change(_OldVsn, State, _Extra) ->
@ -325,6 +344,22 @@ unload_exhooks() ->
|| {Name, {M, F, _A}} <- ?ENABLED_HOOKS
].
add_servers(Added, State) ->
lists:foldl(
fun(Conf = #{name := Name}, {ResAcc, StateAcc}) ->
case do_load_server(options_to_server(Conf)) of
{ok, Server} ->
#{servers := Servers} = StateAcc,
Servers2 = Servers#{Name => Server},
{ResAcc, update_servers(Servers2, StateAcc)};
{Err, StateAcc1} ->
{[Err | ResAcc], StateAcc1}
end
end,
{[], State},
Added
).
do_load_server(#{name := Name} = Server) ->
case emqx_exhook_server:load(Name, Server) of
{ok, ServerState} ->
@ -401,8 +436,7 @@ clean_reload_timer(#{timer := Timer}) ->
_ = erlang:cancel_timer(Timer),
ok.
-spec do_move(binary(), position(), list(server_options())) ->
not_found | list(server_options()).
-spec do_move(binary(), position(), list(server_options())) -> list(server_options()).
do_move(Name, Position, ConfL) ->
move(ConfL, Name, Position, []).
@ -411,7 +445,7 @@ move([#{<<"name">> := Name} = Server | T], Name, Position, HeadL) ->
move([Server | T], Name, Position, HeadL) ->
move(T, Name, Position, [Server | HeadL]);
move([], _Name, _Position, _HeadL) ->
not_found.
throw(not_found).
move_to(?CMD_MOVE_FRONT, Server, ServerL) ->
[Server | ServerL];
@ -429,8 +463,7 @@ move_to([H | T], Position, Server, HeadL) ->
move_to([], _Position, _Server, _HeadL) ->
not_found.
-spec do_delete(binary(), list(server_options())) ->
not_found | list(server_options()).
-spec do_delete(binary(), list(server_options())) -> list(server_options()).
do_delete(ToDelete, OldConf) ->
case lists:any(fun(#{<<"name">> := ExistedName}) -> ExistedName =:= ToDelete end, OldConf) of
true ->
@ -439,7 +472,7 @@ do_delete(ToDelete, OldConf) ->
OldConf
);
false ->
not_found
throw(not_found)
end.
-spec reorder(list(server_options()), servers()) -> servers().
@ -471,9 +504,7 @@ where_is_server(Name, #{servers := Servers}) ->
-type replace_fun() :: fun((server_options()) -> server_options()).
-spec replace_conf(binary(), replace_fun(), list(server_options())) ->
not_found
| list(server_options()).
-spec replace_conf(binary(), replace_fun(), list(server_options())) -> list(server_options()).
replace_conf(Name, ReplaceFun, ConfL) ->
replace_conf(ConfL, Name, ReplaceFun, []).
@ -483,7 +514,20 @@ replace_conf([#{<<"name">> := Name} = H | T], Name, ReplaceFun, HeadL) ->
replace_conf([H | T], Name, ReplaceFun, HeadL) ->
replace_conf(T, Name, ReplaceFun, [H | HeadL]);
replace_conf([], _, _, _) ->
not_found.
throw(not_found).
restart_servers(Updated, NewConfL, State) ->
lists:foldl(
fun({_Old, Conf}, {ResAcc, StateAcc}) ->
Name = maps:get(name, Conf),
case restart_server(Name, NewConfL, StateAcc) of
{ok, StateAcc1} -> {ResAcc, StateAcc1};
{Err, StateAcc1} -> {[Err | ResAcc], StateAcc1}
end
end,
{[], State},
Updated
).
-spec restart_server(binary(), list(server_options()), state()) ->
{ok, state()}
@ -612,6 +656,16 @@ try_clear_ssl_files({Op, Name, _}, NewConfs, OldConfs) when
NewSSL = find_server_ssl_cfg(Name, NewConfs),
OldSSL = find_server_ssl_cfg(Name, OldConfs),
emqx_tls_lib:delete_ssl_files(ssl_file_path(Name), NewSSL, OldSSL);
%% replace the whole config from the cli
try_clear_ssl_files(_Req, #{servers := NewServers}, #{servers := OldServers}) ->
lists:foreach(
fun(#{name := Name} = Conf) ->
NewSSL = find_server_ssl_cfg(Name, NewServers),
OldSSL = maps:get(ssl, Conf, undefined),
emqx_tls_lib:delete_ssl_files(ssl_file_path(Name), NewSSL, OldSSL)
end,
OldServers
);
try_clear_ssl_files(_Req, _NewConf, _OldConf) ->
ok.

View File

@ -196,9 +196,9 @@ t_error_update_conf(_) ->
Path = [exhook, servers],
Name = <<"error_update">>,
ErrorCfg = #{<<"name">> => Name},
{error, _} = emqx_exhook_mgr:update_config(Path, {update, Name, ErrorCfg}),
{error, _} = emqx_exhook_mgr:update_config(Path, {move, Name, top, <<>>}),
{error, _} = emqx_exhook_mgr:update_config(Path, {enable, Name, true}),
{error, not_found} = emqx_exhook_mgr:update_config(Path, {update, Name, ErrorCfg}),
{error, not_found} = emqx_exhook_mgr:update_config(Path, {move, Name, top}),
{error, not_found} = emqx_exhook_mgr:update_config(Path, {enable, Name, true}),
ErrorAnd = #{<<"name">> => Name, <<"url">> => <<"http://127.0.0.1:9001">>},
{ok, _} = emqx_exhook_mgr:update_config(Path, {add, ErrorAnd}),
@ -210,12 +210,37 @@ t_error_update_conf(_) ->
},
{ok, _} = emqx_exhook_mgr:update_config(Path, {update, Name, DisableAnd}),
{ok, _} = emqx_exhook_mgr:update_config(Path, {delete, <<"error">>}),
{error, not_found} = emqx_exhook_mgr:update_config(
Path, {delete, <<"delete_not_exists">>}
),
{ok, _} = emqx_exhook_mgr:update_config(Path, {delete, Name}),
{error, not_found} = emqx_exhook_mgr:update_config(Path, {delete, Name}),
ok.
t_update_conf(_Config) ->
Path = [exhook],
Conf = #{<<"servers">> := Servers} = emqx_config:get_raw(Path),
?assert(length(Servers) > 1),
Servers1 = shuffle(Servers),
ReOrderedConf = Conf#{<<"servers">> => Servers1},
validate_servers(Path, ReOrderedConf, Servers1),
[_ | Servers2] = Servers,
DeletedConf = Conf#{<<"servers">> => Servers2},
validate_servers(Path, DeletedConf, Servers2),
[L1, L2 | Servers3] = Servers,
UpdateL2 = L2#{<<"pool_size">> => 1, <<"request_timeout">> => 1000},
UpdatedServers = [L1, UpdateL2 | Servers3],
UpdatedConf = Conf#{<<"servers">> => UpdatedServers},
validate_servers(Path, UpdatedConf, UpdatedServers),
%% reset
validate_servers(Path, Conf, Servers),
ok.
validate_servers(Path, ReOrderConf, Servers1) ->
{ok, _} = emqx_exhook_mgr:update_config(Path, ReOrderConf),
?assertEqual(ReOrderConf, emqx_config:get_raw(Path)),
List = emqx_exhook_mgr:list(),
ExpectL = lists:map(fun(#{<<"name">> := Name}) -> Name end, Servers1),
L1 = lists:map(fun(#{name := Name}) -> Name end, List),
?assertEqual(ExpectL, L1).
t_error_server_info(_) ->
not_found = emqx_exhook_mgr:server_info(<<"not_exists">>),
ok.
@ -490,6 +515,10 @@ data_file(Name) ->
cert_file(Name) ->
data_file(filename:join(["certs", Name])).
%% FIXME: this creats inter-test dependency
%% FIXME: this creates inter-test dependency
stop_apps(Apps) ->
emqx_common_test_helpers:stop_apps(Apps, #{erase_all_configs => false}).
shuffle(List) ->
Sorted = lists:sort(lists:map(fun(L) -> {rand:uniform(), L} end, List)),
lists:map(fun({_, L}) -> L end, Sorted).

View File

@ -29,6 +29,8 @@
-export([fold/4]).
-export([mk_temp_filename/1]).
-type foldfun(Acc) ::
fun(
(
@ -178,3 +180,24 @@ fold(FoldFun, Acc, It) ->
none ->
Acc
end.
-spec mk_temp_filename(file:filename()) ->
file:filename().
mk_temp_filename(Filename) ->
% NOTE
% Using only the first 200 characters of the filename to avoid making filenames
% exceeding 255 bytes in UTF-8. It's actually too conservative, `Suffix` can be
% at most 16 bytes.
Unique = erlang:unique_integer([positive]),
Suffix = binary:encode_hex(<<Unique:64>>),
mk_filename([string:slice(Filename, 0, 200), ".", Suffix]).
mk_filename(Comps) ->
lists:append(lists:map(fun mk_filename_component/1, Comps)).
mk_filename_component(A) when is_atom(A) ->
atom_to_list(A);
mk_filename_component(B) when is_binary(B) ->
unicode:characters_to_list(B);
mk_filename_component(S) when is_list(S) ->
S.

View File

@ -42,7 +42,9 @@
%% on most filesystems. Even though, say, S3 does not have such limitations, it's
%% still useful to have a limit on the filename length, to avoid having to deal with
%% limits in the storage backends.
-define(MAX_FILENAME_BYTELEN, 255).
%% Usual realistic limit is 255 bytes actually, but we leave some room for backends
%% to spare.
-define(MAX_FILENAME_BYTELEN, 240).
-import(hoconsc, [ref/2, mk/2]).
@ -145,7 +147,7 @@ fields(local_storage_segments) ->
[
{root,
mk(
binary(),
string(),
#{
desc => ?DESC("local_storage_segments_root"),
required => false
@ -182,7 +184,7 @@ fields(local_storage_exporter) ->
[
{root,
mk(
binary(),
string(),
#{
desc => ?DESC("local_storage_exporter_root"),
required => false

View File

@ -128,7 +128,17 @@ complete(
Filemeta = FilemetaIn#{checksum => Checksum},
ok = file:close(Handle),
_ = filelib:ensure_dir(ResultFilepath),
_ = file:write_file(mk_manifest_filename(ResultFilepath), encode_filemeta(Filemeta)),
ManifestFilepath = mk_manifest_filename(ResultFilepath),
case file:write_file(ManifestFilepath, encode_filemeta(Filemeta)) of
ok ->
ok;
{error, Reason} ->
?SLOG(warning, "filemeta_write_failed", #{
path => ManifestFilepath,
meta => Filemeta,
reason => Reason
})
end,
file:rename(Filepath, ResultFilepath).
-spec discard(export_st()) ->
@ -452,8 +462,7 @@ mk_manifest_filename(Filename) when is_binary(Filename) ->
<<Filename/binary, ?MANIFEST>>.
mk_temp_absfilepath(Options, Transfer, Filename) ->
Unique = erlang:unique_integer([positive]),
TempFilename = integer_to_list(Unique) ++ "." ++ Filename,
TempFilename = emqx_ft_fs_util:mk_temp_filename(Filename),
filename:join(mk_absdir(Options, Transfer, temporary), TempFilename).
mk_absdir(Options, _Transfer, temporary) ->

View File

@ -445,16 +445,8 @@ write_file_atomic(Storage, Transfer, Filepath, Content) when is_binary(Content)
end.
mk_temp_filepath(Storage, Transfer, Filename) ->
Unique = erlang:unique_integer([positive]),
filename:join(get_subdir(Storage, Transfer, temporary), mk_filename([Unique, ".", Filename])).
mk_filename(Comps) ->
lists:append(lists:map(fun mk_filename_component/1, Comps)).
mk_filename_component(I) when is_integer(I) -> integer_to_list(I);
mk_filename_component(A) when is_atom(A) -> atom_to_list(A);
mk_filename_component(B) when is_binary(B) -> unicode:characters_to_list(B);
mk_filename_component(S) when is_list(S) -> S.
TempFilename = emqx_ft_fs_util:mk_temp_filename(Filename),
filename:join(get_subdir(Storage, Transfer, temporary), TempFilename).
write_contents(Filepath, Content) ->
file:write_file(Filepath, Content).

View File

@ -261,6 +261,7 @@ t_nasty_clientids_fileids(_Config) ->
fun({ClientId, FileId}) ->
ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, "justfile", ClientId),
[Export] = list_files(ClientId),
?assertMatch(#{meta := #{name := "justfile"}}, Export),
?assertEqual({ok, ClientId}, read_export(Export))
end,
Transfers
@ -270,13 +271,15 @@ t_nasty_filenames(_Config) ->
Filenames = [
{<<"nasty1">>, "146%"},
{<<"nasty2">>, "🌚"},
{<<"nasty3">>, "中文.txt"}
{<<"nasty3">>, "中文.txt"},
{<<"nasty4">>, _239Bytes = string:join(lists:duplicate(240 div 5, "LONG"), ".")}
],
ok = lists:foreach(
fun({ClientId, Filename}) ->
FileId = unicode:characters_to_binary(Filename),
ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, Filename, FileId),
[Export] = list_files(ClientId),
?assertMatch(#{meta := #{name := Filename}}, Export),
?assertEqual({ok, FileId}, read_export(Export))
end,
Filenames

View File

@ -87,7 +87,7 @@ t_update_config(_Config) ->
)
),
?assertEqual(
<<"/tmp/path">>,
"/tmp/path",
emqx_config:get([file_transfer, storage, local, segments, root])
),
?assertEqual(
@ -150,7 +150,7 @@ t_disable_restore_config(Config) ->
),
ok = emqtt:stop(Client),
% Restore local storage backend
Root = iolist_to_binary(emqx_ft_test_helpers:root(Config, node(), [segments])),
Root = emqx_ft_test_helpers:root(Config, node(), [segments]),
?assertMatch(
{ok, _},
emqx_conf:update(
@ -177,7 +177,7 @@ t_disable_restore_config(Config) ->
[
#{
?snk_kind := garbage_collection,
storage := #{segments := #{root := Root}}
storage := #{segments := #{gc := #{interval := 1000}}}
}
],
?of_kind(garbage_collection, Trace)

View File

@ -63,3 +63,24 @@ unescape_filename_test_() ->
?_assertEqual(Input, emqx_ft_fs_util:unescape_filename(Filename))
|| {Filename, Input} <- ?NAMES
].
mk_temp_filename_test_() ->
[
?_assertMatch(
"." ++ Suffix when length(Suffix) == 16,
emqx_ft_fs_util:mk_temp_filename(<<>>)
),
?_assertMatch(
"file.name." ++ Suffix when length(Suffix) == 16,
emqx_ft_fs_util:mk_temp_filename("file.name")
),
?_assertMatch(
"safe.🦺." ++ Suffix when length(Suffix) == 16,
emqx_ft_fs_util:mk_temp_filename(<<"safe.🦺"/utf8>>)
),
?_assertEqual(
% FilenameSlice + Dot + Suffix
200 + 1 + 16,
length(emqx_ft_fs_util:mk_temp_filename(lists:duplicate(63, "LONG")))
)
].

View File

@ -16,7 +16,7 @@
-module(emqx_gateway).
-include("include/emqx_gateway.hrl").
-include("emqx_gateway.hrl").
%% Gateway APIs
-export([

View File

@ -74,18 +74,20 @@
-type listener_ref() :: {ListenerType :: atom_or_bin(), ListenerName :: atom_or_bin()}.
-define(IS_SSL(T), (T == <<"ssl_options">> orelse T == <<"dtls_options">>)).
-define(IGNORE_KEYS, [<<"listeners">>, ?AUTHN_BIN]).
%%--------------------------------------------------------------------
%% Load/Unload
%%--------------------------------------------------------------------
-define(GATEWAY, [gateway]).
-spec load() -> ok.
load() ->
emqx_conf:add_handler([gateway], ?MODULE).
emqx_conf:add_handler(?GATEWAY, ?MODULE).
-spec unload() -> ok.
unload() ->
emqx_conf:remove_handler([gateway]).
emqx_conf:remove_handler(?GATEWAY).
%%--------------------------------------------------------------------
%% APIs
@ -104,7 +106,7 @@ unconvert_listeners(Ls) when is_list(Ls) ->
lists:foldl(
fun(Lis, Acc) ->
{[Type, Name], Lis1} = maps_key_take([<<"type">>, <<"name">>], Lis),
_ = vaildate_listener_name(Name),
_ = validate_listener_name(Name),
NLis1 = maps:without([<<"id">>, <<"running">>], Lis1),
emqx_utils_maps:deep_merge(Acc, #{Type => #{Name => NLis1}})
end,
@ -122,7 +124,7 @@ maps_key_take([K | Ks], M, Acc) ->
{V, M1} -> maps_key_take(Ks, M1, [V | Acc])
end.
vaildate_listener_name(Name) ->
validate_listener_name(Name) ->
try
{match, _} = re:run(Name, "^[0-9a-zA-Z_-]+$"),
ok
@ -373,7 +375,7 @@ ret_listener_or_err(_, _, Err) ->
emqx_config:raw_config()
) ->
{ok, emqx_config:update_request()} | {error, term()}.
pre_config_update(_, {load_gateway, GwName, Conf}, RawConf) ->
pre_config_update(?GATEWAY, {load_gateway, GwName, Conf}, RawConf) ->
case maps:get(GwName, RawConf, undefined) of
undefined ->
NConf = tune_gw_certs(fun convert_certs/2, GwName, Conf),
@ -381,29 +383,25 @@ pre_config_update(_, {load_gateway, GwName, Conf}, RawConf) ->
_ ->
badres_gateway(already_exist, GwName)
end;
pre_config_update(_, {update_gateway, GwName, Conf}, RawConf) ->
pre_config_update(?GATEWAY, {update_gateway, GwName, Conf}, RawConf) ->
case maps:get(GwName, RawConf, undefined) of
undefined ->
badres_gateway(not_found, GwName);
GwRawConf ->
Conf1 = maps:without([<<"listeners">>, ?AUTHN_BIN], Conf),
Conf1 = maps:without(?IGNORE_KEYS, Conf),
NConf = tune_gw_certs(fun convert_certs/2, GwName, Conf1),
NConf1 = maps:merge(GwRawConf, NConf),
{ok, emqx_utils_maps:deep_put([GwName], RawConf, NConf1)}
end;
pre_config_update(_, {unload_gateway, GwName}, RawConf) ->
pre_config_update(?GATEWAY, {unload_gateway, GwName}, RawConf) ->
_ = tune_gw_certs(
fun clear_certs/2,
GwName,
maps:get(GwName, RawConf, #{})
),
{ok, maps:remove(GwName, RawConf)};
pre_config_update(_, {add_listener, GwName, {LType, LName}, Conf}, RawConf) ->
case
emqx_utils_maps:deep_get(
[GwName, <<"listeners">>, LType, LName], RawConf, undefined
)
of
pre_config_update(?GATEWAY, {add_listener, GwName, {LType, LName}, Conf}, RawConf) ->
case get_listener(GwName, LType, LName, RawConf) of
undefined ->
NConf = convert_certs(certs_dir(GwName), Conf),
NListener = #{LType => #{LName => NConf}},
@ -415,12 +413,8 @@ pre_config_update(_, {add_listener, GwName, {LType, LName}, Conf}, RawConf) ->
_ ->
badres_listener(already_exist, GwName, LType, LName)
end;
pre_config_update(_, {update_listener, GwName, {LType, LName}, Conf}, RawConf) ->
case
emqx_utils_maps:deep_get(
[GwName, <<"listeners">>, LType, LName], RawConf, undefined
)
of
pre_config_update(?GATEWAY, {update_listener, GwName, {LType, LName}, Conf}, RawConf) ->
case get_listener(GwName, LType, LName, RawConf) of
undefined ->
badres_listener(not_found, GwName, LType, LName);
OldConf ->
@ -432,21 +426,17 @@ pre_config_update(_, {update_listener, GwName, {LType, LName}, Conf}, RawConf) -
),
{ok, NRawConf}
end;
pre_config_update(_, {remove_listener, GwName, {LType, LName}}, RawConf) ->
Path = [GwName, <<"listeners">>, LType, LName],
case emqx_utils_maps:deep_get(Path, RawConf, undefined) of
pre_config_update(?GATEWAY, {remove_listener, GwName, {LType, LName}}, RawConf) ->
case get_listener(GwName, LType, LName, RawConf) of
undefined ->
{ok, RawConf};
OldConf ->
clear_certs(certs_dir(GwName), OldConf),
Path = [GwName, <<"listeners">>, LType, LName],
{ok, emqx_utils_maps:deep_remove(Path, RawConf)}
end;
pre_config_update(_, {add_authn, GwName, Conf}, RawConf) ->
case
emqx_utils_maps:deep_get(
[GwName, ?AUTHN_BIN], RawConf, undefined
)
of
pre_config_update(?GATEWAY, {add_authn, GwName, Conf}, RawConf) ->
case get_authn(GwName, RawConf) of
undefined ->
CertsDir = authn_certs_dir(GwName, Conf),
Conf1 = emqx_authentication_config:convert_certs(CertsDir, Conf),
@ -458,14 +448,8 @@ pre_config_update(_, {add_authn, GwName, Conf}, RawConf) ->
_ ->
badres_authn(already_exist, GwName)
end;
pre_config_update(_, {add_authn, GwName, {LType, LName}, Conf}, RawConf) ->
case
emqx_utils_maps:deep_get(
[GwName, <<"listeners">>, LType, LName],
RawConf,
undefined
)
of
pre_config_update(?GATEWAY, {add_authn, GwName, {LType, LName}, Conf}, RawConf) ->
case get_listener(GwName, LType, LName, RawConf) of
undefined ->
badres_listener(not_found, GwName, LType, LName);
Listener ->
@ -486,12 +470,8 @@ pre_config_update(_, {add_authn, GwName, {LType, LName}, Conf}, RawConf) ->
badres_listener_authn(already_exist, GwName, LType, LName)
end
end;
pre_config_update(_, {update_authn, GwName, Conf}, RawConf) ->
case
emqx_utils_maps:deep_get(
[GwName, ?AUTHN_BIN], RawConf, undefined
)
of
pre_config_update(?GATEWAY, {update_authn, GwName, Conf}, RawConf) ->
case get_authn(GwName, RawConf) of
undefined ->
badres_authn(not_found, GwName);
OldAuthnConf ->
@ -499,14 +479,8 @@ pre_config_update(_, {update_authn, GwName, Conf}, RawConf) ->
Conf1 = emqx_authentication_config:convert_certs(CertsDir, Conf, OldAuthnConf),
{ok, emqx_utils_maps:deep_put([GwName, ?AUTHN_BIN], RawConf, Conf1)}
end;
pre_config_update(_, {update_authn, GwName, {LType, LName}, Conf}, RawConf) ->
case
emqx_utils_maps:deep_get(
[GwName, <<"listeners">>, LType, LName],
RawConf,
undefined
)
of
pre_config_update(?GATEWAY, {update_authn, GwName, {LType, LName}, Conf}, RawConf) ->
case get_listener(GwName, LType, LName, RawConf) of
undefined ->
badres_listener(not_found, GwName, LType, LName);
Listener ->
@ -533,12 +507,8 @@ pre_config_update(_, {update_authn, GwName, {LType, LName}, Conf}, RawConf) ->
)}
end
end;
pre_config_update(_, {remove_authn, GwName}, RawConf) ->
case
emqx_utils_maps:deep_get(
[GwName, ?AUTHN_BIN], RawConf, undefined
)
of
pre_config_update(?GATEWAY, {remove_authn, GwName}, RawConf) ->
case get_authn(GwName, RawConf) of
undefined ->
ok;
OldAuthnConf ->
@ -549,7 +519,7 @@ pre_config_update(_, {remove_authn, GwName}, RawConf) ->
emqx_utils_maps:deep_remove(
[GwName, ?AUTHN_BIN], RawConf
)};
pre_config_update(_, {remove_authn, GwName, {LType, LName}}, RawConf) ->
pre_config_update(?GATEWAY, {remove_authn, GwName, {LType, LName}}, RawConf) ->
Path = [GwName, <<"listeners">>, LType, LName, ?AUTHN_BIN],
case
emqx_utils_maps:deep_get(
@ -565,10 +535,184 @@ pre_config_update(_, {remove_authn, GwName, {LType, LName}}, RawConf) ->
emqx_authentication_config:clear_certs(CertsDir, OldAuthnConf)
end,
{ok, emqx_utils_maps:deep_remove(Path, RawConf)};
pre_config_update(_, UnknownReq, _RawConf) ->
logger:error("Unknown configuration update request: ~0p", [UnknownReq]),
pre_config_update(?GATEWAY, NewRawConf0 = #{}, OldRawConf = #{}) ->
%% FIXME don't support gateway's listener's authn update.
%% load all authentications
NewRawConf1 = pre_load_authentications(NewRawConf0, OldRawConf),
%% load all listeners
NewRawConf2 = pre_load_listeners(NewRawConf1, OldRawConf),
%% load all gateway
NewRawConf3 = pre_load_gateways(NewRawConf2, OldRawConf),
{ok, NewRawConf3};
pre_config_update(Path, UnknownReq, _RawConf) ->
?SLOG(error, #{
msg => "unknown_gateway_update_request",
request => UnknownReq,
path => Path
}),
{error, badreq}.
pre_load_gateways(NewConf, OldConf) ->
%% unload old gateways
maps:foreach(
fun(GwName, _OldGwConf) ->
case maps:find(GwName, NewConf) of
error -> pre_config_update(?GATEWAY, {unload_gateway, GwName}, OldConf);
_ -> ok
end
end,
OldConf
),
%% load/update gateways
maps:map(
fun(GwName, NewGwConf) ->
case maps:find(GwName, OldConf) of
{ok, NewGwConf} ->
NewGwConf;
{ok, _OldGwConf} ->
{ok, #{GwName := NewGwConf1}} = pre_config_update(
?GATEWAY, {update_gateway, GwName, NewGwConf}, OldConf
),
%% update gateway should pass through ignore keys(listener/authn)
PassThroughConf = maps:with(?IGNORE_KEYS, NewGwConf),
NewGwConf2 = maps:without(?IGNORE_KEYS, NewGwConf1),
maps:merge(NewGwConf2, PassThroughConf);
error ->
{ok, #{GwName := NewGwConf1}} = pre_config_update(
?GATEWAY, {load_gateway, GwName, NewGwConf}, OldConf
),
NewGwConf1
end
end,
NewConf
).
pre_load_listeners(NewConf, OldConf) ->
%% remove listeners
maps:foreach(
fun(GwName, GwConf) ->
Listeners = maps:get(<<"listeners">>, GwConf, #{}),
remove_listeners(GwName, NewConf, OldConf, Listeners)
end,
OldConf
),
%% add/update listeners
maps:map(
fun(GwName, GwConf) ->
Listeners = maps:get(<<"listeners">>, GwConf, #{}),
NewListeners = create_or_update_listeners(GwName, OldConf, Listeners),
maps:put(<<"listeners">>, NewListeners, GwConf)
end,
NewConf
).
create_or_update_listeners(GwName, OldConf, Listeners) ->
maps:map(
fun(LType, LConf) ->
maps:map(
fun(LName, LConf1) ->
NConf =
case get_listener(GwName, LType, LName, OldConf) of
undefined ->
{ok, NConf0} =
pre_config_update(
?GATEWAY,
{add_listener, GwName, {LType, LName}, LConf1},
OldConf
),
NConf0;
_ ->
{ok, NConf0} =
pre_config_update(
?GATEWAY,
{update_listener, GwName, {LType, LName}, LConf1},
OldConf
),
NConf0
end,
get_listener(GwName, LType, LName, NConf)
end,
LConf
)
end,
Listeners
).
remove_listeners(GwName, NewConf, OldConf, Listeners) ->
maps:foreach(
fun(LType, LConf) ->
maps:foreach(
fun(LName, _LConf1) ->
case get_listener(GwName, LType, LName, NewConf) of
undefined ->
pre_config_update(
?GATEWAY, {remove_listener, GwName, {LType, LName}}, OldConf
);
_ ->
ok
end
end,
LConf
)
end,
Listeners
).
get_listener(GwName, LType, LName, NewConf) ->
emqx_utils_maps:deep_get(
[GwName, <<"listeners">>, LType, LName], NewConf, undefined
).
get_authn(GwName, Conf) ->
emqx_utils_maps:deep_get([GwName, ?AUTHN_BIN], Conf, undefined).
pre_load_authentications(NewConf, OldConf) ->
%% remove authentications when not in new config
maps:foreach(
fun(GwName, OldGwConf) ->
case
maps:get(?AUTHN_BIN, OldGwConf, undefined) =/= undefined andalso
get_authn(GwName, NewConf) =:= undefined
of
true ->
pre_config_update(?GATEWAY, {remove_authn, GwName}, OldConf);
false ->
ok
end
end,
OldConf
),
%% add/update authentications
maps:map(
fun(GwName, NewGwConf) ->
case get_authn(GwName, OldConf) of
undefined ->
case maps:get(?AUTHN_BIN, NewGwConf, undefined) of
undefined ->
NewGwConf;
AuthN ->
{ok, #{GwName := #{?AUTHN_BIN := NAuthN}}} =
pre_config_update(?GATEWAY, {add_authn, GwName, AuthN}, OldConf),
maps:put(?AUTHN_BIN, NAuthN, NewGwConf)
end;
OldAuthN ->
case maps:get(?AUTHN_BIN, NewGwConf, undefined) of
undefined ->
NewGwConf;
OldAuthN ->
NewGwConf;
NewAuthN ->
{ok, #{GwName := #{?AUTHN_BIN := NAuthN}}} =
pre_config_update(
?GATEWAY, {update_authn, GwName, NewAuthN}, OldConf
),
maps:put(?AUTHN_BIN, NAuthN, NewGwConf)
end
end
end,
NewConf
).
badres_gateway(not_found, GwName) ->
{error,
{badres, #{
@ -642,7 +786,7 @@ badres_listener_authn(already_exist, GwName, LType, LName) ->
) ->
ok | {ok, Result :: any()} | {error, Reason :: term()}.
post_config_update(_, Req, NewConfig, OldConfig, _AppEnvs) when is_tuple(Req) ->
post_config_update(?GATEWAY, Req, NewConfig, OldConfig, _AppEnvs) when is_tuple(Req) ->
[_Tag, GwName0 | _] = tuple_to_list(Req),
GwName = binary_to_existing_atom(GwName0),
@ -657,11 +801,35 @@ post_config_update(_, Req, NewConfig, OldConfig, _AppEnvs) when is_tuple(Req) ->
{New, Old} when is_map(New), is_map(Old) ->
emqx_gateway:update(GwName, New)
end;
post_config_update(_, _Req, _NewConfig, _OldConfig, _AppEnvs) ->
post_config_update(?GATEWAY, _Req = #{}, NewConfig, OldConfig, _AppEnvs) ->
%% unload gateways
maps:foreach(
fun(GwName, _OldGwConf) ->
case maps:get(GwName, NewConfig, undefined) of
undefined ->
emqx_gateway:unload(GwName);
_ ->
ok
end
end,
OldConfig
),
%% load/update gateways
maps:foreach(
fun(GwName, NewGwConf) ->
case maps:get(GwName, OldConfig, undefined) of
undefined ->
emqx_gateway:load(GwName, NewGwConf);
_ ->
emqx_gateway:update(GwName, NewGwConf)
end
end,
NewConfig
),
ok.
%%--------------------------------------------------------------------
%% Internal funcs
%% Internal functions
%%--------------------------------------------------------------------
tune_gw_certs(Fun, GwName, Conf) ->

View File

@ -18,7 +18,7 @@
-behaviour(gen_server).
-include_lib("emqx_gateway/include/emqx_gateway.hrl").
-include("emqx_gateway.hrl").
%% APIs
-export([start_link/1]).

View File

@ -120,7 +120,10 @@ fields(ssl_listener) ->
{ssl_options,
sc(
hoconsc:ref(emqx_schema, "listener_ssl_opts"),
#{desc => ?DESC(ssl_listener_options)}
#{
desc => ?DESC(ssl_listener_options),
validator => fun emqx_schema:validate_server_ssl_opts/1
}
)}
];
fields(udp_listener) ->
@ -132,7 +135,13 @@ fields(udp_listener) ->
fields(dtls_listener) ->
[{acceptors, sc(integer(), #{default => 16, desc => ?DESC(dtls_listener_acceptors)})}] ++
fields(udp_listener) ++
[{dtls_options, sc(ref(dtls_opts), #{desc => ?DESC(dtls_listener_dtls_opts)})}];
[
{dtls_options,
sc(ref(dtls_opts), #{
desc => ?DESC(dtls_listener_dtls_opts),
validator => fun emqx_schema:validate_server_ssl_opts/1
})}
];
fields(udp_opts) ->
[
{active_n,

View File

@ -277,6 +277,48 @@ t_load_unload_gateway(_) ->
{config_not_found, [<<"gateway">>, stomp]},
emqx:get_raw_config([gateway, stomp])
),
%% test update([gateway], Conf)
Raw0 = emqx:get_raw_config([gateway]),
#{<<"listeners">> := StompConfL1} = StompConf1,
StompConf11 = StompConf1#{
<<"listeners">> => emqx_gateway_conf:unconvert_listeners(StompConfL1)
},
#{<<"listeners">> := StompConfL2} = StompConf2,
StompConf22 = StompConf2#{
<<"listeners">> => emqx_gateway_conf:unconvert_listeners(StompConfL2)
},
Raw1 = Raw0#{<<"stomp">> => StompConf11},
Raw2 = Raw0#{<<"stomp">> => StompConf22},
?assertMatch({ok, _}, emqx:update_config([gateway], Raw1)),
assert_confs(StompConf1, emqx:get_raw_config([gateway, stomp])),
?assertMatch(
#{
config := #{
authentication := #{backend := built_in_database, enable := true},
listeners := #{tcp := #{default := #{bind := 61613}}},
mountpoint := <<"t/">>,
idle_timeout := 10000
}
},
emqx_gateway:lookup('stomp')
),
?assertMatch({ok, _}, emqx:update_config([gateway], Raw2)),
assert_confs(StompConf2, emqx:get_raw_config([gateway, stomp])),
?assertMatch(
#{
config :=
#{
authentication := #{backend := built_in_database, enable := true},
listeners := #{tcp := #{default := #{bind := 61613}}},
idle_timeout := 20000,
mountpoint := <<"t2/">>
}
},
emqx_gateway:lookup('stomp')
),
%% reset
?assertMatch({ok, _}, emqx:update_config([gateway], Raw0)),
?assertEqual(undefined, emqx_gateway:lookup('stomp')),
ok.
t_load_remove_authn(_) ->
@ -310,6 +352,40 @@ t_load_remove_authn(_) ->
{config_not_found, [<<"gateway">>, stomp, authentication]},
emqx:get_raw_config([gateway, stomp, authentication])
),
%% test update([gateway], Conf)
Raw0 = emqx:get_raw_config([gateway]),
#{<<"listeners">> := StompConfL} = StompConf,
StompConf1 = StompConf#{
<<"listeners">> => emqx_gateway_conf:unconvert_listeners(StompConfL),
<<"authentication">> => ?CONF_STOMP_AUTHN_1
},
Raw1 = maps:put(<<"stomp">>, StompConf1, Raw0),
?assertMatch({ok, _}, emqx:update_config([gateway], Raw1)),
assert_confs(StompConf1, emqx:get_raw_config([gateway, stomp])),
?assertMatch(
#{
stomp :=
#{
authn := <<"password_based:built_in_database">>,
listeners := [#{authn := <<"undefined">>, type := tcp}],
num_clients := 0
}
},
emqx_gateway:get_basic_usage_info()
),
%% reset(remove authn)
?assertMatch({ok, _}, emqx:update_config([gateway], Raw0)),
?assertMatch(
#{
stomp :=
#{
authn := <<"undefined">>,
listeners := [#{authn := <<"undefined">>, type := tcp}],
num_clients := 0
}
},
emqx_gateway:get_basic_usage_info()
),
ok.
t_load_remove_listeners(_) ->
@ -324,6 +400,7 @@ t_load_remove_listeners(_) ->
{<<"tcp">>, <<"default">>},
?CONF_STOMP_LISTENER_1
),
assert_confs(
maps:merge(StompConf, listener(?CONF_STOMP_LISTENER_1)),
emqx:get_raw_config([gateway, stomp])
@ -355,6 +432,59 @@ t_load_remove_listeners(_) ->
{config_not_found, [<<"gateway">>, stomp, listeners, tcp, default]},
emqx:get_raw_config([gateway, stomp, listeners, tcp, default])
),
%% test update([gateway], Conf)
Raw0 = emqx:get_raw_config([gateway]),
Raw1 = emqx_utils_maps:deep_put(
[<<"stomp">>, <<"listeners">>, <<"tcp">>, <<"default">>], Raw0, ?CONF_STOMP_LISTENER_1
),
?assertMatch({ok, _}, emqx:update_config([gateway], Raw1)),
assert_confs(
maps:merge(StompConf, listener(?CONF_STOMP_LISTENER_1)),
emqx:get_raw_config([gateway, stomp])
),
?assertMatch(
#{
stomp :=
#{
authn := <<"password_based:built_in_database">>,
listeners := [#{authn := <<"undefined">>, type := tcp}],
num_clients := 0
}
},
emqx_gateway:get_basic_usage_info()
),
Raw2 = emqx_utils_maps:deep_put(
[<<"stomp">>, <<"listeners">>, <<"tcp">>, <<"default">>], Raw0, ?CONF_STOMP_LISTENER_2
),
?assertMatch({ok, _}, emqx:update_config([gateway], Raw2)),
assert_confs(
maps:merge(StompConf, listener(?CONF_STOMP_LISTENER_2)),
emqx:get_raw_config([gateway, stomp])
),
?assertMatch(
#{
stomp :=
#{
authn := <<"password_based:built_in_database">>,
listeners := [#{authn := <<"undefined">>, type := tcp}],
num_clients := 0
}
},
emqx_gateway:get_basic_usage_info()
),
%% reset(remove listener)
?assertMatch({ok, _}, emqx:update_config([gateway], Raw0)),
?assertMatch(
#{
stomp :=
#{
authn := <<"password_based:built_in_database">>,
listeners := [],
num_clients := 0
}
},
emqx_gateway:get_basic_usage_info()
),
ok.
t_load_remove_listener_authn(_) ->
@ -417,6 +547,7 @@ t_load_gateway_with_certs_content(_) ->
[<<"listeners">>, <<"ssl">>, <<"default">>, <<"ssl_options">>],
emqx:get_raw_config([gateway, stomp])
),
assert_ssl_confs_files_exist(SslConf),
ok = emqx_gateway_conf:unload_gateway(<<"stomp">>),
assert_ssl_confs_files_deleted(SslConf),
?assertException(
@ -424,6 +555,25 @@ t_load_gateway_with_certs_content(_) ->
{config_not_found, [<<"gateway">>, stomp]},
emqx:get_raw_config([gateway, stomp])
),
%% test update([gateway], Conf)
Raw0 = emqx:get_raw_config([gateway]),
#{<<"listeners">> := StompConfL} = StompConf,
StompConf1 = StompConf#{
<<"listeners">> => emqx_gateway_conf:unconvert_listeners(StompConfL)
},
Raw1 = emqx_utils_maps:deep_put([<<"stomp">>], Raw0, StompConf1),
?assertMatch({ok, _}, emqx:update_config([gateway], Raw1)),
assert_ssl_confs_files_exist(SslConf),
?assertEqual(
SslConf,
emqx_utils_maps:deep_get(
[<<"listeners">>, <<"ssl">>, <<"default">>, <<"ssl_options">>],
emqx:get_raw_config([gateway, stomp])
)
),
%% reset
?assertMatch({ok, _}, emqx:update_config([gateway], Raw0)),
assert_ssl_confs_files_deleted(SslConf),
ok.
%% TODO: Comment out this test case for now, because emqx_tls_lib
@ -475,6 +625,7 @@ t_add_listener_with_certs_content(_) ->
[<<"listeners">>, <<"ssl">>, <<"default">>, <<"ssl_options">>],
emqx:get_raw_config([gateway, stomp])
),
assert_ssl_confs_files_exist(SslConf),
ok = emqx_gateway_conf:remove_listener(
<<"stomp">>, {<<"ssl">>, <<"default">>}
),
@ -492,6 +643,34 @@ t_add_listener_with_certs_content(_) ->
{config_not_found, [<<"gateway">>, stomp, listeners, ssl, default]},
emqx:get_raw_config([gateway, stomp, listeners, ssl, default])
),
%% test update([gateway], Conf)
Raw0 = emqx:get_raw_config([gateway]),
Raw1 = emqx_utils_maps:deep_put(
[<<"stomp">>, <<"listeners">>, <<"ssl">>, <<"default">>], Raw0, ?CONF_STOMP_LISTENER_SSL
),
?assertMatch({ok, _}, emqx:update_config([gateway], Raw1)),
SslConf1 = emqx_utils_maps:deep_get(
[<<"listeners">>, <<"ssl">>, <<"default">>, <<"ssl_options">>],
emqx:get_raw_config([gateway, stomp])
),
assert_ssl_confs_files_exist(SslConf1),
%% update
Raw2 = emqx_utils_maps:deep_put(
[<<"stomp">>, <<"listeners">>, <<"ssl">>, <<"default">>], Raw0, ?CONF_STOMP_LISTENER_SSL_2
),
?assertMatch({ok, _}, emqx:update_config([gateway], Raw2)),
SslConf2 =
emqx_utils_maps:deep_get(
[<<"listeners">>, <<"ssl">>, <<"default">>, <<"ssl_options">>],
emqx:get_raw_config([gateway, stomp])
),
assert_ssl_confs_files_exist(SslConf2),
%% reset
?assertMatch({ok, _}, emqx:update_config([gateway], Raw0)),
assert_ssl_confs_files_deleted(SslConf),
assert_ssl_confs_files_deleted(SslConf1),
assert_ssl_confs_files_deleted(SslConf2),
ok.
assert_ssl_confs_files_deleted(SslConf) when is_map(SslConf) ->
@ -503,6 +682,15 @@ assert_ssl_confs_files_deleted(SslConf) when is_map(SslConf) ->
end,
Ks
).
assert_ssl_confs_files_exist(SslConf) when is_map(SslConf) ->
Ks = [<<"cacertfile">>, <<"certfile">>, <<"keyfile">>],
lists:foreach(
fun(K) ->
Path = maps:get(K, SslConf),
{ok, _} = file:read_file(Path)
end,
Ks
).
%%--------------------------------------------------------------------
%% Utils

View File

@ -83,13 +83,13 @@ do_assert_confs(Key, Expected, Effected) ->
maybe_unconvert_listeners(Conf) when is_map(Conf) ->
case maps:take(<<"listeners">>, Conf) of
error ->
Conf;
{Ls, Conf1} ->
{Ls, Conf1} when is_list(Ls) ->
Conf1#{
<<"listeners">> =>
emqx_gateway_conf:unconvert_listeners(Ls)
}
};
_ ->
Conf
end;
maybe_unconvert_listeners(Conf) ->
Conf.

View File

@ -1,6 +1,6 @@
{application, emqx_gateway_mqttsn, [
{description, "MQTT-SN Gateway"},
{vsn, "0.1.1"},
{vsn, "0.1.2"},
{registered, []},
{applications, [kernel, stdlib, emqx, emqx_gateway]},
{env, []},

View File

@ -1111,15 +1111,16 @@ check_pub_authz(
convert_pub_to_msg(
{TopicName, Flags, Data},
Channel = #channel{clientinfo = #{clientid := ClientId}}
Channel = #channel{clientinfo = #{clientid := ClientId, mountpoint := Mountpoint}}
) ->
#mqtt_sn_flags{qos = QoS, dup = Dup, retain = Retain} = Flags,
NewQoS = get_corrected_qos(QoS),
NTopicName = emqx_mountpoint:mount(Mountpoint, TopicName),
Message = put_message_headers(
emqx_message:make(
ClientId,
NewQoS,
TopicName,
NTopicName,
Data,
#{dup => Dup, retain => Retain},
#{}

View File

@ -120,6 +120,13 @@ restart_mqttsn_with_subs_resume_off() ->
Conf#{<<"subs_resume">> => <<"false">>}
).
restart_mqttsn_with_mountpoint(Mp) ->
Conf = emqx:get_raw_config([gateway, mqttsn]),
emqx_gateway_conf:update_gateway(
mqttsn,
Conf#{<<"mountpoint">> => Mp}
).
default_config() ->
?CONF_DEFAULT.
@ -990,6 +997,44 @@ t_publish_qos2_case03(_) ->
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
gen_udp:close(Socket).
t_publish_mountpoint(_) ->
restart_mqttsn_with_mountpoint(<<"mp/">>),
Dup = 0,
QoS = 1,
Retain = 0,
Will = 0,
CleanSession = 0,
MsgId = 1,
TopicId1 = ?MAX_PRED_TOPIC_ID + 1,
Topic = <<"abc">>,
{ok, Socket} = gen_udp:open(0, [binary]),
ClientId = ?CLIENTID,
send_connect_msg(Socket, ClientId),
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
send_subscribe_msg_normal_topic(Socket, QoS, Topic, MsgId),
?assertEqual(
<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
TopicId1:16, MsgId:16, ?SN_RC_ACCEPTED>>,
receive_response(Socket)
),
Payload1 = <<20, 21, 22, 23>>,
send_publish_msg_normal_topic(Socket, QoS, MsgId, TopicId1, Payload1),
?assertEqual(
<<7, ?SN_PUBACK, TopicId1:16, MsgId:16, ?SN_RC_ACCEPTED>>, receive_response(Socket)
),
timer:sleep(100),
?assertEqual(
<<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
TopicId1:16, MsgId:16, <<20, 21, 22, 23>>/binary>>,
receive_response(Socket)
),
send_disconnect_msg(Socket, undefined),
restart_mqttsn_with_mountpoint(<<>>),
gen_udp:close(Socket).
t_delivery_qos1_register_invalid_topic_id(_) ->
Dup = 0,
QoS = 1,

View File

@ -2,7 +2,7 @@
{application, emqx_utils, [
{description, "Miscellaneous utilities for EMQX apps"},
% strict semver, bump manually!
{vsn, "5.0.2"},
{vsn, "5.0.3"},
{modules, [
emqx_utils,
emqx_utils_api,

View File

@ -54,7 +54,8 @@
safe_to_existing_atom/1,
safe_to_existing_atom/2,
pub_props_to_packet/1,
safe_filename/1
safe_filename/1,
diff_lists/3
]).
-export([
@ -748,3 +749,152 @@ safe_filename(Filename) when is_binary(Filename) ->
binary:replace(Filename, <<":">>, <<"-">>, [global]);
safe_filename(Filename) when is_list(Filename) ->
lists:flatten(string:replace(Filename, ":", "-", all)).
%% @doc Compares two lists of maps and returns the differences between them in a
%% map containing four keys 'removed', 'added', 'identical', and 'changed'
%% each holding a list of maps. Elements are compared using key function KeyFunc
%% to extract the comparison key used for matching.
%%
%% The return value is a map with the following keys and the list of maps as its values:
%% * 'removed' a list of maps that were present in the Old list, but not found in the New list.
%% * 'added' a list of maps that were present in the New list, but not found in the Old list.
%% * 'identical' a list of maps that were present in both lists and have the same comparison key value.
%% * 'changed' a list of pairs of maps representing the changes between maps present in the New and Old lists.
%% The first map in the pair represents the map in the Old list, and the second map
%% represents the potential modification in the New list.
%% The KeyFunc parameter is a function that extracts the comparison key used
%% for matching from each map. The function should return a comparable term,
%% such as an atom, a number, or a string. This is used to determine if each
%% element is the same in both lists.
-spec diff_lists(list(T), list(T), Func) ->
#{
added := list(T),
identical := list(T),
removed := list(T),
changed := list({Old :: T, New :: T})
}
when
Func :: fun((T) -> any()),
T :: any().
diff_lists(New, Old, KeyFunc) when is_list(New) andalso is_list(Old) ->
Removed =
lists:foldl(
fun(E, RemovedAcc) ->
case search(KeyFunc(E), KeyFunc, New) of
false -> [E | RemovedAcc];
_ -> RemovedAcc
end
end,
[],
Old
),
{Added, Identical, Changed} =
lists:foldl(
fun(E, Acc) ->
{Added0, Identical0, Changed0} = Acc,
case search(KeyFunc(E), KeyFunc, Old) of
false ->
{[E | Added0], Identical0, Changed0};
E ->
{Added0, [E | Identical0], Changed0};
E1 ->
{Added0, Identical0, [{E1, E} | Changed0]}
end
end,
{[], [], []},
New
),
#{
removed => lists:reverse(Removed),
added => lists:reverse(Added),
identical => lists:reverse(Identical),
changed => lists:reverse(Changed)
}.
search(_ExpectValue, _KeyFunc, []) ->
false;
search(ExpectValue, KeyFunc, [Item | List]) ->
case KeyFunc(Item) =:= ExpectValue of
true -> Item;
false -> search(ExpectValue, KeyFunc, List)
end.
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
diff_lists_test() ->
KeyFunc = fun(#{name := Name}) -> Name end,
?assertEqual(
#{
removed => [],
added => [],
identical => [],
changed => []
},
diff_lists([], [], KeyFunc)
),
%% test removed list
?assertEqual(
#{
removed => [#{name => a, value => 1}],
added => [],
identical => [],
changed => []
},
diff_lists([], [#{name => a, value => 1}], KeyFunc)
),
%% test added list
?assertEqual(
#{
removed => [],
added => [#{name => a, value => 1}],
identical => [],
changed => []
},
diff_lists([#{name => a, value => 1}], [], KeyFunc)
),
%% test identical list
?assertEqual(
#{
removed => [],
added => [],
identical => [#{name => a, value => 1}],
changed => []
},
diff_lists([#{name => a, value => 1}], [#{name => a, value => 1}], KeyFunc)
),
Old = [
#{name => a, value => 1},
#{name => b, value => 4},
#{name => e, value => 2},
#{name => d, value => 4}
],
New = [
#{name => a, value => 1},
#{name => b, value => 2},
#{name => e, value => 2},
#{name => c, value => 3}
],
Diff = diff_lists(New, Old, KeyFunc),
?assertEqual(
#{
added => [
#{name => c, value => 3}
],
identical => [
#{name => a, value => 1},
#{name => e, value => 2}
],
removed => [
#{name => d, value => 4}
],
changed => [{#{name => b, value => 4}, #{name => b, value => 2}}]
},
Diff
),
ok.
-endif.

View File

@ -0,0 +1,5 @@
Deprecated UDP mcast mechanism for cluster discovery.
This feature has been planed for deprecation since 5.0 mainly due to the lack of
actual production use.
This feature code is not yet removed in 5.1, but the document interface is demoted.

View File

@ -0,0 +1 @@
Fix the issue in MQTT-SN gateway where the `mountpoint` does not take effect on message publishing.

View File

@ -0,0 +1,8 @@
Disallow enabling `fail_if_no_peer_cert` in listener SSL options if `verify_none` is set.
Setting `fail_if_no_peer_cert = true` and `verify = verify_none` caused connection errors
due to incompatible options.
This fix validates the options when creating or updating a listener to avoid these errors.
Note: any old listener configuration with `fail_if_no_peer_cert = true` and `verify = verify_none`
that was previously allowed will fail to load after applying this fix and must be manually fixed.

View File

@ -755,7 +755,9 @@ cluster_discovery_strategy.desc:
- static: Configure static nodes list by setting <code>seeds</code> in config file.<br/>
- dns: Use DNS A record to discover peer nodes.<br/>
- etcd: Use etcd to discover peer nodes.<br/>
- k8s: Use Kubernetes API to discover peer pods."""
- k8s: Use Kubernetes API to discover peer pods.
- mcast: Deprecated since 5.1, will be removed in 5.2.
This supports discovery via UDP multicast."""
cluster_discovery_strategy.label:
"""Cluster Discovery Strategy"""