Merge branch 'master' into mqttsn-qos3
This commit is contained in:
commit
81387800b5
|
@ -43,8 +43,18 @@ runs:
|
||||||
echo "OTP_SOURCE_PATH=$OTP_SOURCE_PATH" >> $GITHUB_OUTPUT
|
echo "OTP_SOURCE_PATH=$OTP_SOURCE_PATH" >> $GITHUB_OUTPUT
|
||||||
echo "OTP_INSTALL_PATH=$OTP_INSTALL_PATH" >> $GITHUB_OUTPUT
|
echo "OTP_INSTALL_PATH=$OTP_INSTALL_PATH" >> $GITHUB_OUTPUT
|
||||||
mkdir -p "$OTP_SOURCE_PATH" "$OTP_INSTALL_PATH"
|
mkdir -p "$OTP_SOURCE_PATH" "$OTP_INSTALL_PATH"
|
||||||
|
# we need this to skip using cache for self-hosted runners
|
||||||
|
case ${{ inputs.os }} in
|
||||||
|
*arm64)
|
||||||
|
echo "SELF_HOSTED=true" >> $GITHUB_OUTPUT
|
||||||
|
;;
|
||||||
|
*)
|
||||||
|
echo "SELF_HOSTED=false" >> $GITHUB_OUTPUT
|
||||||
|
;;
|
||||||
|
esac
|
||||||
- uses: actions/cache@v3
|
- uses: actions/cache@v3
|
||||||
id: cache
|
id: cache
|
||||||
|
if: steps.prepare.outputs.SELF_HOSTED != 'true'
|
||||||
with:
|
with:
|
||||||
path: ${{ steps.prepare.outputs.OTP_INSTALL_PATH }}
|
path: ${{ steps.prepare.outputs.OTP_INSTALL_PATH }}
|
||||||
key: otp-install-${{ inputs.otp }}-${{ inputs.os }}-static-ssl-disable-hipe-disable-jit
|
key: otp-install-${{ inputs.otp }}-${{ inputs.os }}-static-ssl-disable-hipe-disable-jit
|
||||||
|
@ -54,22 +64,36 @@ runs:
|
||||||
run: |
|
run: |
|
||||||
OTP_SOURCE_PATH="${{ steps.prepare.outputs.OTP_SOURCE_PATH }}"
|
OTP_SOURCE_PATH="${{ steps.prepare.outputs.OTP_SOURCE_PATH }}"
|
||||||
OTP_INSTALL_PATH="${{ steps.prepare.outputs.OTP_INSTALL_PATH }}"
|
OTP_INSTALL_PATH="${{ steps.prepare.outputs.OTP_INSTALL_PATH }}"
|
||||||
|
SELF_HOSTED="${{ steps.prepare.outputs.SELF_HOSTED }}"
|
||||||
|
# when it's self-hosted, it never hits the cache,
|
||||||
|
# skip rebuild if it's self-hosted and the install path already has a 'bin'
|
||||||
|
if [ "${SELF_HOSTED:-false}" = 'true' ]; then
|
||||||
|
if [ -n "$OTP_INSTALL_PATH" ] && [ -d "$OTP_INSTALL_PATH/bin" ]; then
|
||||||
|
echo "Skip rebuilding OTP, found $OTP_INSTALL_PATH"
|
||||||
|
exit 0
|
||||||
|
fi
|
||||||
|
fi
|
||||||
|
## when it's not self-hosted, or the install path is not found,
|
||||||
|
## build otp from source code.
|
||||||
if [ -d "$OTP_SOURCE_PATH" ]; then
|
if [ -d "$OTP_SOURCE_PATH" ]; then
|
||||||
rm -rf "$OTP_SOURCE_PATH"
|
rm -rf "$OTP_SOURCE_PATH"
|
||||||
fi
|
fi
|
||||||
git clone --depth 1 --branch OTP-${{ inputs.otp }} https://github.com/emqx/otp.git "$OTP_SOURCE_PATH"
|
git clone --depth 1 --branch OTP-${{ inputs.otp }} https://github.com/emqx/otp.git "$OTP_SOURCE_PATH"
|
||||||
cd "$OTP_SOURCE_PATH"
|
cd "$OTP_SOURCE_PATH"
|
||||||
if [ "$(arch)" = arm64 ]; then
|
if [ "$(arch)" = arm64 ]; then
|
||||||
|
export CFLAGS="-O2 -g -I$(brew --prefix unixodbc)/include"
|
||||||
export LDFLAGS="-L$(brew --prefix unixodbc)/lib"
|
export LDFLAGS="-L$(brew --prefix unixodbc)/lib"
|
||||||
export CC="/usr/bin/gcc -I$(brew --prefix unixodbc)/include"
|
WITH_ODBC="--with-odbc=$(brew --prefix unixodbc)"
|
||||||
|
else
|
||||||
|
WITH_ODBC=""
|
||||||
fi
|
fi
|
||||||
./configure --disable-dynamic-ssl-lib --with-ssl=$(brew --prefix openssl@1.1) --disable-hipe --disable-jit --prefix="$OTP_INSTALL_PATH"
|
./configure --disable-dynamic-ssl-lib --with-ssl=$(brew --prefix openssl@1.1) ${WITH_ODBC} --disable-hipe --disable-jit --prefix="$OTP_INSTALL_PATH"
|
||||||
make -j$(nproc)
|
make -j$(nproc)
|
||||||
rm -rf "$OTP_INSTALL_PATH"
|
rm -rf "$OTP_INSTALL_PATH"
|
||||||
make install
|
make install
|
||||||
if [ "$(arch)" = arm64 ]; then
|
if [ "$(arch)" = arm64 ]; then
|
||||||
|
unset CFLAGS
|
||||||
unset LDFLAGS
|
unset LDFLAGS
|
||||||
unset CC
|
|
||||||
fi
|
fi
|
||||||
- name: build
|
- name: build
|
||||||
env:
|
env:
|
||||||
|
@ -87,6 +111,10 @@ runs:
|
||||||
shell: bash
|
shell: bash
|
||||||
run: |
|
run: |
|
||||||
export PATH="${{ steps.prepare.outputs.OTP_INSTALL_PATH }}/bin:$PATH"
|
export PATH="${{ steps.prepare.outputs.OTP_INSTALL_PATH }}/bin:$PATH"
|
||||||
|
# inspec erl in PATH
|
||||||
|
which erl
|
||||||
|
# inspec erl command banner
|
||||||
|
erl -s init stop
|
||||||
make ensure-rebar3
|
make ensure-rebar3
|
||||||
mkdir -p $HOME/bin
|
mkdir -p $HOME/bin
|
||||||
cp rebar3 $HOME/bin/rebar3
|
cp rebar3 $HOME/bin/rebar3
|
||||||
|
|
|
@ -32,10 +32,10 @@
|
||||||
%% `apps/emqx/src/bpapi/README.md'
|
%% `apps/emqx/src/bpapi/README.md'
|
||||||
|
|
||||||
%% Community edition
|
%% Community edition
|
||||||
-define(EMQX_RELEASE_CE, "5.1.0-alpha.2").
|
-define(EMQX_RELEASE_CE, "5.1.0-alpha.3").
|
||||||
|
|
||||||
%% Enterprise edition
|
%% Enterprise edition
|
||||||
-define(EMQX_RELEASE_EE, "5.1.0-alpha.2").
|
-define(EMQX_RELEASE_EE, "5.1.0-alpha.3").
|
||||||
|
|
||||||
%% the HTTP API version
|
%% the HTTP API version
|
||||||
-define(EMQX_API_VERSION, "5.0").
|
-define(EMQX_API_VERSION, "5.0").
|
||||||
|
|
|
@ -94,7 +94,8 @@
|
||||||
validate_keepalive_multiplier/1,
|
validate_keepalive_multiplier/1,
|
||||||
non_empty_string/1,
|
non_empty_string/1,
|
||||||
validations/0,
|
validations/0,
|
||||||
naive_env_interpolation/1
|
naive_env_interpolation/1,
|
||||||
|
validate_server_ssl_opts/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([qos/0]).
|
-export([qos/0]).
|
||||||
|
@ -958,7 +959,7 @@ fields("mqtt_wss_listener") ->
|
||||||
{"ssl_options",
|
{"ssl_options",
|
||||||
sc(
|
sc(
|
||||||
ref("listener_wss_opts"),
|
ref("listener_wss_opts"),
|
||||||
#{}
|
#{validator => fun validate_server_ssl_opts/1}
|
||||||
)},
|
)},
|
||||||
{"websocket",
|
{"websocket",
|
||||||
sc(
|
sc(
|
||||||
|
@ -2426,8 +2427,21 @@ server_ssl_opts_schema(Defaults, IsRanchListener) ->
|
||||||
]
|
]
|
||||||
].
|
].
|
||||||
|
|
||||||
|
validate_server_ssl_opts(#{<<"fail_if_no_peer_cert">> := true, <<"verify">> := Verify}) ->
|
||||||
|
validate_verify(Verify);
|
||||||
|
validate_server_ssl_opts(#{fail_if_no_peer_cert := true, verify := Verify}) ->
|
||||||
|
validate_verify(Verify);
|
||||||
|
validate_server_ssl_opts(_SSLOpts) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
validate_verify(verify_peer) ->
|
||||||
|
ok;
|
||||||
|
validate_verify(_) ->
|
||||||
|
{error, "verify must be verify_peer when fail_if_no_peer_cert is true"}.
|
||||||
|
|
||||||
mqtt_ssl_listener_ssl_options_validator(Conf) ->
|
mqtt_ssl_listener_ssl_options_validator(Conf) ->
|
||||||
Checks = [
|
Checks = [
|
||||||
|
fun validate_server_ssl_opts/1,
|
||||||
fun ocsp_outer_validator/1,
|
fun ocsp_outer_validator/1,
|
||||||
fun crl_outer_validator/1
|
fun crl_outer_validator/1
|
||||||
],
|
],
|
||||||
|
|
|
@ -106,6 +106,67 @@ bad_cipher_test() ->
|
||||||
),
|
),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
fail_if_no_peer_cert_test_() ->
|
||||||
|
Sc = #{
|
||||||
|
roots => [mqtt_ssl_listener],
|
||||||
|
fields => #{mqtt_ssl_listener => emqx_schema:fields("mqtt_ssl_listener")}
|
||||||
|
},
|
||||||
|
Opts = #{atom_key => false, required => false},
|
||||||
|
OptsAtomKey = #{atom_key => true, required => false},
|
||||||
|
InvalidConf = #{
|
||||||
|
<<"bind">> => <<"0.0.0.0:9883">>,
|
||||||
|
<<"ssl_options">> => #{
|
||||||
|
<<"fail_if_no_peer_cert">> => true,
|
||||||
|
<<"verify">> => <<"verify_none">>
|
||||||
|
}
|
||||||
|
},
|
||||||
|
InvalidListener = #{<<"mqtt_ssl_listener">> => InvalidConf},
|
||||||
|
ValidListener = #{
|
||||||
|
<<"mqtt_ssl_listener">> => InvalidConf#{
|
||||||
|
<<"ssl_options">> =>
|
||||||
|
#{
|
||||||
|
<<"fail_if_no_peer_cert">> => true,
|
||||||
|
<<"verify">> => <<"verify_peer">>
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
ValidListener1 = #{
|
||||||
|
<<"mqtt_ssl_listener">> => InvalidConf#{
|
||||||
|
<<"ssl_options">> =>
|
||||||
|
#{
|
||||||
|
<<"fail_if_no_peer_cert">> => false,
|
||||||
|
<<"verify">> => <<"verify_none">>
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Reason = "verify must be verify_peer when fail_if_no_peer_cert is true",
|
||||||
|
[
|
||||||
|
?_assertThrow(
|
||||||
|
{_Sc, [#{kind := validation_error, reason := Reason}]},
|
||||||
|
hocon_tconf:check_plain(Sc, InvalidListener, Opts)
|
||||||
|
),
|
||||||
|
?_assertThrow(
|
||||||
|
{_Sc, [#{kind := validation_error, reason := Reason}]},
|
||||||
|
hocon_tconf:check_plain(Sc, InvalidListener, OptsAtomKey)
|
||||||
|
),
|
||||||
|
?_assertMatch(
|
||||||
|
#{mqtt_ssl_listener := #{}},
|
||||||
|
hocon_tconf:check_plain(Sc, ValidListener, OptsAtomKey)
|
||||||
|
),
|
||||||
|
?_assertMatch(
|
||||||
|
#{mqtt_ssl_listener := #{}},
|
||||||
|
hocon_tconf:check_plain(Sc, ValidListener1, OptsAtomKey)
|
||||||
|
),
|
||||||
|
?_assertMatch(
|
||||||
|
#{<<"mqtt_ssl_listener">> := #{}},
|
||||||
|
hocon_tconf:check_plain(Sc, ValidListener, Opts)
|
||||||
|
),
|
||||||
|
?_assertMatch(
|
||||||
|
#{<<"mqtt_ssl_listener">> := #{}},
|
||||||
|
hocon_tconf:check_plain(Sc, ValidListener1, Opts)
|
||||||
|
)
|
||||||
|
].
|
||||||
|
|
||||||
validate(Schema, Data0) ->
|
validate(Schema, Data0) ->
|
||||||
Sc = #{
|
Sc = #{
|
||||||
roots => [ssl_opts],
|
roots => [ssl_opts],
|
||||||
|
|
|
@ -99,7 +99,7 @@ choose_ingress_pool_size(
|
||||||
{_Filter, #{share := _Name}} ->
|
{_Filter, #{share := _Name}} ->
|
||||||
% NOTE: this is shared subscription, many workers may subscribe
|
% NOTE: this is shared subscription, many workers may subscribe
|
||||||
PoolSize;
|
PoolSize;
|
||||||
{_Filter, #{}} ->
|
{_Filter, #{}} when PoolSize > 1 ->
|
||||||
% NOTE: this is regular subscription, only one worker should subscribe
|
% NOTE: this is regular subscription, only one worker should subscribe
|
||||||
?SLOG(warning, #{
|
?SLOG(warning, #{
|
||||||
msg => "mqtt_bridge_ingress_pool_size_ignored",
|
msg => "mqtt_bridge_ingress_pool_size_ignored",
|
||||||
|
@ -110,6 +110,8 @@ choose_ingress_pool_size(
|
||||||
config_pool_size => PoolSize,
|
config_pool_size => PoolSize,
|
||||||
pool_size => 1
|
pool_size => 1
|
||||||
}),
|
}),
|
||||||
|
1;
|
||||||
|
{_Filter, #{}} when PoolSize == 1 ->
|
||||||
1
|
1
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
|
@ -29,6 +29,8 @@
|
||||||
|
|
||||||
-export([fold/4]).
|
-export([fold/4]).
|
||||||
|
|
||||||
|
-export([mk_temp_filename/1]).
|
||||||
|
|
||||||
-type foldfun(Acc) ::
|
-type foldfun(Acc) ::
|
||||||
fun(
|
fun(
|
||||||
(
|
(
|
||||||
|
@ -178,3 +180,24 @@ fold(FoldFun, Acc, It) ->
|
||||||
none ->
|
none ->
|
||||||
Acc
|
Acc
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-spec mk_temp_filename(file:filename()) ->
|
||||||
|
file:filename().
|
||||||
|
mk_temp_filename(Filename) ->
|
||||||
|
% NOTE
|
||||||
|
% Using only the first 200 characters of the filename to avoid making filenames
|
||||||
|
% exceeding 255 bytes in UTF-8. It's actually too conservative, `Suffix` can be
|
||||||
|
% at most 16 bytes.
|
||||||
|
Unique = erlang:unique_integer([positive]),
|
||||||
|
Suffix = binary:encode_hex(<<Unique:64>>),
|
||||||
|
mk_filename([string:slice(Filename, 0, 200), ".", Suffix]).
|
||||||
|
|
||||||
|
mk_filename(Comps) ->
|
||||||
|
lists:append(lists:map(fun mk_filename_component/1, Comps)).
|
||||||
|
|
||||||
|
mk_filename_component(A) when is_atom(A) ->
|
||||||
|
atom_to_list(A);
|
||||||
|
mk_filename_component(B) when is_binary(B) ->
|
||||||
|
unicode:characters_to_list(B);
|
||||||
|
mk_filename_component(S) when is_list(S) ->
|
||||||
|
S.
|
||||||
|
|
|
@ -42,7 +42,9 @@
|
||||||
%% on most filesystems. Even though, say, S3 does not have such limitations, it's
|
%% on most filesystems. Even though, say, S3 does not have such limitations, it's
|
||||||
%% still useful to have a limit on the filename length, to avoid having to deal with
|
%% still useful to have a limit on the filename length, to avoid having to deal with
|
||||||
%% limits in the storage backends.
|
%% limits in the storage backends.
|
||||||
-define(MAX_FILENAME_BYTELEN, 255).
|
%% Usual realistic limit is 255 bytes actually, but we leave some room for backends
|
||||||
|
%% to spare.
|
||||||
|
-define(MAX_FILENAME_BYTELEN, 240).
|
||||||
|
|
||||||
-import(hoconsc, [ref/2, mk/2]).
|
-import(hoconsc, [ref/2, mk/2]).
|
||||||
|
|
||||||
|
@ -145,7 +147,7 @@ fields(local_storage_segments) ->
|
||||||
[
|
[
|
||||||
{root,
|
{root,
|
||||||
mk(
|
mk(
|
||||||
binary(),
|
string(),
|
||||||
#{
|
#{
|
||||||
desc => ?DESC("local_storage_segments_root"),
|
desc => ?DESC("local_storage_segments_root"),
|
||||||
required => false
|
required => false
|
||||||
|
@ -182,7 +184,7 @@ fields(local_storage_exporter) ->
|
||||||
[
|
[
|
||||||
{root,
|
{root,
|
||||||
mk(
|
mk(
|
||||||
binary(),
|
string(),
|
||||||
#{
|
#{
|
||||||
desc => ?DESC("local_storage_exporter_root"),
|
desc => ?DESC("local_storage_exporter_root"),
|
||||||
required => false
|
required => false
|
||||||
|
|
|
@ -128,7 +128,17 @@ complete(
|
||||||
Filemeta = FilemetaIn#{checksum => Checksum},
|
Filemeta = FilemetaIn#{checksum => Checksum},
|
||||||
ok = file:close(Handle),
|
ok = file:close(Handle),
|
||||||
_ = filelib:ensure_dir(ResultFilepath),
|
_ = filelib:ensure_dir(ResultFilepath),
|
||||||
_ = file:write_file(mk_manifest_filename(ResultFilepath), encode_filemeta(Filemeta)),
|
ManifestFilepath = mk_manifest_filename(ResultFilepath),
|
||||||
|
case file:write_file(ManifestFilepath, encode_filemeta(Filemeta)) of
|
||||||
|
ok ->
|
||||||
|
ok;
|
||||||
|
{error, Reason} ->
|
||||||
|
?SLOG(warning, "filemeta_write_failed", #{
|
||||||
|
path => ManifestFilepath,
|
||||||
|
meta => Filemeta,
|
||||||
|
reason => Reason
|
||||||
|
})
|
||||||
|
end,
|
||||||
file:rename(Filepath, ResultFilepath).
|
file:rename(Filepath, ResultFilepath).
|
||||||
|
|
||||||
-spec discard(export_st()) ->
|
-spec discard(export_st()) ->
|
||||||
|
@ -452,8 +462,7 @@ mk_manifest_filename(Filename) when is_binary(Filename) ->
|
||||||
<<Filename/binary, ?MANIFEST>>.
|
<<Filename/binary, ?MANIFEST>>.
|
||||||
|
|
||||||
mk_temp_absfilepath(Options, Transfer, Filename) ->
|
mk_temp_absfilepath(Options, Transfer, Filename) ->
|
||||||
Unique = erlang:unique_integer([positive]),
|
TempFilename = emqx_ft_fs_util:mk_temp_filename(Filename),
|
||||||
TempFilename = integer_to_list(Unique) ++ "." ++ Filename,
|
|
||||||
filename:join(mk_absdir(Options, Transfer, temporary), TempFilename).
|
filename:join(mk_absdir(Options, Transfer, temporary), TempFilename).
|
||||||
|
|
||||||
mk_absdir(Options, _Transfer, temporary) ->
|
mk_absdir(Options, _Transfer, temporary) ->
|
||||||
|
|
|
@ -445,16 +445,8 @@ write_file_atomic(Storage, Transfer, Filepath, Content) when is_binary(Content)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
mk_temp_filepath(Storage, Transfer, Filename) ->
|
mk_temp_filepath(Storage, Transfer, Filename) ->
|
||||||
Unique = erlang:unique_integer([positive]),
|
TempFilename = emqx_ft_fs_util:mk_temp_filename(Filename),
|
||||||
filename:join(get_subdir(Storage, Transfer, temporary), mk_filename([Unique, ".", Filename])).
|
filename:join(get_subdir(Storage, Transfer, temporary), TempFilename).
|
||||||
|
|
||||||
mk_filename(Comps) ->
|
|
||||||
lists:append(lists:map(fun mk_filename_component/1, Comps)).
|
|
||||||
|
|
||||||
mk_filename_component(I) when is_integer(I) -> integer_to_list(I);
|
|
||||||
mk_filename_component(A) when is_atom(A) -> atom_to_list(A);
|
|
||||||
mk_filename_component(B) when is_binary(B) -> unicode:characters_to_list(B);
|
|
||||||
mk_filename_component(S) when is_list(S) -> S.
|
|
||||||
|
|
||||||
write_contents(Filepath, Content) ->
|
write_contents(Filepath, Content) ->
|
||||||
file:write_file(Filepath, Content).
|
file:write_file(Filepath, Content).
|
||||||
|
|
|
@ -261,6 +261,7 @@ t_nasty_clientids_fileids(_Config) ->
|
||||||
fun({ClientId, FileId}) ->
|
fun({ClientId, FileId}) ->
|
||||||
ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, "justfile", ClientId),
|
ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, "justfile", ClientId),
|
||||||
[Export] = list_files(ClientId),
|
[Export] = list_files(ClientId),
|
||||||
|
?assertMatch(#{meta := #{name := "justfile"}}, Export),
|
||||||
?assertEqual({ok, ClientId}, read_export(Export))
|
?assertEqual({ok, ClientId}, read_export(Export))
|
||||||
end,
|
end,
|
||||||
Transfers
|
Transfers
|
||||||
|
@ -270,13 +271,15 @@ t_nasty_filenames(_Config) ->
|
||||||
Filenames = [
|
Filenames = [
|
||||||
{<<"nasty1">>, "146%"},
|
{<<"nasty1">>, "146%"},
|
||||||
{<<"nasty2">>, "🌚"},
|
{<<"nasty2">>, "🌚"},
|
||||||
{<<"nasty3">>, "中文.txt"}
|
{<<"nasty3">>, "中文.txt"},
|
||||||
|
{<<"nasty4">>, _239Bytes = string:join(lists:duplicate(240 div 5, "LONG"), ".")}
|
||||||
],
|
],
|
||||||
ok = lists:foreach(
|
ok = lists:foreach(
|
||||||
fun({ClientId, Filename}) ->
|
fun({ClientId, Filename}) ->
|
||||||
FileId = unicode:characters_to_binary(Filename),
|
FileId = unicode:characters_to_binary(Filename),
|
||||||
ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, Filename, FileId),
|
ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, Filename, FileId),
|
||||||
[Export] = list_files(ClientId),
|
[Export] = list_files(ClientId),
|
||||||
|
?assertMatch(#{meta := #{name := Filename}}, Export),
|
||||||
?assertEqual({ok, FileId}, read_export(Export))
|
?assertEqual({ok, FileId}, read_export(Export))
|
||||||
end,
|
end,
|
||||||
Filenames
|
Filenames
|
||||||
|
|
|
@ -87,7 +87,7 @@ t_update_config(_Config) ->
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
<<"/tmp/path">>,
|
"/tmp/path",
|
||||||
emqx_config:get([file_transfer, storage, local, segments, root])
|
emqx_config:get([file_transfer, storage, local, segments, root])
|
||||||
),
|
),
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
|
@ -150,7 +150,7 @@ t_disable_restore_config(Config) ->
|
||||||
),
|
),
|
||||||
ok = emqtt:stop(Client),
|
ok = emqtt:stop(Client),
|
||||||
% Restore local storage backend
|
% Restore local storage backend
|
||||||
Root = iolist_to_binary(emqx_ft_test_helpers:root(Config, node(), [segments])),
|
Root = emqx_ft_test_helpers:root(Config, node(), [segments]),
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
{ok, _},
|
{ok, _},
|
||||||
emqx_conf:update(
|
emqx_conf:update(
|
||||||
|
@ -177,7 +177,7 @@ t_disable_restore_config(Config) ->
|
||||||
[
|
[
|
||||||
#{
|
#{
|
||||||
?snk_kind := garbage_collection,
|
?snk_kind := garbage_collection,
|
||||||
storage := #{segments := #{root := Root}}
|
storage := #{segments := #{gc := #{interval := 1000}}}
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
?of_kind(garbage_collection, Trace)
|
?of_kind(garbage_collection, Trace)
|
||||||
|
|
|
@ -63,3 +63,24 @@ unescape_filename_test_() ->
|
||||||
?_assertEqual(Input, emqx_ft_fs_util:unescape_filename(Filename))
|
?_assertEqual(Input, emqx_ft_fs_util:unescape_filename(Filename))
|
||||||
|| {Filename, Input} <- ?NAMES
|
|| {Filename, Input} <- ?NAMES
|
||||||
].
|
].
|
||||||
|
|
||||||
|
mk_temp_filename_test_() ->
|
||||||
|
[
|
||||||
|
?_assertMatch(
|
||||||
|
"." ++ Suffix when length(Suffix) == 16,
|
||||||
|
emqx_ft_fs_util:mk_temp_filename(<<>>)
|
||||||
|
),
|
||||||
|
?_assertMatch(
|
||||||
|
"file.name." ++ Suffix when length(Suffix) == 16,
|
||||||
|
emqx_ft_fs_util:mk_temp_filename("file.name")
|
||||||
|
),
|
||||||
|
?_assertMatch(
|
||||||
|
"safe.🦺." ++ Suffix when length(Suffix) == 16,
|
||||||
|
emqx_ft_fs_util:mk_temp_filename(<<"safe.🦺"/utf8>>)
|
||||||
|
),
|
||||||
|
?_assertEqual(
|
||||||
|
% FilenameSlice + Dot + Suffix
|
||||||
|
200 + 1 + 16,
|
||||||
|
length(emqx_ft_fs_util:mk_temp_filename(lists:duplicate(63, "LONG")))
|
||||||
|
)
|
||||||
|
].
|
||||||
|
|
|
@ -120,7 +120,10 @@ fields(ssl_listener) ->
|
||||||
{ssl_options,
|
{ssl_options,
|
||||||
sc(
|
sc(
|
||||||
hoconsc:ref(emqx_schema, "listener_ssl_opts"),
|
hoconsc:ref(emqx_schema, "listener_ssl_opts"),
|
||||||
#{desc => ?DESC(ssl_listener_options)}
|
#{
|
||||||
|
desc => ?DESC(ssl_listener_options),
|
||||||
|
validator => fun emqx_schema:validate_server_ssl_opts/1
|
||||||
|
}
|
||||||
)}
|
)}
|
||||||
];
|
];
|
||||||
fields(udp_listener) ->
|
fields(udp_listener) ->
|
||||||
|
@ -132,7 +135,13 @@ fields(udp_listener) ->
|
||||||
fields(dtls_listener) ->
|
fields(dtls_listener) ->
|
||||||
[{acceptors, sc(integer(), #{default => 16, desc => ?DESC(dtls_listener_acceptors)})}] ++
|
[{acceptors, sc(integer(), #{default => 16, desc => ?DESC(dtls_listener_acceptors)})}] ++
|
||||||
fields(udp_listener) ++
|
fields(udp_listener) ++
|
||||||
[{dtls_options, sc(ref(dtls_opts), #{desc => ?DESC(dtls_listener_dtls_opts)})}];
|
[
|
||||||
|
{dtls_options,
|
||||||
|
sc(ref(dtls_opts), #{
|
||||||
|
desc => ?DESC(dtls_listener_dtls_opts),
|
||||||
|
validator => fun emqx_schema:validate_server_ssl_opts/1
|
||||||
|
})}
|
||||||
|
];
|
||||||
fields(udp_opts) ->
|
fields(udp_opts) ->
|
||||||
[
|
[
|
||||||
{active_n,
|
{active_n,
|
||||||
|
|
|
@ -1123,15 +1123,16 @@ check_pub_authz(
|
||||||
|
|
||||||
convert_pub_to_msg(
|
convert_pub_to_msg(
|
||||||
{TopicName, Flags, Data},
|
{TopicName, Flags, Data},
|
||||||
Channel = #channel{clientinfo = #{clientid := ClientId}}
|
Channel = #channel{clientinfo = #{clientid := ClientId, mountpoint := Mountpoint}}
|
||||||
) ->
|
) ->
|
||||||
#mqtt_sn_flags{qos = QoS, dup = Dup, retain = Retain} = Flags,
|
#mqtt_sn_flags{qos = QoS, dup = Dup, retain = Retain} = Flags,
|
||||||
NewQoS = get_corrected_qos(QoS),
|
NewQoS = get_corrected_qos(QoS),
|
||||||
|
NTopicName = emqx_mountpoint:mount(Mountpoint, TopicName),
|
||||||
Message = put_message_headers(
|
Message = put_message_headers(
|
||||||
emqx_message:make(
|
emqx_message:make(
|
||||||
ClientId,
|
ClientId,
|
||||||
NewQoS,
|
NewQoS,
|
||||||
TopicName,
|
NTopicName,
|
||||||
Data,
|
Data,
|
||||||
#{dup => Dup, retain => Retain},
|
#{dup => Dup, retain => Retain},
|
||||||
#{}
|
#{}
|
||||||
|
|
|
@ -134,6 +134,12 @@ restart_mqttsn_with_neg_qos_off() ->
|
||||||
emqx_gateway_conf:update_gateway(
|
emqx_gateway_conf:update_gateway(
|
||||||
mqttsn,
|
mqttsn,
|
||||||
Conf#{<<"enable_qos3">> => <<"false">>}
|
Conf#{<<"enable_qos3">> => <<"false">>}
|
||||||
|
|
||||||
|
restart_mqttsn_with_mountpoint(Mp) ->
|
||||||
|
Conf = emqx:get_raw_config([gateway, mqttsn]),
|
||||||
|
emqx_gateway_conf:update_gateway(
|
||||||
|
mqttsn,
|
||||||
|
Conf#{<<"mountpoint">> => Mp}
|
||||||
).
|
).
|
||||||
|
|
||||||
default_config() ->
|
default_config() ->
|
||||||
|
@ -1030,6 +1036,44 @@ t_publish_qos2_case03(_) ->
|
||||||
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
|
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
|
||||||
gen_udp:close(Socket).
|
gen_udp:close(Socket).
|
||||||
|
|
||||||
|
t_publish_mountpoint(_) ->
|
||||||
|
restart_mqttsn_with_mountpoint(<<"mp/">>),
|
||||||
|
Dup = 0,
|
||||||
|
QoS = 1,
|
||||||
|
Retain = 0,
|
||||||
|
Will = 0,
|
||||||
|
CleanSession = 0,
|
||||||
|
MsgId = 1,
|
||||||
|
TopicId1 = ?MAX_PRED_TOPIC_ID + 1,
|
||||||
|
Topic = <<"abc">>,
|
||||||
|
{ok, Socket} = gen_udp:open(0, [binary]),
|
||||||
|
ClientId = ?CLIENTID,
|
||||||
|
send_connect_msg(Socket, ClientId),
|
||||||
|
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
|
||||||
|
send_subscribe_msg_normal_topic(Socket, QoS, Topic, MsgId),
|
||||||
|
?assertEqual(
|
||||||
|
<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
|
||||||
|
TopicId1:16, MsgId:16, ?SN_RC_ACCEPTED>>,
|
||||||
|
receive_response(Socket)
|
||||||
|
),
|
||||||
|
|
||||||
|
Payload1 = <<20, 21, 22, 23>>,
|
||||||
|
send_publish_msg_normal_topic(Socket, QoS, MsgId, TopicId1, Payload1),
|
||||||
|
?assertEqual(
|
||||||
|
<<7, ?SN_PUBACK, TopicId1:16, MsgId:16, ?SN_RC_ACCEPTED>>, receive_response(Socket)
|
||||||
|
),
|
||||||
|
timer:sleep(100),
|
||||||
|
|
||||||
|
?assertEqual(
|
||||||
|
<<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
|
||||||
|
TopicId1:16, MsgId:16, <<20, 21, 22, 23>>/binary>>,
|
||||||
|
receive_response(Socket)
|
||||||
|
),
|
||||||
|
|
||||||
|
send_disconnect_msg(Socket, undefined),
|
||||||
|
restart_mqttsn_with_mountpoint(<<>>),
|
||||||
|
gen_udp:close(Socket).
|
||||||
|
|
||||||
t_delivery_qos1_register_invalid_topic_id(_) ->
|
t_delivery_qos1_register_invalid_topic_id(_) ->
|
||||||
Dup = 0,
|
Dup = 0,
|
||||||
QoS = 1,
|
QoS = 1,
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
Fix the issue in MQTT-SN gateway where the `mountpoint` does not take effect on message publishing.
|
|
@ -0,0 +1,8 @@
|
||||||
|
Disallow enabling `fail_if_no_peer_cert` in listener SSL options if `verify_none` is set.
|
||||||
|
|
||||||
|
Setting `fail_if_no_peer_cert = true` and `verify = verify_none` caused connection errors
|
||||||
|
due to incompatible options.
|
||||||
|
This fix validates the options when creating or updating a listener to avoid these errors.
|
||||||
|
|
||||||
|
Note: any old listener configuration with `fail_if_no_peer_cert = true` and `verify = verify_none`
|
||||||
|
that was previously allowed will fail to load after applying this fix and must be manually fixed.
|
Loading…
Reference in New Issue