diff --git a/apps/emqx/src/emqx_map_lib.erl b/apps/emqx/src/emqx_map_lib.erl index c714d7dbc..b01391c7b 100644 --- a/apps/emqx/src/emqx_map_lib.erl +++ b/apps/emqx/src/emqx_map_lib.erl @@ -23,6 +23,7 @@ deep_force_put/3, deep_remove/2, deep_merge/2, + binary_key_map/1, safe_atom_key_map/1, unsafe_atom_key_map/1, jsonable_map/1, @@ -153,6 +154,17 @@ deep_convert(Val, _, _Args) -> unsafe_atom_key_map(Map) -> covert_keys_to_atom(Map, fun(K) -> binary_to_atom(K, utf8) end). +-spec binary_key_map(map()) -> map(). +binary_key_map(Map) -> + deep_convert( + Map, + fun + (K, V) when is_atom(K) -> {atom_to_binary(K, utf8), V}; + (K, V) when is_binary(K) -> {K, V} + end, + [] + ). + -spec safe_atom_key_map(#{binary() | atom() => any()}) -> #{atom() => any()}. safe_atom_key_map(Map) -> covert_keys_to_atom(Map, fun(K) -> binary_to_existing_atom(K, utf8) end). diff --git a/apps/emqx_bridge/src/emqx_bridge_app.erl b/apps/emqx_bridge/src/emqx_bridge_app.erl index 3fc4d57ba..958bbf288 100644 --- a/apps/emqx_bridge/src/emqx_bridge_app.erl +++ b/apps/emqx_bridge/src/emqx_bridge_app.erl @@ -58,7 +58,8 @@ pre_config_update(Path, Conf, _OldConfig) when is_map(Conf) -> post_config_update(Path, '$remove', _, OldConf, _AppEnvs) -> _ = emqx_connector_ssl:clear_certs(filename:join(Path), OldConf); -post_config_update(_Path, _Req, _, _OldConf, _AppEnvs) -> +post_config_update(Path, _Req, NewConf, OldConf, _AppEnvs) -> + _ = emqx_connector_ssl:try_clear_certs(filename:join(Path), NewConf, OldConf), ok. %% internal functions diff --git a/apps/emqx_bridge/test/data/certs/cafile b/apps/emqx_bridge/test/data/certs/cafile new file mode 100644 index 000000000..8a9dafccd --- /dev/null +++ b/apps/emqx_bridge/test/data/certs/cafile @@ -0,0 +1,29 @@ +-----BEGIN CERTIFICATE----- +MIIE5DCCAswCCQCF3o0gIdaNDjANBgkqhkiG9w0BAQsFADA0MRIwEAYDVQQKDAlF +TVFYIFRlc3QxHjAcBgNVBAMMFUNlcnRpZmljYXRlIEF1dGhvcml0eTAeFw0yMTEy +MzAwODQxMTFaFw00OTA1MTcwODQxMTFaMDQxEjAQBgNVBAoMCUVNUVggVGVzdDEe +MBwGA1UEAwwVQ2VydGlmaWNhdGUgQXV0aG9yaXR5MIICIjANBgkqhkiG9w0BAQEF +AAOCAg8AMIICCgKCAgEAqmqSrxyH16j63QhqGLT1UO8I+m6BM3HfnJQM8laQdtJ0 +WgHqCh0/OphH3S7v4SfF4fNJDEJWMWuuzJzU9cTqHPLzhvo3+ZHcMIENgtY2p2Cf +7AQjEqFViEDyv2ZWNEe76BJeShntdY5NZr4gIPar99YGG/Ln8YekspleV+DU38rE +EX9WzhgBr02NN9z4NzIxeB+jdvPnxcXs3WpUxzfnUjOQf/T1tManvSdRbFmKMbxl +A8NLYK3oAYm8EbljWUINUNN6loqYhbigKv8bvo5S4xvRqmX86XB7sc0SApngtNcg +O0EKn8z/KVPDskE+8lMfGMiU2e2Tzw6Rph57mQPOPtIp5hPiKRik7ST9n0p6piXW +zRLplJEzSjf40I1u+VHmpXlWI/Fs8b1UkDSMiMVJf0LyWb4ziBSZOY2LtZzWHbWj +LbNgxQcwSS29tKgUwfEFmFcm+iOM59cPfkl2IgqVLh5h4zmKJJbfQKSaYb5fcKRf +50b1qsN40VbR3Pk/0lJ0/WqgF6kZCExmT1qzD5HJES/5grjjKA4zIxmHOVU86xOF +ouWvtilVR4PGkzmkFvwK5yRhBUoGH/A9BurhqOc0QCGay1kqHQFA6se4JJS+9KOS +x8Rn1Nm6Pi7sd6Le3cKmHTlyl5a/ofKqTCX2Qh+v/7y62V1V1wnoh3ipRjdPTnMC +AwEAATANBgkqhkiG9w0BAQsFAAOCAgEARCqaocvlMFUQjtFtepO2vyG1krn11xJ0 +e7md26i+g8SxCCYqQ9IqGmQBg0Im8fyNDKRN/LZoj5+A4U4XkG1yya91ZIrPpWyF +KUiRAItchNj3g1kHmI2ckl1N//6Kpx3DPaS7qXZaN3LTExf6Ph+StE1FnS0wVF+s +tsNIf6EaQ+ZewW3pjdlLeAws3jvWKUkROc408Ngvx74zbbKo/zAC4tz8oH9ZcpsT +WD8enVVEeUQKI6ItcpZ9HgTI9TFWgfZ1vYwvkoRwNIeabYI62JKmLEo2vGfGwWKr +c+GjnJ/tlVI2DpPljfWOnQ037/7yyJI/zo65+HPRmGRD6MuW/BdPDYOvOZUTcQKh +kANi5THSbJJgZcG3jb1NLebaUQ1H0zgVjn0g3KhUV+NJQYk8RQ7rHtB+MySqTKlM +kRkRjfTfR0Ykxpks7Mjvsb6NcZENf08ZFPd45+e/ptsxpiKu4e4W4bV7NZDvNKf9 +0/aD3oGYNMiP7s+KJ1lRSAjnBuG21Yk8FpzG+yr8wvJhV8aFgNQ5wIH86SuUTmN0 +5bVzFEIcUejIwvGoQEctNHBlOwHrb7zmB6OwyZeMapdXBQ+9UDhYg8ehDqdDOdfn +wsBcnjD2MwNhlE1hjL+tZWLNwSHiD6xx3LvNoXZu2HK8Cp3SOrkE69cFghYMIZZb +T+fp6tNL6LE= +-----END CERTIFICATE----- diff --git a/apps/emqx_bridge/test/data/certs/certfile b/apps/emqx_bridge/test/data/certs/certfile new file mode 100644 index 000000000..a198faf61 --- /dev/null +++ b/apps/emqx_bridge/test/data/certs/certfile @@ -0,0 +1,24 @@ +-----BEGIN CERTIFICATE----- +MIID/jCCAeagAwIBAgIJAKTICmq1Lg6dMA0GCSqGSIb3DQEBCwUAMDQxEjAQBgNV +BAoMCUVNUVggVGVzdDEeMBwGA1UEAwwVQ2VydGlmaWNhdGUgQXV0aG9yaXR5MB4X +DTIxMTIzMDA4NDExMloXDTQ5MDUxNzA4NDExMlowJTESMBAGA1UECgwJRU1RWCBU +ZXN0MQ8wDQYDVQQDDAZjbGllbnQwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEK +AoIBAQDzrujfx6XZTH0MWqLO6kNAeHndUZ+OGaURXvxKMPMF5dA40lxNG6cEzzlq +0Rm61adlv8tF4kRJrs6EnRjEVoMImrdh07vGFdOTYqP01LjiBhErAzyRtSn2X8FT +Te8ExoCRs3x61SPebGY2hOvFxuO6YDPVOSDvbbxvRgqIlM1ZXC8dOvPSSGZ+P8hV +56EPayRthfu1FVptnkW9CyZCRI0gg95Hv8RC7bGG+tuWpkN9ZrRvohhgGR1+bDUi +BNBpncEsSh+UgWaj8KRN8D16H6m/Im6ty467j0at49FvPx5nACL48/ghtYvzgKLc +uKHtokKUuuzebDK/hQxN3mUSAJStAgMBAAGjIjAgMAsGA1UdDwQEAwIFoDARBglg +hkgBhvhCAQEEBAMCB4AwDQYJKoZIhvcNAQELBQADggIBAIlVyPhOpkz3MNzQmjX7 +xgJ3vGPK5uK11n/wfjRwe2qXwZbrI2sYLVtTpUgvLDuP0gB73Vwfu7xAMdue6TRm +CKr9z0lkQsVBtgoqzZCjd4PYLfHm4EhsOMi98OGKU5uOGD4g3yLwQWXHhbYtiZMO +Jsj0hebYveYJt/BYTd1syGQcIcYCyVExWvSWjidfpAqjT6EF7whdubaFtuF2kaGF +IO9yn9rWtXB5yK99uCguEmKhx3fAQxomzqweTu3WRvy9axsUH3WAUW9a4DIBSz2+ +ZSJNheFn5GktgggygJUGYqpSZHooUJW0UBs/8vX6AP+8MtINmqOGZUawmNwLWLOq +wHyVt2YGD5TXjzzsWNSQ4mqXxM6AXniZVZK0yYNjA4ATikX1AtwunyWBR4IjyE/D +FxYPORdZCOtywRFE1R5KLTUq/C8BNGCkYnoO78DJBO+pT0oagkQGQb0CnmC6C1db +4lWzA9K0i4B0PyooZA+gp+5FFgaLuX1DkyeaY1J204QhHR1z/Vcyl5dpqR9hqnYP +t8raLk9ogMDKqKA9iG0wc3CBNckD4sjVWAEeovXhElG55fD21wwhF+AnDCvX8iVK +cBfKV6z6uxfKjGIxc2I643I5DiIn+V3DnPxYyY74Ln1lWFYmt5JREhAxPu42zq74 +e6+eIMYFszB+5gKgt6pa6ZNI +-----END CERTIFICATE----- diff --git a/apps/emqx_bridge/test/data/certs/keyfile b/apps/emqx_bridge/test/data/certs/keyfile new file mode 100644 index 000000000..2f0af5d41 --- /dev/null +++ b/apps/emqx_bridge/test/data/certs/keyfile @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEpAIBAAKCAQEA867o38el2Ux9DFqizupDQHh53VGfjhmlEV78SjDzBeXQONJc +TRunBM85atEZutWnZb/LReJESa7OhJ0YxFaDCJq3YdO7xhXTk2Kj9NS44gYRKwM8 +kbUp9l/BU03vBMaAkbN8etUj3mxmNoTrxcbjumAz1Tkg7228b0YKiJTNWVwvHTrz +0khmfj/IVeehD2skbYX7tRVabZ5FvQsmQkSNIIPeR7/EQu2xhvrblqZDfWa0b6IY +YBkdfmw1IgTQaZ3BLEoflIFmo/CkTfA9eh+pvyJurcuOu49GrePRbz8eZwAi+PP4 +IbWL84Ci3Lih7aJClLrs3mwyv4UMTd5lEgCUrQIDAQABAoIBAQDwEbBgznrIwn8r +jZt5x/brbAV7Ea/kOcWSgIaCvQifFdJ2OGAwov5/UXwajNgRZe2d4z7qoUhvYuUY +ZwCAZU6ASpRBr2v9cYFYYURvrqZaHmoJew3P6q/lhl6aqFvC06DUagRHqvXEafyk +13zEAvZVpfNKrBaTawPKiDFWb2qDDc9D6hC07EuJ/DNeehiHvzHrSZSDVV5Ut7Bw +YDm33XygheUPAlHfeCnaixzcs3osiVyFEmVjxcIaM0ZS1NgcSaohSpJHMzvEaohX +e+v9vccraSVlw01AlvFwI2vHYUV8jT6HwglTPKKGOCzK/ace3wPdYSU9qLcqfuHn +EFhNc3tNAoGBAPugLMgbReJg2gpbIPUkYyoMMAAU7llFU1WvPWwXzo1a9EBjBACw +WfCZISNtANXR38zIYXzoH547uXi4YPks1Nne3sYuCDpvuX+iz7fIo4zHf1nFmxH7 +eE6GtQr2ubmuuipTc28S0wBMGT1/KybH0e2NKL6GaOkNDmAI0IbEMBrvAoGBAPfr +Y1QYLhPhan6m5g/5s+bQpKtHfNH9TNkk13HuYu72zNuY3qL2GC7oSadR8vTbRXZg +KQqfaO0IGRcdkSFTq/AEhSSqr2Ld5nPadMbKvSGrSCc1s8rFH97jRVQY56yhM7ti +IW4+6cE8ylCMbdYB6wuduK/GIgNpqoF4xs1i2XojAoGACacBUMPLEH4Kny8TupOk +wi4pgTdMVVxVcAoC3yyincWJbRbfRm99Y79cCBHcYFdmsGJXawU0gUtlN/5KqgRQ +PfNQtGV7p1I12XGTakdmDrZwai8sXao52TlNpJgGU9siBRGicfZU5cQFi9he/WPY +57XshDJ/v8DidkigRysrdT0CgYEA5iuO22tblC+KvK1dGOXeZWO+DhrfwuGlcFBp +CaimB2/w/8vsn2VVTG9yujo2E6hj1CQw1mDrfG0xRim4LTXOgpbfugwRqvuTUmo2 +Ur21XEX2RhjwpEfhcACWxB4fMUG0krrniMA2K6axupi1/KNpQi6bYe3UdFCs8Wld +QSAOAvsCgYBk/X5PmD44DvndE5FShM2w70YOoMr3Cgl5sdwAFUFE9yDuC14UhVxk +oxnYxwtVI9uVVirET+LczP9JEvcvxnN/Xg3tH/qm0WlIxmTxyYrFFIK9j0rqeu9z +blPu56OzNI2VMrR1GbOBLxQINLTIpaacjNJAlr8XOlegdUJsW/Jwqw== +-----END RSA PRIVATE KEY----- diff --git a/apps/emqx_bridge/test/emqx_bridge_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_SUITE.erl index d8266f83a..dca14b829 100644 --- a/apps/emqx_bridge/test/emqx_bridge_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_SUITE.erl @@ -156,3 +156,98 @@ setup_fake_telemetry_data() -> {ok, _} = snabbkaffe_collector:receive_events(Sub), ok = snabbkaffe:stop(), ok. + +t_update_ssl_conf(_) -> + Path = [bridges, <<"mqtt">>, <<"ssl_update_test">>], + EnableSSLConf = #{ + <<"connector">> => + #{ + <<"bridge_mode">> => false, + <<"clean_start">> => true, + <<"keepalive">> => <<"60s">>, + <<"mode">> => <<"cluster_shareload">>, + <<"proto_ver">> => <<"v4">>, + <<"server">> => <<"127.0.0.1:1883">>, + <<"ssl">> => + #{ + <<"cacertfile">> => cert_file("cafile"), + <<"certfile">> => cert_file("certfile"), + <<"enable">> => true, + <<"keyfile">> => cert_file("keyfile"), + <<"verify">> => <<"verify_peer">> + } + }, + <<"direction">> => <<"ingress">>, + <<"local_qos">> => 1, + <<"payload">> => <<"${payload}">>, + <<"remote_qos">> => 1, + <<"remote_topic">> => <<"t/#">>, + <<"retain">> => false + }, + + emqx:update_config(Path, EnableSSLConf), + ?assertMatch({ok, [_, _, _]}, list_pem_dir(Path)), + NoSSLConf = #{ + <<"connector">> => + #{ + <<"bridge_mode">> => false, + <<"clean_start">> => true, + <<"keepalive">> => <<"60s">>, + <<"max_inflight">> => 32, + <<"mode">> => <<"cluster_shareload">>, + <<"password">> => <<>>, + <<"proto_ver">> => <<"v4">>, + <<"reconnect_interval">> => <<"15s">>, + <<"replayq">> => + #{<<"offload">> => false, <<"seg_bytes">> => <<"100MB">>}, + <<"retry_interval">> => <<"15s">>, + <<"server">> => <<"127.0.0.1:1883">>, + <<"ssl">> => + #{ + <<"ciphers">> => <<>>, + <<"depth">> => 10, + <<"enable">> => false, + <<"reuse_sessions">> => true, + <<"secure_renegotiate">> => true, + <<"user_lookup_fun">> => <<"emqx_tls_psk:lookup">>, + <<"verify">> => <<"verify_peer">>, + <<"versions">> => + [ + <<"tlsv1.3">>, + <<"tlsv1.2">>, + <<"tlsv1.1">>, + <<"tlsv1">> + ] + }, + <<"username">> => <<>> + }, + <<"direction">> => <<"ingress">>, + <<"enable">> => true, + <<"local_qos">> => 1, + <<"payload">> => <<"${payload}">>, + <<"remote_qos">> => 1, + <<"remote_topic">> => <<"t/#">>, + <<"retain">> => false + }, + + emqx:update_config(Path, NoSSLConf), + ?assertMatch({error, not_dir}, list_pem_dir(Path)), + emqx:remove_config(Path), + ok. + +list_pem_dir(Path) -> + Dir = filename:join([emqx:mutable_certs_dir() | Path]), + case filelib:is_dir(Dir) of + true -> + file:list_dir(Dir); + _ -> + {error, not_dir} + end. + +data_file(Name) -> + Dir = code:lib_dir(emqx_bridge, test), + {ok, Bin} = file:read_file(filename:join([Dir, "data", Name])), + Bin. + +cert_file(Name) -> + data_file(filename:join(["certs", Name])). diff --git a/apps/emqx_connector/src/emqx_connector_ssl.erl b/apps/emqx_connector/src/emqx_connector_ssl.erl index 7f2fc537b..7dc6179e1 100644 --- a/apps/emqx_connector/src/emqx_connector_ssl.erl +++ b/apps/emqx_connector/src/emqx_connector_ssl.erl @@ -16,9 +16,12 @@ -module(emqx_connector_ssl). +-include_lib("emqx/include/logger.hrl"). + -export([ convert_certs/2, - clear_certs/2 + clear_certs/2, + try_clear_certs/3 ]). %% TODO: rm `connector` case after `dev/ee5.0` merged into `master`. @@ -27,12 +30,12 @@ convert_certs(RltvDir, #{<<"connector">> := Connector} = Config) when is_map(Connector) -> - SSL = map_get_oneof([<<"ssl">>, ssl], Connector, undefined), + SSL = maps:get(<<"ssl">>, Connector, undefined), new_ssl_config(RltvDir, Config, SSL); convert_certs(RltvDir, #{connector := Connector} = Config) when is_map(Connector) -> - SSL = map_get_oneof([<<"ssl">>, ssl], Connector, undefined), + SSL = maps:get(ssl, Connector, undefined), new_ssl_config(RltvDir, Config, SSL); %% for bridges without `connector` field. i.e. webhook convert_certs(RltvDir, #{<<"ssl">> := SSL} = Config) -> @@ -43,21 +46,37 @@ convert_certs(RltvDir, #{ssl := SSL} = Config) -> convert_certs(_RltvDir, Config) -> {ok, Config}. -clear_certs(RltvDir, #{<<"connector">> := Connector} = _Config) when +clear_certs(RltvDir, Config) -> + clear_certs2(RltvDir, normalize_key_to_bin(Config)). + +clear_certs2(RltvDir, #{<<"connector">> := Connector} = _Config) when is_map(Connector) -> - OldSSL = map_get_oneof([<<"ssl">>, ssl], Connector, undefined), + %% TODO remove the 'connector' clause after dev/ee5.0 is merged back to master + %% The `connector` config layer will be removed. + %% for bridges with `connector` field. i.e. `mqtt_source` and `mqtt_sink` + OldSSL = maps:get(<<"ssl">>, Connector, undefined), ok = emqx_tls_lib:delete_ssl_files(RltvDir, undefined, OldSSL); -clear_certs(RltvDir, #{connector := Connector} = _Config) when - is_map(Connector) --> - OldSSL = map_get_oneof([<<"ssl">>, ssl], Connector, undefined), +clear_certs2(RltvDir, #{<<"ssl">> := OldSSL} = _Config) -> ok = emqx_tls_lib:delete_ssl_files(RltvDir, undefined, OldSSL); -clear_certs(RltvDir, #{<<"ssl">> := OldSSL} = _Config) -> - ok = emqx_tls_lib:delete_ssl_files(RltvDir, undefined, OldSSL); -clear_certs(RltvDir, #{ssl := OldSSL} = _Config) -> - ok = emqx_tls_lib:delete_ssl_files(RltvDir, undefined, OldSSL); -clear_certs(_RltvDir, _) -> +clear_certs2(_RltvDir, _) -> + ok. + +try_clear_certs(RltvDir, NewConf, OldConf) -> + try_clear_certs2( + RltvDir, + normalize_key_to_bin(NewConf), + normalize_key_to_bin(OldConf) + ). + +try_clear_certs2(RltvDir, #{<<"connector">> := NewConnector}, #{<<"connector">> := OldConnector}) -> + NewSSL = maps:get(<<"ssl">>, NewConnector, undefined), + OldSSL = maps:get(<<"ssl">>, OldConnector, undefined), + try_clear_certs2(RltvDir, NewSSL, OldSSL); +try_clear_certs2(RltvDir, NewSSL, OldSSL) when is_map(NewSSL) andalso is_map(OldSSL) -> + ok = emqx_tls_lib:delete_ssl_files(RltvDir, NewSSL, OldSSL); +try_clear_certs2(RltvDir, NewConf, OldConf) -> + ?SLOG(debug, #{msg => "unexpected_conf", path => RltvDir, new => NewConf, OldConf => OldConf}), ok. new_ssl_config(RltvDir, Config, SSL) -> @@ -79,12 +98,5 @@ new_ssl_config(#{<<"ssl">> := _} = Config, NewSSL) -> new_ssl_config(Config, _NewSSL) -> Config. -map_get_oneof([], _Map, Default) -> - Default; -map_get_oneof([Key | Keys], Map, Default) -> - case maps:find(Key, Map) of - error -> - map_get_oneof(Keys, Map, Default); - {ok, Value} -> - Value - end. +normalize_key_to_bin(Map) -> + emqx_map_lib:binary_key_map(Map). diff --git a/apps/emqx_management/i18n/emqx_mgmt_api_publish_i18n.conf b/apps/emqx_management/i18n/emqx_mgmt_api_publish_i18n.conf index 2a7c9def8..d845bff4b 100644 --- a/apps/emqx_management/i18n/emqx_mgmt_api_publish_i18n.conf +++ b/apps/emqx_management/i18n/emqx_mgmt_api_publish_i18n.conf @@ -124,4 +124,49 @@ MQTT 消息发布的错误码,这些错误码也是 MQTT 规范中 PUBACK 消 zh: "失败的详细原因。" } } + message_properties { + desc { + en: "The Properties of the PUBLISH message." + zh: "PUBLISH 消息里的 Property 字段。" + } + } + msg_payload_format_indicator { + desc { + en: """0 (0x00) Byte Indicates that the Payload is unspecified bytes, which is equivalent to not sending a Payload Format Indicator. + +1 (0x01) Byte Indicates that the Payload is UTF-8 Encoded Character Data. The UTF-8 data in the Payload MUST be well-formed UTF-8 as defined by the Unicode specification and restated in RFC 3629. +""" + zh: "载荷格式指示标识符,0 表示载荷是未指定格式的数据,相当于没有发送载荷格式指示;1 表示载荷是 UTF-8 编码的字符数据,载荷中的 UTF-8 数据必须是按照 Unicode 的规范和 RFC 3629 的标准要求进行编码的。" + } + } + msg_message_expiry_interval { + desc { + en: "Identifier of the Message Expiry Interval. If the Message Expiry Interval has passed and the Server has not managed to start onward delivery to a matching subscriber, then it MUST delete the copy of the message for that subscriber." + zh: "消息过期间隔标识符,以秒为单位。当消失已经过期时,如果服务端还没有开始向匹配的订阅者投递该消息,则服务端会删除该订阅者的消息副本。如果不设置,则消息永远不会过期" + } + } + msg_response_topic { + desc { + en: "Identifier of the Response Topic.The Response Topic MUST be a UTF-8 Encoded, It MUST NOT contain wildcard characters." + zh: "响应主题标识符, UTF-8 编码的字符串,用作响应消息的主题名。响应主题不能包含通配符,也不能包含多个主题,否则将造成协议错误。当存在响应主题时,消息将被视作请求报文。服务端在收到应用消息时必须将响应主题原封不动的发送给所有的订阅者。" + } + } + msg_correlation_data { + desc { + en: "Identifier of the Correlation Data. The Server MUST send the Correlation Data unaltered to all subscribers receiving the Application Message." + zh: "对比数据标识符,服务端在收到应用消息时必须原封不动的把对比数据发送给所有的订阅者。对比数据只对请求消息(Request Message)的发送端和响应消息(Response Message)的接收端有意义。" + } + } + msg_user_properties { + desc { + en: "The User-Property key-value pairs. Note: in case there are duplicated keys, only the last one will be used." + zh: "指定 MQTT 消息的 User Property 键值对。注意,如果出现重复的键,只有最后一个会保留。" + } + } + msg_content_type { + desc { + en: "The Content Type MUST be a UTF-8 Encoded String." + zh: "内容类型标识符,以 UTF-8 格式编码的字符串,用来描述应用消息的内容,服务端必须把收到的应用消息中的内容类型原封不动的发送给所有的订阅者。" + } + } } diff --git a/apps/emqx_management/src/emqx_mgmt_api_publish.erl b/apps/emqx_management/src/emqx_mgmt_api_publish.erl index 1678c56e0..b1b1f1b5e 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_publish.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_publish.erl @@ -114,6 +114,11 @@ fields(message) -> required => true, example => <<"hello emqx api">> })}, + {properties, + hoconsc:mk(hoconsc:ref(?MODULE, message_properties), #{ + desc => ?DESC(message_properties), + required => false + })}, {retain, hoconsc:mk(boolean(), #{ desc => ?DESC(retain), @@ -130,6 +135,43 @@ fields(publish_message) -> default => plain })} ] ++ fields(message); +fields(message_properties) -> + [ + {'payload_format_indicator', + hoconsc:mk(typerefl:range(0, 1), #{ + desc => ?DESC(msg_payload_format_indicator), + required => false, + example => 0 + })}, + {'message_expiry_interval', + hoconsc:mk(integer(), #{ + desc => ?DESC(msg_message_expiry_interval), + required => false + })}, + {'response_topic', + hoconsc:mk(binary(), #{ + desc => ?DESC(msg_response_topic), + required => false, + example => <<"some_other_topic">> + })}, + {'correlation_data', + hoconsc:mk(binary(), #{ + desc => ?DESC(msg_correlation_data), + required => false + })}, + {'user_properties', + hoconsc:mk(map(), #{ + desc => ?DESC(msg_user_properties), + required => false, + example => #{<<"foo">> => <<"bar">>} + })}, + {'content_type', + hoconsc:mk(binary(), #{ + desc => ?DESC(msg_content_type), + required => false, + example => <<"text/plain">> + })} + ]; fields(publish_ok) -> [ {id, @@ -288,13 +330,23 @@ make_message(Map) -> QoS = maps:get(<<"qos">>, Map, 0), Topic = maps:get(<<"topic">>, Map), Retain = maps:get(<<"retain">>, Map, false), + Headers = + case maps:get(<<"properties">>, Map, #{}) of + Properties when + is_map(Properties) andalso + map_size(Properties) > 0 + -> + #{properties => to_msg_properties(Properties)}; + _ -> + #{} + end, try _ = emqx_topic:validate(name, Topic) catch error:_Reason -> throw(invalid_topic_name) end, - Message = emqx_message:make(From, QoS, Topic, Payload, #{retain => Retain}, #{}), + Message = emqx_message:make(From, QoS, Topic, Payload, #{retain => Retain}, Headers), Size = emqx_message:estimate_size(Message), (Size > size_limit()) andalso throw(packet_too_large), {ok, Message}; @@ -302,6 +354,20 @@ make_message(Map) -> {error, R} end. +to_msg_properties(Properties) -> + maps:fold( + fun to_property/3, + #{}, + Properties + ). + +to_property(<<"payload_format_indicator">>, V, M) -> M#{'Payload-Format-Indicator' => V}; +to_property(<<"message_expiry_interval">>, V, M) -> M#{'Message-Expiry-Interval' => V}; +to_property(<<"response_topic">>, V, M) -> M#{'Response-Topic' => V}; +to_property(<<"correlation_data">>, V, M) -> M#{'Correlation-Data' => V}; +to_property(<<"user_properties">>, V, M) -> M#{'User-Property' => maps:to_list(V)}; +to_property(<<"content_type">>, V, M) -> M#{'Content-Type' => V}. + %% get the global packet size limit since HTTP API does not belong to any zone. size_limit() -> try diff --git a/apps/emqx_management/test/emqx_mgmt_api_publish_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_publish_SUITE.erl index 0ebaf7195..7622b0d17 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_publish_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_publish_SUITE.erl @@ -20,9 +20,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). - --define(CLIENTID, <<"api_clientid">>). --define(USERNAME, <<"api_username">>). +-include_lib("common_test/include/ct.hrl"). -define(TOPIC1, <<"api_topic1">>). -define(TOPIC2, <<"api_topic2">>). @@ -44,25 +42,56 @@ end_per_testcase(Case, Config) -> ?MODULE:Case({'end', Config}). t_publish_api({init, Config}) -> - Config; -t_publish_api({'end', _Config}) -> - ok; -t_publish_api(_) -> - {ok, Client} = emqtt:start_link(#{ - username => <<"api_username">>, clientid => <<"api_clientid">> - }), + {ok, Client} = emqtt:start_link( + #{ + username => <<"api_username">>, + clientid => <<"api_clientid">>, + proto_ver => v5 + } + ), {ok, _} = emqtt:connect(Client), {ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC1), {ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC2), + [{client, Client} | Config]; +t_publish_api({'end', Config}) -> + Client = ?config(client, Config), + emqtt:stop(Client), + ok; +t_publish_api(_) -> Payload = <<"hello">>, Path = emqx_mgmt_api_test_util:api_path(["publish"]), Auth = emqx_mgmt_api_test_util:auth_header_(), - Body = #{topic => ?TOPIC1, payload => Payload}, + UserProperties = #{<<"foo">> => <<"bar">>}, + Properties = + #{ + <<"payload_format_indicator">> => 0, + <<"message_expiry_interval">> => 1000, + <<"response_topic">> => ?TOPIC2, + <<"correlation_data">> => <<"some_correlation_id">>, + <<"user_properties">> => UserProperties, + <<"content_type">> => <<"application/json">> + }, + Body = #{topic => ?TOPIC1, payload => Payload, properties => Properties}, {ok, Response} = emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body), ResponseMap = decode_json(Response), ?assertEqual([<<"id">>], lists:sort(maps:keys(ResponseMap))), - ?assertEqual(ok, receive_assert(?TOPIC1, 0, Payload)), - emqtt:stop(Client). + {ok, Message} = receive_assert(?TOPIC1, 0, Payload), + RecvProperties = maps:get(properties, Message), + UserPropertiesList = maps:to_list(UserProperties), + #{ + 'Payload-Format-Indicator' := 0, + 'Message-Expiry-Interval' := RecvMessageExpiry, + 'Correlation-Data' := <<"some_correlation_id">>, + 'User-Property' := UserPropertiesList, + 'Content-Type' := <<"application/json">> + } = RecvProperties, + ?assert(RecvMessageExpiry =< 1000), + %% note: without props this time + Body2 = #{topic => ?TOPIC2, payload => Payload}, + {ok, Response2} = emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body2), + ResponseMap2 = decode_json(Response2), + ?assertEqual([<<"id">>], lists:sort(maps:keys(ResponseMap2))), + ?assertEqual(ok, element(1, receive_assert(?TOPIC2, 0, Payload))). t_publish_no_subscriber({init, Config}) -> Config; @@ -163,16 +192,18 @@ t_publish_bad_topic_bulk(_Config) -> ). t_publish_bulk_api({init, Config}) -> - Config; -t_publish_bulk_api({'end', _Config}) -> - ok; -t_publish_bulk_api(_) -> {ok, Client} = emqtt:start_link(#{ username => <<"api_username">>, clientid => <<"api_clientid">> }), {ok, _} = emqtt:connect(Client), {ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC1), {ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC2), + [{client, Client} | Config]; +t_publish_bulk_api({'end', Config}) -> + Client = ?config(client, Config), + emqtt:stop(Client), + ok; +t_publish_bulk_api(_) -> Payload = <<"hello">>, Path = emqx_mgmt_api_test_util:api_path(["publish", "bulk"]), Auth = emqx_mgmt_api_test_util:auth_header_(), @@ -199,9 +230,8 @@ t_publish_bulk_api(_) -> end, ResponseList ), - ?assertEqual(ok, receive_assert(?TOPIC1, 0, Payload)), - ?assertEqual(ok, receive_assert(?TOPIC2, 0, Payload)), - emqtt:stop(Client). + ?assertEqual(ok, element(1, receive_assert(?TOPIC1, 0, Payload))), + ?assertEqual(ok, element(1, receive_assert(?TOPIC2, 0, Payload))). t_publish_no_subscriber_bulk({init, Config}) -> Config; @@ -232,8 +262,8 @@ t_publish_no_subscriber_bulk(_) -> ], ResponseList ), - ?assertEqual(ok, receive_assert(?TOPIC1, 0, Payload)), - ?assertEqual(ok, receive_assert(?TOPIC2, 0, Payload)), + ?assertEqual(ok, element(1, receive_assert(?TOPIC1, 0, Payload))), + ?assertEqual(ok, element(1, receive_assert(?TOPIC2, 0, Payload))), emqtt:stop(Client). t_publish_bulk_dispatch_one_message_invalid_topic({init, Config}) -> @@ -267,17 +297,19 @@ t_publish_bulk_dispatch_one_message_invalid_topic(Config) when is_list(Config) - t_publish_bulk_dispatch_failure({init, Config}) -> meck:new(emqx, [no_link, passthrough, no_history]), meck:expect(emqx, is_running, fun() -> false end), - Config; -t_publish_bulk_dispatch_failure({'end', _Config}) -> - meck:unload(emqx), - ok; -t_publish_bulk_dispatch_failure(Config) when is_list(Config) -> {ok, Client} = emqtt:start_link(#{ username => <<"api_username">>, clientid => <<"api_clientid">> }), {ok, _} = emqtt:connect(Client), {ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC1), {ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC2), + [{client, Client} | Config]; +t_publish_bulk_dispatch_failure({'end', Config}) -> + meck:unload(emqx), + Client = ?config(client, Config), + emqtt:stop(Client), + ok; +t_publish_bulk_dispatch_failure(Config) when is_list(Config) -> Payload = <<"hello">>, Path = emqx_mgmt_api_test_util:api_path(["publish", "bulk"]), Auth = emqx_mgmt_api_test_util:auth_header_(), @@ -303,8 +335,7 @@ t_publish_bulk_dispatch_failure(Config) when is_list(Config) -> #{<<"reason_code">> := ?RC_NO_MATCHING_SUBSCRIBERS} ], decode_json(ResponseBody) - ), - emqtt:stop(Client). + ). receive_assert(Topic, Qos, Payload) -> receive @@ -312,12 +343,12 @@ receive_assert(Topic, Qos, Payload) -> ReceiveTopic = maps:get(topic, Message), ReceiveQos = maps:get(qos, Message), ReceivePayload = maps:get(payload, Message), - ?assertEqual(ReceiveTopic, Topic), - ?assertEqual(ReceiveQos, Qos), - ?assertEqual(ReceivePayload, Payload), - ok + ?assertEqual(Topic, ReceiveTopic), + ?assertEqual(Qos, ReceiveQos), + ?assertEqual(Payload, ReceivePayload), + {ok, Message} after 5000 -> - timeout + {error, timeout} end. decode_json(In) -> diff --git a/changes/v5.0.11-en.md b/changes/v5.0.11-en.md index 6ad40c7fe..5329be024 100644 --- a/changes/v5.0.11-en.md +++ b/changes/v5.0.11-en.md @@ -25,6 +25,8 @@ - Add a new config `quick_deny_anonymous` to allow quick deny of anonymous clients (without username) so the auth backend checks can be skipped [#8516](https://github.com/emqx/emqx/pull/8516). +- Support message properties in `/publish` API [#9401](https://github.com/emqx/emqx/pull/9401). + ## Bug fixes - Fix `ssl.existingName` option of helm chart not working [#9307](https://github.com/emqx/emqx/issues/9307). @@ -38,3 +40,5 @@ - Fix that `/configs/global_zone` API cannot get the default value of the configuration [#9392](https://github.com/emqx/emqx/pull/9392). - Fix mountpoint not working for will-msg [#9399](https://github.com/emqx/emqx/pull/9399). + +- Fix that the obsolete SSL files aren't deleted after the bridge configuration update [#9411](https://github.com/emqx/emqx/pull/9411). diff --git a/changes/v5.0.11-zh.md b/changes/v5.0.11-zh.md index b5fe7c4bd..e0c82b022 100644 --- a/changes/v5.0.11-zh.md +++ b/changes/v5.0.11-zh.md @@ -23,6 +23,8 @@ - 添加了一个名为 `quick_deny_anonymous` 的新配置,用来在不调用认证链的情况下,快速的拒绝掉匿名用户,从而提高认证效率 [#8516](https://github.com/emqx/emqx/pull/8516)。 +- 支持在 /publish API 中添加消息属性 [#9401](https://github.com/emqx/emqx/pull/9401)。 + ## 修复 - 修复 helm chart 的 `ssl.existingName` 选项不起作用 [#9307](https://github.com/emqx/emqx/issues/9307)。 @@ -36,3 +38,5 @@ - 修复 `/configs/global_zone` API 无法正确获取配置的默认值问题 [#9392](https://github.com/emqx/emqx/pull/9392)。 - 修复 mountpoint 配置未对遗嘱消息生效的问题 [#9399](https://github.com/emqx/emqx/pull/9399) + +- 修复桥接配置更新 SSL 相关配置后,过时的 SSL 文件没有被删除的问题 [#9411](https://github.com/emqx/emqx/pull/9411)。