Merge branch 'release-v50' into 0720-feat-direct-deny-when-anonymous-is-not-allowed
This commit is contained in:
commit
6837af2308
|
@ -23,6 +23,7 @@
|
||||||
deep_force_put/3,
|
deep_force_put/3,
|
||||||
deep_remove/2,
|
deep_remove/2,
|
||||||
deep_merge/2,
|
deep_merge/2,
|
||||||
|
binary_key_map/1,
|
||||||
safe_atom_key_map/1,
|
safe_atom_key_map/1,
|
||||||
unsafe_atom_key_map/1,
|
unsafe_atom_key_map/1,
|
||||||
jsonable_map/1,
|
jsonable_map/1,
|
||||||
|
@ -153,6 +154,17 @@ deep_convert(Val, _, _Args) ->
|
||||||
unsafe_atom_key_map(Map) ->
|
unsafe_atom_key_map(Map) ->
|
||||||
covert_keys_to_atom(Map, fun(K) -> binary_to_atom(K, utf8) end).
|
covert_keys_to_atom(Map, fun(K) -> binary_to_atom(K, utf8) end).
|
||||||
|
|
||||||
|
-spec binary_key_map(map()) -> map().
|
||||||
|
binary_key_map(Map) ->
|
||||||
|
deep_convert(
|
||||||
|
Map,
|
||||||
|
fun
|
||||||
|
(K, V) when is_atom(K) -> {atom_to_binary(K, utf8), V};
|
||||||
|
(K, V) when is_binary(K) -> {K, V}
|
||||||
|
end,
|
||||||
|
[]
|
||||||
|
).
|
||||||
|
|
||||||
-spec safe_atom_key_map(#{binary() | atom() => any()}) -> #{atom() => any()}.
|
-spec safe_atom_key_map(#{binary() | atom() => any()}) -> #{atom() => any()}.
|
||||||
safe_atom_key_map(Map) ->
|
safe_atom_key_map(Map) ->
|
||||||
covert_keys_to_atom(Map, fun(K) -> binary_to_existing_atom(K, utf8) end).
|
covert_keys_to_atom(Map, fun(K) -> binary_to_existing_atom(K, utf8) end).
|
||||||
|
|
|
@ -58,7 +58,8 @@ pre_config_update(Path, Conf, _OldConfig) when is_map(Conf) ->
|
||||||
|
|
||||||
post_config_update(Path, '$remove', _, OldConf, _AppEnvs) ->
|
post_config_update(Path, '$remove', _, OldConf, _AppEnvs) ->
|
||||||
_ = emqx_connector_ssl:clear_certs(filename:join(Path), OldConf);
|
_ = emqx_connector_ssl:clear_certs(filename:join(Path), OldConf);
|
||||||
post_config_update(_Path, _Req, _, _OldConf, _AppEnvs) ->
|
post_config_update(Path, _Req, NewConf, OldConf, _AppEnvs) ->
|
||||||
|
_ = emqx_connector_ssl:try_clear_certs(filename:join(Path), NewConf, OldConf),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
%% internal functions
|
%% internal functions
|
||||||
|
|
|
@ -0,0 +1,29 @@
|
||||||
|
-----BEGIN CERTIFICATE-----
|
||||||
|
MIIE5DCCAswCCQCF3o0gIdaNDjANBgkqhkiG9w0BAQsFADA0MRIwEAYDVQQKDAlF
|
||||||
|
TVFYIFRlc3QxHjAcBgNVBAMMFUNlcnRpZmljYXRlIEF1dGhvcml0eTAeFw0yMTEy
|
||||||
|
MzAwODQxMTFaFw00OTA1MTcwODQxMTFaMDQxEjAQBgNVBAoMCUVNUVggVGVzdDEe
|
||||||
|
MBwGA1UEAwwVQ2VydGlmaWNhdGUgQXV0aG9yaXR5MIICIjANBgkqhkiG9w0BAQEF
|
||||||
|
AAOCAg8AMIICCgKCAgEAqmqSrxyH16j63QhqGLT1UO8I+m6BM3HfnJQM8laQdtJ0
|
||||||
|
WgHqCh0/OphH3S7v4SfF4fNJDEJWMWuuzJzU9cTqHPLzhvo3+ZHcMIENgtY2p2Cf
|
||||||
|
7AQjEqFViEDyv2ZWNEe76BJeShntdY5NZr4gIPar99YGG/Ln8YekspleV+DU38rE
|
||||||
|
EX9WzhgBr02NN9z4NzIxeB+jdvPnxcXs3WpUxzfnUjOQf/T1tManvSdRbFmKMbxl
|
||||||
|
A8NLYK3oAYm8EbljWUINUNN6loqYhbigKv8bvo5S4xvRqmX86XB7sc0SApngtNcg
|
||||||
|
O0EKn8z/KVPDskE+8lMfGMiU2e2Tzw6Rph57mQPOPtIp5hPiKRik7ST9n0p6piXW
|
||||||
|
zRLplJEzSjf40I1u+VHmpXlWI/Fs8b1UkDSMiMVJf0LyWb4ziBSZOY2LtZzWHbWj
|
||||||
|
LbNgxQcwSS29tKgUwfEFmFcm+iOM59cPfkl2IgqVLh5h4zmKJJbfQKSaYb5fcKRf
|
||||||
|
50b1qsN40VbR3Pk/0lJ0/WqgF6kZCExmT1qzD5HJES/5grjjKA4zIxmHOVU86xOF
|
||||||
|
ouWvtilVR4PGkzmkFvwK5yRhBUoGH/A9BurhqOc0QCGay1kqHQFA6se4JJS+9KOS
|
||||||
|
x8Rn1Nm6Pi7sd6Le3cKmHTlyl5a/ofKqTCX2Qh+v/7y62V1V1wnoh3ipRjdPTnMC
|
||||||
|
AwEAATANBgkqhkiG9w0BAQsFAAOCAgEARCqaocvlMFUQjtFtepO2vyG1krn11xJ0
|
||||||
|
e7md26i+g8SxCCYqQ9IqGmQBg0Im8fyNDKRN/LZoj5+A4U4XkG1yya91ZIrPpWyF
|
||||||
|
KUiRAItchNj3g1kHmI2ckl1N//6Kpx3DPaS7qXZaN3LTExf6Ph+StE1FnS0wVF+s
|
||||||
|
tsNIf6EaQ+ZewW3pjdlLeAws3jvWKUkROc408Ngvx74zbbKo/zAC4tz8oH9ZcpsT
|
||||||
|
WD8enVVEeUQKI6ItcpZ9HgTI9TFWgfZ1vYwvkoRwNIeabYI62JKmLEo2vGfGwWKr
|
||||||
|
c+GjnJ/tlVI2DpPljfWOnQ037/7yyJI/zo65+HPRmGRD6MuW/BdPDYOvOZUTcQKh
|
||||||
|
kANi5THSbJJgZcG3jb1NLebaUQ1H0zgVjn0g3KhUV+NJQYk8RQ7rHtB+MySqTKlM
|
||||||
|
kRkRjfTfR0Ykxpks7Mjvsb6NcZENf08ZFPd45+e/ptsxpiKu4e4W4bV7NZDvNKf9
|
||||||
|
0/aD3oGYNMiP7s+KJ1lRSAjnBuG21Yk8FpzG+yr8wvJhV8aFgNQ5wIH86SuUTmN0
|
||||||
|
5bVzFEIcUejIwvGoQEctNHBlOwHrb7zmB6OwyZeMapdXBQ+9UDhYg8ehDqdDOdfn
|
||||||
|
wsBcnjD2MwNhlE1hjL+tZWLNwSHiD6xx3LvNoXZu2HK8Cp3SOrkE69cFghYMIZZb
|
||||||
|
T+fp6tNL6LE=
|
||||||
|
-----END CERTIFICATE-----
|
|
@ -0,0 +1,24 @@
|
||||||
|
-----BEGIN CERTIFICATE-----
|
||||||
|
MIID/jCCAeagAwIBAgIJAKTICmq1Lg6dMA0GCSqGSIb3DQEBCwUAMDQxEjAQBgNV
|
||||||
|
BAoMCUVNUVggVGVzdDEeMBwGA1UEAwwVQ2VydGlmaWNhdGUgQXV0aG9yaXR5MB4X
|
||||||
|
DTIxMTIzMDA4NDExMloXDTQ5MDUxNzA4NDExMlowJTESMBAGA1UECgwJRU1RWCBU
|
||||||
|
ZXN0MQ8wDQYDVQQDDAZjbGllbnQwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEK
|
||||||
|
AoIBAQDzrujfx6XZTH0MWqLO6kNAeHndUZ+OGaURXvxKMPMF5dA40lxNG6cEzzlq
|
||||||
|
0Rm61adlv8tF4kRJrs6EnRjEVoMImrdh07vGFdOTYqP01LjiBhErAzyRtSn2X8FT
|
||||||
|
Te8ExoCRs3x61SPebGY2hOvFxuO6YDPVOSDvbbxvRgqIlM1ZXC8dOvPSSGZ+P8hV
|
||||||
|
56EPayRthfu1FVptnkW9CyZCRI0gg95Hv8RC7bGG+tuWpkN9ZrRvohhgGR1+bDUi
|
||||||
|
BNBpncEsSh+UgWaj8KRN8D16H6m/Im6ty467j0at49FvPx5nACL48/ghtYvzgKLc
|
||||||
|
uKHtokKUuuzebDK/hQxN3mUSAJStAgMBAAGjIjAgMAsGA1UdDwQEAwIFoDARBglg
|
||||||
|
hkgBhvhCAQEEBAMCB4AwDQYJKoZIhvcNAQELBQADggIBAIlVyPhOpkz3MNzQmjX7
|
||||||
|
xgJ3vGPK5uK11n/wfjRwe2qXwZbrI2sYLVtTpUgvLDuP0gB73Vwfu7xAMdue6TRm
|
||||||
|
CKr9z0lkQsVBtgoqzZCjd4PYLfHm4EhsOMi98OGKU5uOGD4g3yLwQWXHhbYtiZMO
|
||||||
|
Jsj0hebYveYJt/BYTd1syGQcIcYCyVExWvSWjidfpAqjT6EF7whdubaFtuF2kaGF
|
||||||
|
IO9yn9rWtXB5yK99uCguEmKhx3fAQxomzqweTu3WRvy9axsUH3WAUW9a4DIBSz2+
|
||||||
|
ZSJNheFn5GktgggygJUGYqpSZHooUJW0UBs/8vX6AP+8MtINmqOGZUawmNwLWLOq
|
||||||
|
wHyVt2YGD5TXjzzsWNSQ4mqXxM6AXniZVZK0yYNjA4ATikX1AtwunyWBR4IjyE/D
|
||||||
|
FxYPORdZCOtywRFE1R5KLTUq/C8BNGCkYnoO78DJBO+pT0oagkQGQb0CnmC6C1db
|
||||||
|
4lWzA9K0i4B0PyooZA+gp+5FFgaLuX1DkyeaY1J204QhHR1z/Vcyl5dpqR9hqnYP
|
||||||
|
t8raLk9ogMDKqKA9iG0wc3CBNckD4sjVWAEeovXhElG55fD21wwhF+AnDCvX8iVK
|
||||||
|
cBfKV6z6uxfKjGIxc2I643I5DiIn+V3DnPxYyY74Ln1lWFYmt5JREhAxPu42zq74
|
||||||
|
e6+eIMYFszB+5gKgt6pa6ZNI
|
||||||
|
-----END CERTIFICATE-----
|
|
@ -0,0 +1,27 @@
|
||||||
|
-----BEGIN RSA PRIVATE KEY-----
|
||||||
|
MIIEpAIBAAKCAQEA867o38el2Ux9DFqizupDQHh53VGfjhmlEV78SjDzBeXQONJc
|
||||||
|
TRunBM85atEZutWnZb/LReJESa7OhJ0YxFaDCJq3YdO7xhXTk2Kj9NS44gYRKwM8
|
||||||
|
kbUp9l/BU03vBMaAkbN8etUj3mxmNoTrxcbjumAz1Tkg7228b0YKiJTNWVwvHTrz
|
||||||
|
0khmfj/IVeehD2skbYX7tRVabZ5FvQsmQkSNIIPeR7/EQu2xhvrblqZDfWa0b6IY
|
||||||
|
YBkdfmw1IgTQaZ3BLEoflIFmo/CkTfA9eh+pvyJurcuOu49GrePRbz8eZwAi+PP4
|
||||||
|
IbWL84Ci3Lih7aJClLrs3mwyv4UMTd5lEgCUrQIDAQABAoIBAQDwEbBgznrIwn8r
|
||||||
|
jZt5x/brbAV7Ea/kOcWSgIaCvQifFdJ2OGAwov5/UXwajNgRZe2d4z7qoUhvYuUY
|
||||||
|
ZwCAZU6ASpRBr2v9cYFYYURvrqZaHmoJew3P6q/lhl6aqFvC06DUagRHqvXEafyk
|
||||||
|
13zEAvZVpfNKrBaTawPKiDFWb2qDDc9D6hC07EuJ/DNeehiHvzHrSZSDVV5Ut7Bw
|
||||||
|
YDm33XygheUPAlHfeCnaixzcs3osiVyFEmVjxcIaM0ZS1NgcSaohSpJHMzvEaohX
|
||||||
|
e+v9vccraSVlw01AlvFwI2vHYUV8jT6HwglTPKKGOCzK/ace3wPdYSU9qLcqfuHn
|
||||||
|
EFhNc3tNAoGBAPugLMgbReJg2gpbIPUkYyoMMAAU7llFU1WvPWwXzo1a9EBjBACw
|
||||||
|
WfCZISNtANXR38zIYXzoH547uXi4YPks1Nne3sYuCDpvuX+iz7fIo4zHf1nFmxH7
|
||||||
|
eE6GtQr2ubmuuipTc28S0wBMGT1/KybH0e2NKL6GaOkNDmAI0IbEMBrvAoGBAPfr
|
||||||
|
Y1QYLhPhan6m5g/5s+bQpKtHfNH9TNkk13HuYu72zNuY3qL2GC7oSadR8vTbRXZg
|
||||||
|
KQqfaO0IGRcdkSFTq/AEhSSqr2Ld5nPadMbKvSGrSCc1s8rFH97jRVQY56yhM7ti
|
||||||
|
IW4+6cE8ylCMbdYB6wuduK/GIgNpqoF4xs1i2XojAoGACacBUMPLEH4Kny8TupOk
|
||||||
|
wi4pgTdMVVxVcAoC3yyincWJbRbfRm99Y79cCBHcYFdmsGJXawU0gUtlN/5KqgRQ
|
||||||
|
PfNQtGV7p1I12XGTakdmDrZwai8sXao52TlNpJgGU9siBRGicfZU5cQFi9he/WPY
|
||||||
|
57XshDJ/v8DidkigRysrdT0CgYEA5iuO22tblC+KvK1dGOXeZWO+DhrfwuGlcFBp
|
||||||
|
CaimB2/w/8vsn2VVTG9yujo2E6hj1CQw1mDrfG0xRim4LTXOgpbfugwRqvuTUmo2
|
||||||
|
Ur21XEX2RhjwpEfhcACWxB4fMUG0krrniMA2K6axupi1/KNpQi6bYe3UdFCs8Wld
|
||||||
|
QSAOAvsCgYBk/X5PmD44DvndE5FShM2w70YOoMr3Cgl5sdwAFUFE9yDuC14UhVxk
|
||||||
|
oxnYxwtVI9uVVirET+LczP9JEvcvxnN/Xg3tH/qm0WlIxmTxyYrFFIK9j0rqeu9z
|
||||||
|
blPu56OzNI2VMrR1GbOBLxQINLTIpaacjNJAlr8XOlegdUJsW/Jwqw==
|
||||||
|
-----END RSA PRIVATE KEY-----
|
|
@ -156,3 +156,98 @@ setup_fake_telemetry_data() ->
|
||||||
{ok, _} = snabbkaffe_collector:receive_events(Sub),
|
{ok, _} = snabbkaffe_collector:receive_events(Sub),
|
||||||
ok = snabbkaffe:stop(),
|
ok = snabbkaffe:stop(),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_update_ssl_conf(_) ->
|
||||||
|
Path = [bridges, <<"mqtt">>, <<"ssl_update_test">>],
|
||||||
|
EnableSSLConf = #{
|
||||||
|
<<"connector">> =>
|
||||||
|
#{
|
||||||
|
<<"bridge_mode">> => false,
|
||||||
|
<<"clean_start">> => true,
|
||||||
|
<<"keepalive">> => <<"60s">>,
|
||||||
|
<<"mode">> => <<"cluster_shareload">>,
|
||||||
|
<<"proto_ver">> => <<"v4">>,
|
||||||
|
<<"server">> => <<"127.0.0.1:1883">>,
|
||||||
|
<<"ssl">> =>
|
||||||
|
#{
|
||||||
|
<<"cacertfile">> => cert_file("cafile"),
|
||||||
|
<<"certfile">> => cert_file("certfile"),
|
||||||
|
<<"enable">> => true,
|
||||||
|
<<"keyfile">> => cert_file("keyfile"),
|
||||||
|
<<"verify">> => <<"verify_peer">>
|
||||||
|
}
|
||||||
|
},
|
||||||
|
<<"direction">> => <<"ingress">>,
|
||||||
|
<<"local_qos">> => 1,
|
||||||
|
<<"payload">> => <<"${payload}">>,
|
||||||
|
<<"remote_qos">> => 1,
|
||||||
|
<<"remote_topic">> => <<"t/#">>,
|
||||||
|
<<"retain">> => false
|
||||||
|
},
|
||||||
|
|
||||||
|
emqx:update_config(Path, EnableSSLConf),
|
||||||
|
?assertMatch({ok, [_, _, _]}, list_pem_dir(Path)),
|
||||||
|
NoSSLConf = #{
|
||||||
|
<<"connector">> =>
|
||||||
|
#{
|
||||||
|
<<"bridge_mode">> => false,
|
||||||
|
<<"clean_start">> => true,
|
||||||
|
<<"keepalive">> => <<"60s">>,
|
||||||
|
<<"max_inflight">> => 32,
|
||||||
|
<<"mode">> => <<"cluster_shareload">>,
|
||||||
|
<<"password">> => <<>>,
|
||||||
|
<<"proto_ver">> => <<"v4">>,
|
||||||
|
<<"reconnect_interval">> => <<"15s">>,
|
||||||
|
<<"replayq">> =>
|
||||||
|
#{<<"offload">> => false, <<"seg_bytes">> => <<"100MB">>},
|
||||||
|
<<"retry_interval">> => <<"15s">>,
|
||||||
|
<<"server">> => <<"127.0.0.1:1883">>,
|
||||||
|
<<"ssl">> =>
|
||||||
|
#{
|
||||||
|
<<"ciphers">> => <<>>,
|
||||||
|
<<"depth">> => 10,
|
||||||
|
<<"enable">> => false,
|
||||||
|
<<"reuse_sessions">> => true,
|
||||||
|
<<"secure_renegotiate">> => true,
|
||||||
|
<<"user_lookup_fun">> => <<"emqx_tls_psk:lookup">>,
|
||||||
|
<<"verify">> => <<"verify_peer">>,
|
||||||
|
<<"versions">> =>
|
||||||
|
[
|
||||||
|
<<"tlsv1.3">>,
|
||||||
|
<<"tlsv1.2">>,
|
||||||
|
<<"tlsv1.1">>,
|
||||||
|
<<"tlsv1">>
|
||||||
|
]
|
||||||
|
},
|
||||||
|
<<"username">> => <<>>
|
||||||
|
},
|
||||||
|
<<"direction">> => <<"ingress">>,
|
||||||
|
<<"enable">> => true,
|
||||||
|
<<"local_qos">> => 1,
|
||||||
|
<<"payload">> => <<"${payload}">>,
|
||||||
|
<<"remote_qos">> => 1,
|
||||||
|
<<"remote_topic">> => <<"t/#">>,
|
||||||
|
<<"retain">> => false
|
||||||
|
},
|
||||||
|
|
||||||
|
emqx:update_config(Path, NoSSLConf),
|
||||||
|
?assertMatch({error, not_dir}, list_pem_dir(Path)),
|
||||||
|
emqx:remove_config(Path),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
list_pem_dir(Path) ->
|
||||||
|
Dir = filename:join([emqx:mutable_certs_dir() | Path]),
|
||||||
|
case filelib:is_dir(Dir) of
|
||||||
|
true ->
|
||||||
|
file:list_dir(Dir);
|
||||||
|
_ ->
|
||||||
|
{error, not_dir}
|
||||||
|
end.
|
||||||
|
|
||||||
|
data_file(Name) ->
|
||||||
|
Dir = code:lib_dir(emqx_bridge, test),
|
||||||
|
{ok, Bin} = file:read_file(filename:join([Dir, "data", Name])),
|
||||||
|
Bin.
|
||||||
|
|
||||||
|
cert_file(Name) ->
|
||||||
|
data_file(filename:join(["certs", Name])).
|
||||||
|
|
|
@ -16,9 +16,12 @@
|
||||||
|
|
||||||
-module(emqx_connector_ssl).
|
-module(emqx_connector_ssl).
|
||||||
|
|
||||||
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
convert_certs/2,
|
convert_certs/2,
|
||||||
clear_certs/2
|
clear_certs/2,
|
||||||
|
try_clear_certs/3
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% TODO: rm `connector` case after `dev/ee5.0` merged into `master`.
|
%% TODO: rm `connector` case after `dev/ee5.0` merged into `master`.
|
||||||
|
@ -27,12 +30,12 @@
|
||||||
convert_certs(RltvDir, #{<<"connector">> := Connector} = Config) when
|
convert_certs(RltvDir, #{<<"connector">> := Connector} = Config) when
|
||||||
is_map(Connector)
|
is_map(Connector)
|
||||||
->
|
->
|
||||||
SSL = map_get_oneof([<<"ssl">>, ssl], Connector, undefined),
|
SSL = maps:get(<<"ssl">>, Connector, undefined),
|
||||||
new_ssl_config(RltvDir, Config, SSL);
|
new_ssl_config(RltvDir, Config, SSL);
|
||||||
convert_certs(RltvDir, #{connector := Connector} = Config) when
|
convert_certs(RltvDir, #{connector := Connector} = Config) when
|
||||||
is_map(Connector)
|
is_map(Connector)
|
||||||
->
|
->
|
||||||
SSL = map_get_oneof([<<"ssl">>, ssl], Connector, undefined),
|
SSL = maps:get(ssl, Connector, undefined),
|
||||||
new_ssl_config(RltvDir, Config, SSL);
|
new_ssl_config(RltvDir, Config, SSL);
|
||||||
%% for bridges without `connector` field. i.e. webhook
|
%% for bridges without `connector` field. i.e. webhook
|
||||||
convert_certs(RltvDir, #{<<"ssl">> := SSL} = Config) ->
|
convert_certs(RltvDir, #{<<"ssl">> := SSL} = Config) ->
|
||||||
|
@ -43,21 +46,37 @@ convert_certs(RltvDir, #{ssl := SSL} = Config) ->
|
||||||
convert_certs(_RltvDir, Config) ->
|
convert_certs(_RltvDir, Config) ->
|
||||||
{ok, Config}.
|
{ok, Config}.
|
||||||
|
|
||||||
clear_certs(RltvDir, #{<<"connector">> := Connector} = _Config) when
|
clear_certs(RltvDir, Config) ->
|
||||||
|
clear_certs2(RltvDir, normalize_key_to_bin(Config)).
|
||||||
|
|
||||||
|
clear_certs2(RltvDir, #{<<"connector">> := Connector} = _Config) when
|
||||||
is_map(Connector)
|
is_map(Connector)
|
||||||
->
|
->
|
||||||
OldSSL = map_get_oneof([<<"ssl">>, ssl], Connector, undefined),
|
%% TODO remove the 'connector' clause after dev/ee5.0 is merged back to master
|
||||||
|
%% The `connector` config layer will be removed.
|
||||||
|
%% for bridges with `connector` field. i.e. `mqtt_source` and `mqtt_sink`
|
||||||
|
OldSSL = maps:get(<<"ssl">>, Connector, undefined),
|
||||||
ok = emqx_tls_lib:delete_ssl_files(RltvDir, undefined, OldSSL);
|
ok = emqx_tls_lib:delete_ssl_files(RltvDir, undefined, OldSSL);
|
||||||
clear_certs(RltvDir, #{connector := Connector} = _Config) when
|
clear_certs2(RltvDir, #{<<"ssl">> := OldSSL} = _Config) ->
|
||||||
is_map(Connector)
|
|
||||||
->
|
|
||||||
OldSSL = map_get_oneof([<<"ssl">>, ssl], Connector, undefined),
|
|
||||||
ok = emqx_tls_lib:delete_ssl_files(RltvDir, undefined, OldSSL);
|
ok = emqx_tls_lib:delete_ssl_files(RltvDir, undefined, OldSSL);
|
||||||
clear_certs(RltvDir, #{<<"ssl">> := OldSSL} = _Config) ->
|
clear_certs2(_RltvDir, _) ->
|
||||||
ok = emqx_tls_lib:delete_ssl_files(RltvDir, undefined, OldSSL);
|
ok.
|
||||||
clear_certs(RltvDir, #{ssl := OldSSL} = _Config) ->
|
|
||||||
ok = emqx_tls_lib:delete_ssl_files(RltvDir, undefined, OldSSL);
|
try_clear_certs(RltvDir, NewConf, OldConf) ->
|
||||||
clear_certs(_RltvDir, _) ->
|
try_clear_certs2(
|
||||||
|
RltvDir,
|
||||||
|
normalize_key_to_bin(NewConf),
|
||||||
|
normalize_key_to_bin(OldConf)
|
||||||
|
).
|
||||||
|
|
||||||
|
try_clear_certs2(RltvDir, #{<<"connector">> := NewConnector}, #{<<"connector">> := OldConnector}) ->
|
||||||
|
NewSSL = maps:get(<<"ssl">>, NewConnector, undefined),
|
||||||
|
OldSSL = maps:get(<<"ssl">>, OldConnector, undefined),
|
||||||
|
try_clear_certs2(RltvDir, NewSSL, OldSSL);
|
||||||
|
try_clear_certs2(RltvDir, NewSSL, OldSSL) when is_map(NewSSL) andalso is_map(OldSSL) ->
|
||||||
|
ok = emqx_tls_lib:delete_ssl_files(RltvDir, NewSSL, OldSSL);
|
||||||
|
try_clear_certs2(RltvDir, NewConf, OldConf) ->
|
||||||
|
?SLOG(debug, #{msg => "unexpected_conf", path => RltvDir, new => NewConf, OldConf => OldConf}),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
new_ssl_config(RltvDir, Config, SSL) ->
|
new_ssl_config(RltvDir, Config, SSL) ->
|
||||||
|
@ -79,12 +98,5 @@ new_ssl_config(#{<<"ssl">> := _} = Config, NewSSL) ->
|
||||||
new_ssl_config(Config, _NewSSL) ->
|
new_ssl_config(Config, _NewSSL) ->
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
map_get_oneof([], _Map, Default) ->
|
normalize_key_to_bin(Map) ->
|
||||||
Default;
|
emqx_map_lib:binary_key_map(Map).
|
||||||
map_get_oneof([Key | Keys], Map, Default) ->
|
|
||||||
case maps:find(Key, Map) of
|
|
||||||
error ->
|
|
||||||
map_get_oneof(Keys, Map, Default);
|
|
||||||
{ok, Value} ->
|
|
||||||
Value
|
|
||||||
end.
|
|
||||||
|
|
|
@ -124,4 +124,49 @@ MQTT 消息发布的错误码,这些错误码也是 MQTT 规范中 PUBACK 消
|
||||||
zh: "失败的详细原因。"
|
zh: "失败的详细原因。"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
message_properties {
|
||||||
|
desc {
|
||||||
|
en: "The Properties of the PUBLISH message."
|
||||||
|
zh: "PUBLISH 消息里的 Property 字段。"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
msg_payload_format_indicator {
|
||||||
|
desc {
|
||||||
|
en: """0 (0x00) Byte Indicates that the Payload is unspecified bytes, which is equivalent to not sending a Payload Format Indicator.
|
||||||
|
|
||||||
|
1 (0x01) Byte Indicates that the Payload is UTF-8 Encoded Character Data. The UTF-8 data in the Payload MUST be well-formed UTF-8 as defined by the Unicode specification and restated in RFC 3629.
|
||||||
|
"""
|
||||||
|
zh: "载荷格式指示标识符,0 表示载荷是未指定格式的数据,相当于没有发送载荷格式指示;1 表示载荷是 UTF-8 编码的字符数据,载荷中的 UTF-8 数据必须是按照 Unicode 的规范和 RFC 3629 的标准要求进行编码的。"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
msg_message_expiry_interval {
|
||||||
|
desc {
|
||||||
|
en: "Identifier of the Message Expiry Interval. If the Message Expiry Interval has passed and the Server has not managed to start onward delivery to a matching subscriber, then it MUST delete the copy of the message for that subscriber."
|
||||||
|
zh: "消息过期间隔标识符,以秒为单位。当消失已经过期时,如果服务端还没有开始向匹配的订阅者投递该消息,则服务端会删除该订阅者的消息副本。如果不设置,则消息永远不会过期"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
msg_response_topic {
|
||||||
|
desc {
|
||||||
|
en: "Identifier of the Response Topic.The Response Topic MUST be a UTF-8 Encoded, It MUST NOT contain wildcard characters."
|
||||||
|
zh: "响应主题标识符, UTF-8 编码的字符串,用作响应消息的主题名。响应主题不能包含通配符,也不能包含多个主题,否则将造成协议错误。当存在响应主题时,消息将被视作请求报文。服务端在收到应用消息时必须将响应主题原封不动的发送给所有的订阅者。"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
msg_correlation_data {
|
||||||
|
desc {
|
||||||
|
en: "Identifier of the Correlation Data. The Server MUST send the Correlation Data unaltered to all subscribers receiving the Application Message."
|
||||||
|
zh: "对比数据标识符,服务端在收到应用消息时必须原封不动的把对比数据发送给所有的订阅者。对比数据只对请求消息(Request Message)的发送端和响应消息(Response Message)的接收端有意义。"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
msg_user_properties {
|
||||||
|
desc {
|
||||||
|
en: "The User-Property key-value pairs. Note: in case there are duplicated keys, only the last one will be used."
|
||||||
|
zh: "指定 MQTT 消息的 User Property 键值对。注意,如果出现重复的键,只有最后一个会保留。"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
msg_content_type {
|
||||||
|
desc {
|
||||||
|
en: "The Content Type MUST be a UTF-8 Encoded String."
|
||||||
|
zh: "内容类型标识符,以 UTF-8 格式编码的字符串,用来描述应用消息的内容,服务端必须把收到的应用消息中的内容类型原封不动的发送给所有的订阅者。"
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -114,6 +114,11 @@ fields(message) ->
|
||||||
required => true,
|
required => true,
|
||||||
example => <<"hello emqx api">>
|
example => <<"hello emqx api">>
|
||||||
})},
|
})},
|
||||||
|
{properties,
|
||||||
|
hoconsc:mk(hoconsc:ref(?MODULE, message_properties), #{
|
||||||
|
desc => ?DESC(message_properties),
|
||||||
|
required => false
|
||||||
|
})},
|
||||||
{retain,
|
{retain,
|
||||||
hoconsc:mk(boolean(), #{
|
hoconsc:mk(boolean(), #{
|
||||||
desc => ?DESC(retain),
|
desc => ?DESC(retain),
|
||||||
|
@ -130,6 +135,43 @@ fields(publish_message) ->
|
||||||
default => plain
|
default => plain
|
||||||
})}
|
})}
|
||||||
] ++ fields(message);
|
] ++ fields(message);
|
||||||
|
fields(message_properties) ->
|
||||||
|
[
|
||||||
|
{'payload_format_indicator',
|
||||||
|
hoconsc:mk(typerefl:range(0, 1), #{
|
||||||
|
desc => ?DESC(msg_payload_format_indicator),
|
||||||
|
required => false,
|
||||||
|
example => 0
|
||||||
|
})},
|
||||||
|
{'message_expiry_interval',
|
||||||
|
hoconsc:mk(integer(), #{
|
||||||
|
desc => ?DESC(msg_message_expiry_interval),
|
||||||
|
required => false
|
||||||
|
})},
|
||||||
|
{'response_topic',
|
||||||
|
hoconsc:mk(binary(), #{
|
||||||
|
desc => ?DESC(msg_response_topic),
|
||||||
|
required => false,
|
||||||
|
example => <<"some_other_topic">>
|
||||||
|
})},
|
||||||
|
{'correlation_data',
|
||||||
|
hoconsc:mk(binary(), #{
|
||||||
|
desc => ?DESC(msg_correlation_data),
|
||||||
|
required => false
|
||||||
|
})},
|
||||||
|
{'user_properties',
|
||||||
|
hoconsc:mk(map(), #{
|
||||||
|
desc => ?DESC(msg_user_properties),
|
||||||
|
required => false,
|
||||||
|
example => #{<<"foo">> => <<"bar">>}
|
||||||
|
})},
|
||||||
|
{'content_type',
|
||||||
|
hoconsc:mk(binary(), #{
|
||||||
|
desc => ?DESC(msg_content_type),
|
||||||
|
required => false,
|
||||||
|
example => <<"text/plain">>
|
||||||
|
})}
|
||||||
|
];
|
||||||
fields(publish_ok) ->
|
fields(publish_ok) ->
|
||||||
[
|
[
|
||||||
{id,
|
{id,
|
||||||
|
@ -288,13 +330,23 @@ make_message(Map) ->
|
||||||
QoS = maps:get(<<"qos">>, Map, 0),
|
QoS = maps:get(<<"qos">>, Map, 0),
|
||||||
Topic = maps:get(<<"topic">>, Map),
|
Topic = maps:get(<<"topic">>, Map),
|
||||||
Retain = maps:get(<<"retain">>, Map, false),
|
Retain = maps:get(<<"retain">>, Map, false),
|
||||||
|
Headers =
|
||||||
|
case maps:get(<<"properties">>, Map, #{}) of
|
||||||
|
Properties when
|
||||||
|
is_map(Properties) andalso
|
||||||
|
map_size(Properties) > 0
|
||||||
|
->
|
||||||
|
#{properties => to_msg_properties(Properties)};
|
||||||
|
_ ->
|
||||||
|
#{}
|
||||||
|
end,
|
||||||
try
|
try
|
||||||
_ = emqx_topic:validate(name, Topic)
|
_ = emqx_topic:validate(name, Topic)
|
||||||
catch
|
catch
|
||||||
error:_Reason ->
|
error:_Reason ->
|
||||||
throw(invalid_topic_name)
|
throw(invalid_topic_name)
|
||||||
end,
|
end,
|
||||||
Message = emqx_message:make(From, QoS, Topic, Payload, #{retain => Retain}, #{}),
|
Message = emqx_message:make(From, QoS, Topic, Payload, #{retain => Retain}, Headers),
|
||||||
Size = emqx_message:estimate_size(Message),
|
Size = emqx_message:estimate_size(Message),
|
||||||
(Size > size_limit()) andalso throw(packet_too_large),
|
(Size > size_limit()) andalso throw(packet_too_large),
|
||||||
{ok, Message};
|
{ok, Message};
|
||||||
|
@ -302,6 +354,20 @@ make_message(Map) ->
|
||||||
{error, R}
|
{error, R}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
to_msg_properties(Properties) ->
|
||||||
|
maps:fold(
|
||||||
|
fun to_property/3,
|
||||||
|
#{},
|
||||||
|
Properties
|
||||||
|
).
|
||||||
|
|
||||||
|
to_property(<<"payload_format_indicator">>, V, M) -> M#{'Payload-Format-Indicator' => V};
|
||||||
|
to_property(<<"message_expiry_interval">>, V, M) -> M#{'Message-Expiry-Interval' => V};
|
||||||
|
to_property(<<"response_topic">>, V, M) -> M#{'Response-Topic' => V};
|
||||||
|
to_property(<<"correlation_data">>, V, M) -> M#{'Correlation-Data' => V};
|
||||||
|
to_property(<<"user_properties">>, V, M) -> M#{'User-Property' => maps:to_list(V)};
|
||||||
|
to_property(<<"content_type">>, V, M) -> M#{'Content-Type' => V}.
|
||||||
|
|
||||||
%% get the global packet size limit since HTTP API does not belong to any zone.
|
%% get the global packet size limit since HTTP API does not belong to any zone.
|
||||||
size_limit() ->
|
size_limit() ->
|
||||||
try
|
try
|
||||||
|
|
|
@ -20,9 +20,7 @@
|
||||||
|
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
-include_lib("emqx/include/emqx_mqtt.hrl").
|
-include_lib("emqx/include/emqx_mqtt.hrl").
|
||||||
|
-include_lib("common_test/include/ct.hrl").
|
||||||
-define(CLIENTID, <<"api_clientid">>).
|
|
||||||
-define(USERNAME, <<"api_username">>).
|
|
||||||
|
|
||||||
-define(TOPIC1, <<"api_topic1">>).
|
-define(TOPIC1, <<"api_topic1">>).
|
||||||
-define(TOPIC2, <<"api_topic2">>).
|
-define(TOPIC2, <<"api_topic2">>).
|
||||||
|
@ -44,25 +42,56 @@ end_per_testcase(Case, Config) ->
|
||||||
?MODULE:Case({'end', Config}).
|
?MODULE:Case({'end', Config}).
|
||||||
|
|
||||||
t_publish_api({init, Config}) ->
|
t_publish_api({init, Config}) ->
|
||||||
Config;
|
{ok, Client} = emqtt:start_link(
|
||||||
t_publish_api({'end', _Config}) ->
|
#{
|
||||||
ok;
|
username => <<"api_username">>,
|
||||||
t_publish_api(_) ->
|
clientid => <<"api_clientid">>,
|
||||||
{ok, Client} = emqtt:start_link(#{
|
proto_ver => v5
|
||||||
username => <<"api_username">>, clientid => <<"api_clientid">>
|
}
|
||||||
}),
|
),
|
||||||
{ok, _} = emqtt:connect(Client),
|
{ok, _} = emqtt:connect(Client),
|
||||||
{ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC1),
|
{ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC1),
|
||||||
{ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC2),
|
{ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC2),
|
||||||
|
[{client, Client} | Config];
|
||||||
|
t_publish_api({'end', Config}) ->
|
||||||
|
Client = ?config(client, Config),
|
||||||
|
emqtt:stop(Client),
|
||||||
|
ok;
|
||||||
|
t_publish_api(_) ->
|
||||||
Payload = <<"hello">>,
|
Payload = <<"hello">>,
|
||||||
Path = emqx_mgmt_api_test_util:api_path(["publish"]),
|
Path = emqx_mgmt_api_test_util:api_path(["publish"]),
|
||||||
Auth = emqx_mgmt_api_test_util:auth_header_(),
|
Auth = emqx_mgmt_api_test_util:auth_header_(),
|
||||||
Body = #{topic => ?TOPIC1, payload => Payload},
|
UserProperties = #{<<"foo">> => <<"bar">>},
|
||||||
|
Properties =
|
||||||
|
#{
|
||||||
|
<<"payload_format_indicator">> => 0,
|
||||||
|
<<"message_expiry_interval">> => 1000,
|
||||||
|
<<"response_topic">> => ?TOPIC2,
|
||||||
|
<<"correlation_data">> => <<"some_correlation_id">>,
|
||||||
|
<<"user_properties">> => UserProperties,
|
||||||
|
<<"content_type">> => <<"application/json">>
|
||||||
|
},
|
||||||
|
Body = #{topic => ?TOPIC1, payload => Payload, properties => Properties},
|
||||||
{ok, Response} = emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body),
|
{ok, Response} = emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body),
|
||||||
ResponseMap = decode_json(Response),
|
ResponseMap = decode_json(Response),
|
||||||
?assertEqual([<<"id">>], lists:sort(maps:keys(ResponseMap))),
|
?assertEqual([<<"id">>], lists:sort(maps:keys(ResponseMap))),
|
||||||
?assertEqual(ok, receive_assert(?TOPIC1, 0, Payload)),
|
{ok, Message} = receive_assert(?TOPIC1, 0, Payload),
|
||||||
emqtt:stop(Client).
|
RecvProperties = maps:get(properties, Message),
|
||||||
|
UserPropertiesList = maps:to_list(UserProperties),
|
||||||
|
#{
|
||||||
|
'Payload-Format-Indicator' := 0,
|
||||||
|
'Message-Expiry-Interval' := RecvMessageExpiry,
|
||||||
|
'Correlation-Data' := <<"some_correlation_id">>,
|
||||||
|
'User-Property' := UserPropertiesList,
|
||||||
|
'Content-Type' := <<"application/json">>
|
||||||
|
} = RecvProperties,
|
||||||
|
?assert(RecvMessageExpiry =< 1000),
|
||||||
|
%% note: without props this time
|
||||||
|
Body2 = #{topic => ?TOPIC2, payload => Payload},
|
||||||
|
{ok, Response2} = emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body2),
|
||||||
|
ResponseMap2 = decode_json(Response2),
|
||||||
|
?assertEqual([<<"id">>], lists:sort(maps:keys(ResponseMap2))),
|
||||||
|
?assertEqual(ok, element(1, receive_assert(?TOPIC2, 0, Payload))).
|
||||||
|
|
||||||
t_publish_no_subscriber({init, Config}) ->
|
t_publish_no_subscriber({init, Config}) ->
|
||||||
Config;
|
Config;
|
||||||
|
@ -163,16 +192,18 @@ t_publish_bad_topic_bulk(_Config) ->
|
||||||
).
|
).
|
||||||
|
|
||||||
t_publish_bulk_api({init, Config}) ->
|
t_publish_bulk_api({init, Config}) ->
|
||||||
Config;
|
|
||||||
t_publish_bulk_api({'end', _Config}) ->
|
|
||||||
ok;
|
|
||||||
t_publish_bulk_api(_) ->
|
|
||||||
{ok, Client} = emqtt:start_link(#{
|
{ok, Client} = emqtt:start_link(#{
|
||||||
username => <<"api_username">>, clientid => <<"api_clientid">>
|
username => <<"api_username">>, clientid => <<"api_clientid">>
|
||||||
}),
|
}),
|
||||||
{ok, _} = emqtt:connect(Client),
|
{ok, _} = emqtt:connect(Client),
|
||||||
{ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC1),
|
{ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC1),
|
||||||
{ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC2),
|
{ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC2),
|
||||||
|
[{client, Client} | Config];
|
||||||
|
t_publish_bulk_api({'end', Config}) ->
|
||||||
|
Client = ?config(client, Config),
|
||||||
|
emqtt:stop(Client),
|
||||||
|
ok;
|
||||||
|
t_publish_bulk_api(_) ->
|
||||||
Payload = <<"hello">>,
|
Payload = <<"hello">>,
|
||||||
Path = emqx_mgmt_api_test_util:api_path(["publish", "bulk"]),
|
Path = emqx_mgmt_api_test_util:api_path(["publish", "bulk"]),
|
||||||
Auth = emqx_mgmt_api_test_util:auth_header_(),
|
Auth = emqx_mgmt_api_test_util:auth_header_(),
|
||||||
|
@ -199,9 +230,8 @@ t_publish_bulk_api(_) ->
|
||||||
end,
|
end,
|
||||||
ResponseList
|
ResponseList
|
||||||
),
|
),
|
||||||
?assertEqual(ok, receive_assert(?TOPIC1, 0, Payload)),
|
?assertEqual(ok, element(1, receive_assert(?TOPIC1, 0, Payload))),
|
||||||
?assertEqual(ok, receive_assert(?TOPIC2, 0, Payload)),
|
?assertEqual(ok, element(1, receive_assert(?TOPIC2, 0, Payload))).
|
||||||
emqtt:stop(Client).
|
|
||||||
|
|
||||||
t_publish_no_subscriber_bulk({init, Config}) ->
|
t_publish_no_subscriber_bulk({init, Config}) ->
|
||||||
Config;
|
Config;
|
||||||
|
@ -232,8 +262,8 @@ t_publish_no_subscriber_bulk(_) ->
|
||||||
],
|
],
|
||||||
ResponseList
|
ResponseList
|
||||||
),
|
),
|
||||||
?assertEqual(ok, receive_assert(?TOPIC1, 0, Payload)),
|
?assertEqual(ok, element(1, receive_assert(?TOPIC1, 0, Payload))),
|
||||||
?assertEqual(ok, receive_assert(?TOPIC2, 0, Payload)),
|
?assertEqual(ok, element(1, receive_assert(?TOPIC2, 0, Payload))),
|
||||||
emqtt:stop(Client).
|
emqtt:stop(Client).
|
||||||
|
|
||||||
t_publish_bulk_dispatch_one_message_invalid_topic({init, Config}) ->
|
t_publish_bulk_dispatch_one_message_invalid_topic({init, Config}) ->
|
||||||
|
@ -267,17 +297,19 @@ t_publish_bulk_dispatch_one_message_invalid_topic(Config) when is_list(Config) -
|
||||||
t_publish_bulk_dispatch_failure({init, Config}) ->
|
t_publish_bulk_dispatch_failure({init, Config}) ->
|
||||||
meck:new(emqx, [no_link, passthrough, no_history]),
|
meck:new(emqx, [no_link, passthrough, no_history]),
|
||||||
meck:expect(emqx, is_running, fun() -> false end),
|
meck:expect(emqx, is_running, fun() -> false end),
|
||||||
Config;
|
|
||||||
t_publish_bulk_dispatch_failure({'end', _Config}) ->
|
|
||||||
meck:unload(emqx),
|
|
||||||
ok;
|
|
||||||
t_publish_bulk_dispatch_failure(Config) when is_list(Config) ->
|
|
||||||
{ok, Client} = emqtt:start_link(#{
|
{ok, Client} = emqtt:start_link(#{
|
||||||
username => <<"api_username">>, clientid => <<"api_clientid">>
|
username => <<"api_username">>, clientid => <<"api_clientid">>
|
||||||
}),
|
}),
|
||||||
{ok, _} = emqtt:connect(Client),
|
{ok, _} = emqtt:connect(Client),
|
||||||
{ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC1),
|
{ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC1),
|
||||||
{ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC2),
|
{ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC2),
|
||||||
|
[{client, Client} | Config];
|
||||||
|
t_publish_bulk_dispatch_failure({'end', Config}) ->
|
||||||
|
meck:unload(emqx),
|
||||||
|
Client = ?config(client, Config),
|
||||||
|
emqtt:stop(Client),
|
||||||
|
ok;
|
||||||
|
t_publish_bulk_dispatch_failure(Config) when is_list(Config) ->
|
||||||
Payload = <<"hello">>,
|
Payload = <<"hello">>,
|
||||||
Path = emqx_mgmt_api_test_util:api_path(["publish", "bulk"]),
|
Path = emqx_mgmt_api_test_util:api_path(["publish", "bulk"]),
|
||||||
Auth = emqx_mgmt_api_test_util:auth_header_(),
|
Auth = emqx_mgmt_api_test_util:auth_header_(),
|
||||||
|
@ -303,8 +335,7 @@ t_publish_bulk_dispatch_failure(Config) when is_list(Config) ->
|
||||||
#{<<"reason_code">> := ?RC_NO_MATCHING_SUBSCRIBERS}
|
#{<<"reason_code">> := ?RC_NO_MATCHING_SUBSCRIBERS}
|
||||||
],
|
],
|
||||||
decode_json(ResponseBody)
|
decode_json(ResponseBody)
|
||||||
),
|
).
|
||||||
emqtt:stop(Client).
|
|
||||||
|
|
||||||
receive_assert(Topic, Qos, Payload) ->
|
receive_assert(Topic, Qos, Payload) ->
|
||||||
receive
|
receive
|
||||||
|
@ -312,12 +343,12 @@ receive_assert(Topic, Qos, Payload) ->
|
||||||
ReceiveTopic = maps:get(topic, Message),
|
ReceiveTopic = maps:get(topic, Message),
|
||||||
ReceiveQos = maps:get(qos, Message),
|
ReceiveQos = maps:get(qos, Message),
|
||||||
ReceivePayload = maps:get(payload, Message),
|
ReceivePayload = maps:get(payload, Message),
|
||||||
?assertEqual(ReceiveTopic, Topic),
|
?assertEqual(Topic, ReceiveTopic),
|
||||||
?assertEqual(ReceiveQos, Qos),
|
?assertEqual(Qos, ReceiveQos),
|
||||||
?assertEqual(ReceivePayload, Payload),
|
?assertEqual(Payload, ReceivePayload),
|
||||||
ok
|
{ok, Message}
|
||||||
after 5000 ->
|
after 5000 ->
|
||||||
timeout
|
{error, timeout}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
decode_json(In) ->
|
decode_json(In) ->
|
||||||
|
|
|
@ -25,6 +25,8 @@
|
||||||
|
|
||||||
- Add a new config `quick_deny_anonymous` to allow quick deny of anonymous clients (without username) so the auth backend checks can be skipped [#8516](https://github.com/emqx/emqx/pull/8516).
|
- Add a new config `quick_deny_anonymous` to allow quick deny of anonymous clients (without username) so the auth backend checks can be skipped [#8516](https://github.com/emqx/emqx/pull/8516).
|
||||||
|
|
||||||
|
- Support message properties in `/publish` API [#9401](https://github.com/emqx/emqx/pull/9401).
|
||||||
|
|
||||||
## Bug fixes
|
## Bug fixes
|
||||||
|
|
||||||
- Fix `ssl.existingName` option of helm chart not working [#9307](https://github.com/emqx/emqx/issues/9307).
|
- Fix `ssl.existingName` option of helm chart not working [#9307](https://github.com/emqx/emqx/issues/9307).
|
||||||
|
@ -38,3 +40,5 @@
|
||||||
- Fix that `/configs/global_zone` API cannot get the default value of the configuration [#9392](https://github.com/emqx/emqx/pull/9392).
|
- Fix that `/configs/global_zone` API cannot get the default value of the configuration [#9392](https://github.com/emqx/emqx/pull/9392).
|
||||||
|
|
||||||
- Fix mountpoint not working for will-msg [#9399](https://github.com/emqx/emqx/pull/9399).
|
- Fix mountpoint not working for will-msg [#9399](https://github.com/emqx/emqx/pull/9399).
|
||||||
|
|
||||||
|
- Fix that the obsolete SSL files aren't deleted after the bridge configuration update [#9411](https://github.com/emqx/emqx/pull/9411).
|
||||||
|
|
|
@ -23,6 +23,8 @@
|
||||||
|
|
||||||
- 添加了一个名为 `quick_deny_anonymous` 的新配置,用来在不调用认证链的情况下,快速的拒绝掉匿名用户,从而提高认证效率 [#8516](https://github.com/emqx/emqx/pull/8516)。
|
- 添加了一个名为 `quick_deny_anonymous` 的新配置,用来在不调用认证链的情况下,快速的拒绝掉匿名用户,从而提高认证效率 [#8516](https://github.com/emqx/emqx/pull/8516)。
|
||||||
|
|
||||||
|
- 支持在 /publish API 中添加消息属性 [#9401](https://github.com/emqx/emqx/pull/9401)。
|
||||||
|
|
||||||
## 修复
|
## 修复
|
||||||
|
|
||||||
- 修复 helm chart 的 `ssl.existingName` 选项不起作用 [#9307](https://github.com/emqx/emqx/issues/9307)。
|
- 修复 helm chart 的 `ssl.existingName` 选项不起作用 [#9307](https://github.com/emqx/emqx/issues/9307)。
|
||||||
|
@ -36,3 +38,5 @@
|
||||||
- 修复 `/configs/global_zone` API 无法正确获取配置的默认值问题 [#9392](https://github.com/emqx/emqx/pull/9392)。
|
- 修复 `/configs/global_zone` API 无法正确获取配置的默认值问题 [#9392](https://github.com/emqx/emqx/pull/9392)。
|
||||||
|
|
||||||
- 修复 mountpoint 配置未对遗嘱消息生效的问题 [#9399](https://github.com/emqx/emqx/pull/9399)
|
- 修复 mountpoint 配置未对遗嘱消息生效的问题 [#9399](https://github.com/emqx/emqx/pull/9399)
|
||||||
|
|
||||||
|
- 修复桥接配置更新 SSL 相关配置后,过时的 SSL 文件没有被删除的问题 [#9411](https://github.com/emqx/emqx/pull/9411)。
|
||||||
|
|
Loading…
Reference in New Issue