From 6e12abff39189f4cb1eb82a7ac32b1620a64dda4 Mon Sep 17 00:00:00 2001 From: firest Date: Thu, 20 Apr 2023 17:58:12 +0800 Subject: [PATCH 01/33] fix(rocketmq): allow setting multiple addresses in RocketMQ bridge --- .../src/emqx_ee_connector_rocketmq.erl | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl index 205359bb8..389e1e366 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl @@ -38,7 +38,7 @@ roots() -> fields(config) -> [ - {server, server()}, + {servers, servers()}, {topic, mk( binary(), @@ -75,7 +75,7 @@ add_default_fn(OrigFn, Default) -> (Field) -> OrigFn(Field) end. -server() -> +servers() -> Meta = #{desc => ?DESC("server")}, emqx_schema:servers_sc(Meta, ?ROCKETMQ_HOST_OPTIONS). @@ -97,7 +97,7 @@ is_buffer_supported() -> false. on_start( InstanceId, - #{server := Server, topic := Topic} = Config1 + #{servers := BinServers, topic := Topic} = Config1 ) -> ?SLOG(info, #{ msg => "starting_rocketmq_connector", @@ -105,9 +105,8 @@ on_start( config => redact(Config1) }), Config = maps:merge(default_security_info(), Config1), - {Host, Port} = emqx_schema:parse_server(Server, ?ROCKETMQ_HOST_OPTIONS), + Servers = emqx_schema:parse_servers(BinServers, ?ROCKETMQ_HOST_OPTIONS), - Server1 = [{Host, Port}], ClientId = client_id(InstanceId), ClientCfg = #{acl_info => #{}}, @@ -124,7 +123,7 @@ on_start( producers_opts => ProducerOpts }, - case rocketmq:ensure_supervised_client(ClientId, Server1, ClientCfg) of + case rocketmq:ensure_supervised_client(ClientId, Servers, ClientCfg) of {ok, _Pid} -> {ok, State}; {error, _Reason} = Error -> From d865998a63a6bf481148b00751d9b28a42f1d52e Mon Sep 17 00:00:00 2001 From: firest Date: Fri, 21 Apr 2023 11:02:14 +0800 Subject: [PATCH 02/33] fix(rocketmq): fix test cases --- lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_rocketmq_SUITE.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_rocketmq_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_rocketmq_SUITE.erl index 0cb14e5c3..33a83d2d8 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_rocketmq_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_rocketmq_SUITE.erl @@ -136,7 +136,7 @@ rocketmq_config(BridgeType, Config) -> io_lib:format( "bridges.~s.~s {\n" " enable = true\n" - " server = ~p\n" + " servers = ~p\n" " topic = ~p\n" " resource_opts = {\n" " request_timeout = 1500ms\n" From f602900a53fb547049f4c51b404be813d682f8be Mon Sep 17 00:00:00 2001 From: firest Date: Fri, 21 Apr 2023 13:35:29 +0800 Subject: [PATCH 03/33] fix(rocketmq): fix that the status check of RocketMQ bridge may not accurate --- .../emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl index 205359bb8..67f2d2562 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl @@ -154,12 +154,15 @@ on_batch_query(_InstanceId, Query, _State) -> on_get_status(_InstanceId, #{client_id := ClientId}) -> case rocketmq_client_sup:find_client(ClientId) of - {ok, _Pid} -> - connected; + {ok, Pid} -> + status_result(rocketmq_client:get_status(Pid)); _ -> connecting end. +status_result(_Status = true) -> connected; +status_result(_Status) -> connecting. + %%======================================================================================== %% Helper fns %%======================================================================================== From 7704995279935c99f74bbbbfa2e66e5d47df9262 Mon Sep 17 00:00:00 2001 From: firest Date: Fri, 21 Apr 2023 15:10:25 +0800 Subject: [PATCH 04/33] fix(rocketmq): expose the driver parameter `sync_timeout` into the RocketMQ bridge configuration --- .../src/emqx_ee_connector_rocketmq.erl | 15 +++++++++++---- rel/i18n/emqx_ee_connector_rocketmq.hocon | 13 ++++++++++++- 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl index 285fa98b4..29f8ef84d 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl @@ -44,6 +44,11 @@ fields(config) -> binary(), #{default => <<"TopicTest">>, desc => ?DESC(topic)} )}, + {sync_timeout, + mk( + emqx_schema:duration(), + #{default => <<"3s">>, desc => ?DESC(sync_timeout)} + )}, {refresh_interval, mk( emqx_schema:duration(), @@ -76,7 +81,7 @@ add_default_fn(OrigFn, Default) -> end. servers() -> - Meta = #{desc => ?DESC("server")}, + Meta = #{desc => ?DESC("servers")}, emqx_schema:servers_sc(Meta, ?ROCKETMQ_HOST_OPTIONS). relational_fields() -> @@ -97,7 +102,7 @@ is_buffer_supported() -> false. on_start( InstanceId, - #{servers := BinServers, topic := Topic} = Config1 + #{servers := BinServers, topic := Topic, sync_timeout := SyncTimeout} = Config1 ) -> ?SLOG(info, #{ msg => "starting_rocketmq_connector", @@ -116,8 +121,9 @@ on_start( ProducersMapPID = create_producers_map(ClientId), State = #{ client_id => ClientId, + topic => Topic, topic_tokens => TopicTks, - config => Config, + sync_timeout => SyncTimeout, templates => Templates, producers_map_pid => ProducersMapPID, producers_opts => ProducerOpts @@ -173,9 +179,10 @@ do_query( #{ templates := Templates, client_id := ClientId, + topic := RawTopic, topic_tokens := TopicTks, producers_opts := ProducerOpts, - config := #{topic := RawTopic, resource_opts := #{request_timeout := RequestTimeout}} + sync_timeout := RequestTimeout } = State ) -> ?TRACE( diff --git a/rel/i18n/emqx_ee_connector_rocketmq.hocon b/rel/i18n/emqx_ee_connector_rocketmq.hocon index 44dda7931..7f786898e 100644 --- a/rel/i18n/emqx_ee_connector_rocketmq.hocon +++ b/rel/i18n/emqx_ee_connector_rocketmq.hocon @@ -1,6 +1,6 @@ emqx_ee_connector_rocketmq { - server { + servers { desc { en: """The IPv4 or IPv6 address or the hostname to connect to.
A host entry has the following form: `Host[:Port]`.
@@ -26,6 +26,17 @@ The RocketMQ default port 9876 is used if `[:Port]` is not specified.""" } } + sync_timeout { + desc { + en: """Timeout of RocketMQ driver synchronous call.""" + zh: """RocketMQ 驱动同步调用的超时时间。""" + } + label: { + en: "Sync Timeout" + zh: "同步调用超时时间" + } + } + refresh_interval { desc { en: """RocketMQ Topic Route Refresh Interval.""" From c1000ccaedfc222e71c64c2c20befb6588f9b716 Mon Sep 17 00:00:00 2001 From: Zhongwen Deng Date: Wed, 19 Apr 2023 16:42:54 +0800 Subject: [PATCH 05/33] fix: always check authn_http's header and ssl_option --- apps/emqx_authn/src/emqx_authn_schema.erl | 26 +++++ .../src/simple_authn/emqx_authn_http.erl | 52 ++++++--- .../test/emqx_authn_https_SUITE.erl | 17 +++ .../emqx_conf/test/emqx_conf_schema_tests.erl | 107 ++++++++++++++++++ 4 files changed, 186 insertions(+), 16 deletions(-) diff --git a/apps/emqx_authn/src/emqx_authn_schema.erl b/apps/emqx_authn/src/emqx_authn_schema.erl index 112ea2076..a7cdaac5f 100644 --- a/apps/emqx_authn/src/emqx_authn_schema.erl +++ b/apps/emqx_authn/src/emqx_authn_schema.erl @@ -18,10 +18,12 @@ -elvis([{elvis_style, invalid_dynamic_call, disable}]). -include_lib("hocon/include/hoconsc.hrl"). +-include("emqx_authn.hrl"). -export([ common_fields/0, roots/0, + validations/0, tags/0, fields/1, authenticator_type/0, @@ -207,3 +209,27 @@ array(Name) -> array(Name, DescId) -> {Name, ?HOCON(?R_REF(Name), #{desc => ?DESC(DescId)})}. + +validations() -> + [ + {check_http_ssl_opts, fun(Conf) -> + CheckFun = fun emqx_authn_http:check_ssl_opts/1, + validation(Conf, CheckFun) + end}, + {check_http_headers, fun(Conf) -> + CheckFun = fun emqx_authn_http:check_headers/1, + validation(Conf, CheckFun) + end} + ]. + +validation(Conf, CheckFun) when is_map(Conf) -> + validation(hocon_maps:get(?CONF_NS, Conf), CheckFun); +validation(undefined, _) -> + ok; +validation([], _) -> + ok; +validation([AuthN | Tail], CheckFun) -> + case CheckFun(#{?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_BINARY => AuthN}) of + ok -> validation(Tail, CheckFun); + Error -> Error + end. diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl index 3c34d878e..562cfdf6f 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl @@ -38,6 +38,8 @@ headers/1 ]). +-export([check_headers/1, check_ssl_opts/1]). + -export([ refs/0, union_member_selector/1, @@ -106,8 +108,8 @@ common_fields() -> validations() -> [ - {check_ssl_opts, fun check_ssl_opts/1}, - {check_headers, fun check_headers/1} + {check_ssl_opts, fun ?MODULE:check_ssl_opts/1}, + {check_headers, fun ?MODULE:check_headers/1} ]. url(type) -> binary(); @@ -261,21 +263,39 @@ transform_header_name(Headers) -> ). check_ssl_opts(Conf) -> - {BaseUrl, _Path, _Query} = parse_url(get_conf_val("url", Conf)), - case BaseUrl of - <<"https://", _/binary>> -> - case get_conf_val("ssl.enable", Conf) of - true -> ok; - false -> false - end; - <<"http://", _/binary>> -> - ok + case get_conf_val("url", Conf) of + undefined -> + ok; + Url -> + {BaseUrl, _Path, _Query} = parse_url(Url), + case BaseUrl of + <<"https://", _/binary>> -> + case get_conf_val("ssl.enable", Conf) of + true -> + ok; + false -> + <<"it's required to enable the TLS option to establish a https connection">> + end; + <<"http://", _/binary>> -> + ok + end end. check_headers(Conf) -> - Method = to_bin(get_conf_val("method", Conf)), - Headers = get_conf_val("headers", Conf), - Method =:= <<"post">> orelse (not maps:is_key(<<"content-type">>, Headers)). + case get_conf_val("headers", Conf) of + undefined -> + ok; + Headers -> + case to_bin(get_conf_val("method", Conf)) of + <<"post">> -> + ok; + <<"get">> -> + case maps:is_key(<<"content-type">>, Headers) of + false -> ok; + true -> <<"HTTP GET requests cannot include content-type header.">> + end + end + end. parse_url(Url) -> case string:split(Url, "//", leading) of @@ -310,7 +330,7 @@ parse_config( method => Method, path => Path, headers => ensure_header_name_type(Headers), - base_path_templete => emqx_authn_utils:parse_str(Path), + base_path_template => emqx_authn_utils:parse_str(Path), base_query_template => emqx_authn_utils:parse_deep( cow_qs:parse_qs(to_bin(Query)) ), @@ -323,7 +343,7 @@ parse_config( generate_request(Credential, #{ method := Method, headers := Headers0, - base_path_templete := BasePathTemplate, + base_path_template := BasePathTemplate, base_query_template := BaseQueryTemplate, body_template := BodyTemplate }) -> diff --git a/apps/emqx_authn/test/emqx_authn_https_SUITE.erl b/apps/emqx_authn/test/emqx_authn_https_SUITE.erl index c4315b69f..f23a160d1 100644 --- a/apps/emqx_authn/test/emqx_authn_https_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_https_SUITE.erl @@ -114,6 +114,22 @@ t_create_invalid_version(_Config) -> emqx_access_control:authenticate(?CREDENTIALS) ). +t_create_disable_ssl_opts_when_https(_Config) -> + {ok, _} = create_https_auth_with_ssl_opts( + #{ + <<"server_name_indication">> => <<"authn-server">>, + <<"verify">> => <<"verify_peer">>, + <<"versions">> => [<<"tlsv1.2">>], + <<"ciphers">> => [<<"ECDHE-RSA-AES256-GCM-SHA384">>], + <<"enable">> => <<"false">> + } + ), + + ?assertEqual( + {error, not_authorized}, + emqx_access_control:authenticate(?CREDENTIALS) + ). + t_create_invalid_ciphers(_Config) -> {ok, _} = create_https_auth_with_ssl_opts( #{ @@ -135,6 +151,7 @@ t_create_invalid_ciphers(_Config) -> create_https_auth_with_ssl_opts(SpecificSSLOpts) -> AuthConfig = raw_https_auth_config(SpecificSSLOpts), + ct:pal("111:~p~n", [AuthConfig]), emqx:update_config(?PATH, {create_authenticator, ?GLOBAL, AuthConfig}). raw_https_auth_config(SpecificSSLOpts) -> diff --git a/apps/emqx_conf/test/emqx_conf_schema_tests.erl b/apps/emqx_conf/test/emqx_conf_schema_tests.erl index 3653b9d19..d6722299f 100644 --- a/apps/emqx_conf/test/emqx_conf_schema_tests.erl +++ b/apps/emqx_conf/test/emqx_conf_schema_tests.erl @@ -6,6 +6,113 @@ -include_lib("eunit/include/eunit.hrl"). +-define(BASE_CONF, + "" + "\n" + " node {\n" + " name = \"emqx1@127.0.0.1\"\n" + " cookie = \"emqxsecretcookie\"\n" + " data_dir = \"data\"\n" + " }\n" + " cluster {\n" + " name = emqxcl\n" + " discovery_strategy = static\n" + " static.seeds = ~p\n" + " core_nodes = ~p\n" + " }\n" + "" +). +array_nodes_test() -> + ExpectNodes = ['emqx1@127.0.0.1', 'emqx2@127.0.0.1'], + lists:foreach( + fun(Nodes) -> + ConfFile = iolist_to_binary(io_lib:format(?BASE_CONF, [Nodes, Nodes])), + {ok, Conf} = hocon:binary(ConfFile, #{format => richmap}), + ConfList = hocon_tconf:generate(emqx_conf_schema, Conf), + ClusterDiscovery = proplists:get_value( + cluster_discovery, proplists:get_value(ekka, ConfList) + ), + ?assertEqual( + {static, [{seeds, ExpectNodes}]}, + ClusterDiscovery, + Nodes + ), + ?assertEqual( + ExpectNodes, + proplists:get_value(core_nodes, proplists:get_value(mria, ConfList)), + Nodes + ) + end, + [["emqx1@127.0.0.1", "emqx2@127.0.0.1"], "emqx1@127.0.0.1, emqx2@127.0.0.1"] + ), + ok. + +authn_validations_test() -> + BaseConf = iolist_to_binary(io_lib:format(?BASE_CONF, ["emqx1@127.0.0.1", "emqx1@127.0.0.1"])), + DisableSSLWithHttps = + "" + "\n" + "authentication = [\n" + "{backend = \"http\"\n" + "body {password = \"${password}\", username = \"${username}\"}\n" + "connect_timeout = \"15s\"\n" + "enable_pipelining = 100\n" + "headers {\"content-type\" = \"application/json\"}\n" + "mechanism = \"password_based\"\n" + "method = \"post\"\n" + "pool_size = 8\n" + "request_timeout = \"5s\"\n" + "ssl {enable = false, verify = \"verify_peer\"}\n" + "url = \"https://127.0.0.1:8080\"\n" + "}\n" + "]\n" + "", + Conf = <>, + {ok, ConfMap} = hocon:binary(Conf, #{format => richmap}), + ?assertThrow( + {emqx_conf_schema, [ + #{ + kind := validation_error, + reason := integrity_validation_failure, + result := _, + validation_name := check_http_ssl_opts + } + ]}, + hocon_tconf:generate(emqx_conf_schema, ConfMap) + ), + BadHeader = + "" + "\n" + "authentication = [\n" + "{backend = \"http\"\n" + "body {password = \"${password}\", username = \"${username}\"}\n" + "connect_timeout = \"15s\"\n" + "enable_pipelining = 100\n" + "headers {\"content-type\" = \"application/json\"}\n" + "mechanism = \"password_based\"\n" + "method = \"get\"\n" + "pool_size = 8\n" + "request_timeout = \"5s\"\n" + "ssl {enable = false, verify = \"verify_peer\"}\n" + "url = \"http://127.0.0.1:8080\"\n" + "}\n" + "]\n" + "", + Conf1 = <>, + {ok, ConfMap1} = hocon:binary(Conf1, #{format => richmap}), + ?assertThrow( + {emqx_conf_schema, [ + #{ + kind := validation_error, + reason := integrity_validation_failure, + result := _, + validation_name := check_http_headers + } + ]}, + hocon_tconf:generate(emqx_conf_schema, ConfMap1) + ), + ok. + doc_gen_test() -> %% the json file too large to encode. { From 397e28f5a4cbd692ffe6b0bb95dd89a401988d65 Mon Sep 17 00:00:00 2001 From: Zhongwen Deng Date: Wed, 19 Apr 2023 17:10:07 +0800 Subject: [PATCH 06/33] chore: add changlog for authn_http validation --- apps/emqx_authn/test/emqx_authn_https_SUITE.erl | 17 ----------------- changes/ce/fix-10449.en.md | 2 ++ 2 files changed, 2 insertions(+), 17 deletions(-) create mode 100644 changes/ce/fix-10449.en.md diff --git a/apps/emqx_authn/test/emqx_authn_https_SUITE.erl b/apps/emqx_authn/test/emqx_authn_https_SUITE.erl index f23a160d1..c4315b69f 100644 --- a/apps/emqx_authn/test/emqx_authn_https_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_https_SUITE.erl @@ -114,22 +114,6 @@ t_create_invalid_version(_Config) -> emqx_access_control:authenticate(?CREDENTIALS) ). -t_create_disable_ssl_opts_when_https(_Config) -> - {ok, _} = create_https_auth_with_ssl_opts( - #{ - <<"server_name_indication">> => <<"authn-server">>, - <<"verify">> => <<"verify_peer">>, - <<"versions">> => [<<"tlsv1.2">>], - <<"ciphers">> => [<<"ECDHE-RSA-AES256-GCM-SHA384">>], - <<"enable">> => <<"false">> - } - ), - - ?assertEqual( - {error, not_authorized}, - emqx_access_control:authenticate(?CREDENTIALS) - ). - t_create_invalid_ciphers(_Config) -> {ok, _} = create_https_auth_with_ssl_opts( #{ @@ -151,7 +135,6 @@ t_create_invalid_ciphers(_Config) -> create_https_auth_with_ssl_opts(SpecificSSLOpts) -> AuthConfig = raw_https_auth_config(SpecificSSLOpts), - ct:pal("111:~p~n", [AuthConfig]), emqx:update_config(?PATH, {create_authenticator, ?GLOBAL, AuthConfig}). raw_https_auth_config(SpecificSSLOpts) -> diff --git a/changes/ce/fix-10449.en.md b/changes/ce/fix-10449.en.md new file mode 100644 index 000000000..e10b52fb4 --- /dev/null +++ b/changes/ce/fix-10449.en.md @@ -0,0 +1,2 @@ +Validate the ssl_options and header configurations when creating authentication http (`authn_http`). +Prior to this, incorrect ssl_options configuration could result in successful creation but the entire authn being unusable. From dc92b4f63f793e866374ce59730431d88d2430f6 Mon Sep 17 00:00:00 2001 From: Zhongwen Deng Date: Thu, 20 Apr 2023 18:20:51 +0800 Subject: [PATCH 07/33] test: add a test for authn {} --- .../emqx_conf/test/emqx_conf_schema_tests.erl | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/apps/emqx_conf/test/emqx_conf_schema_tests.erl b/apps/emqx_conf/test/emqx_conf_schema_tests.erl index d6722299f..fd2bd4dcd 100644 --- a/apps/emqx_conf/test/emqx_conf_schema_tests.erl +++ b/apps/emqx_conf/test/emqx_conf_schema_tests.erl @@ -111,6 +111,37 @@ authn_validations_test() -> ]}, hocon_tconf:generate(emqx_conf_schema, ConfMap1) ), + BadHeader2 = + "" + "\n" + "authentication = \n" + "{backend = \"http\"\n" + "body {password = \"${password}\", username = \"${username}\"}\n" + "connect_timeout = \"15s\"\n" + "enable_pipelining = 100\n" + "headers {\"content-type\" = \"application/json\"}\n" + "mechanism = \"password_based\"\n" + "method = \"get\"\n" + "pool_size = 8\n" + "request_timeout = \"5s\"\n" + "ssl {enable = false, verify = \"verify_peer\"}\n" + "url = \"http://127.0.0.1:8080\"\n" + "}\n" + "\n" + "", + Conf2 = <>, + {ok, ConfMap2} = hocon:binary(Conf2, #{format => richmap}), + ?assertThrow( + {emqx_conf_schema, [ + #{ + kind := validation_error, + reason := integrity_validation_failure, + result := _, + validation_name := check_http_headers + } + ]}, + hocon_tconf:generate(emqx_conf_schema, ConfMap2) + ), ok. doc_gen_test() -> From f831a0b8277ce9c6cec25679fe41057bd82d09d2 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Fri, 21 Apr 2023 11:21:05 +0800 Subject: [PATCH 08/33] chore: update changes/ce/fix-10449.en.md Co-authored-by: JianBo He --- changes/ce/fix-10449.en.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changes/ce/fix-10449.en.md b/changes/ce/fix-10449.en.md index e10b52fb4..005dea73c 100644 --- a/changes/ce/fix-10449.en.md +++ b/changes/ce/fix-10449.en.md @@ -1,2 +1,2 @@ Validate the ssl_options and header configurations when creating authentication http (`authn_http`). -Prior to this, incorrect ssl_options configuration could result in successful creation but the entire authn being unusable. +Prior to this, incorrect `ssl` configuration could result in successful creation but the entire authn being unusable. From 1db38de71f44693b3294ed61797b7335f12e06bf Mon Sep 17 00:00:00 2001 From: Zhongwen Deng Date: Fri, 21 Apr 2023 11:54:11 +0800 Subject: [PATCH 09/33] chore: apply review suggestions --- .../src/simple_authn/emqx_authn_http.erl | 28 ++- .../emqx_conf/test/emqx_conf_schema_tests.erl | 185 ++++++++---------- apps/emqx_prometheus/TODO | 2 - 3 files changed, 98 insertions(+), 117 deletions(-) delete mode 100644 apps/emqx_prometheus/TODO diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl index 562cfdf6f..43701cbc7 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl @@ -263,10 +263,9 @@ transform_header_name(Headers) -> ). check_ssl_opts(Conf) -> - case get_conf_val("url", Conf) of - undefined -> - ok; - Url -> + case is_backend_http(Conf) of + true -> + Url = get_conf_val("url", Conf), {BaseUrl, _Path, _Query} = parse_url(Url), case BaseUrl of <<"https://", _/binary>> -> @@ -278,14 +277,15 @@ check_ssl_opts(Conf) -> end; <<"http://", _/binary>> -> ok - end + end; + false -> + ok end. check_headers(Conf) -> - case get_conf_val("headers", Conf) of - undefined -> - ok; - Headers -> + case is_backend_http(Conf) of + true -> + Headers = get_conf_val("headers", Conf), case to_bin(get_conf_val("method", Conf)) of <<"post">> -> ok; @@ -294,7 +294,15 @@ check_headers(Conf) -> false -> ok; true -> <<"HTTP GET requests cannot include content-type header.">> end - end + end; + false -> + ok + end. + +is_backend_http(Conf) -> + case get_conf_val("backend", Conf) of + http -> true; + _ -> false end. parse_url(Url) -> diff --git a/apps/emqx_conf/test/emqx_conf_schema_tests.erl b/apps/emqx_conf/test/emqx_conf_schema_tests.erl index fd2bd4dcd..667d1766f 100644 --- a/apps/emqx_conf/test/emqx_conf_schema_tests.erl +++ b/apps/emqx_conf/test/emqx_conf_schema_tests.erl @@ -6,27 +6,27 @@ -include_lib("eunit/include/eunit.hrl"). +%% erlfmt-ignore -define(BASE_CONF, - "" - "\n" - " node {\n" - " name = \"emqx1@127.0.0.1\"\n" - " cookie = \"emqxsecretcookie\"\n" - " data_dir = \"data\"\n" - " }\n" - " cluster {\n" - " name = emqxcl\n" - " discovery_strategy = static\n" - " static.seeds = ~p\n" - " core_nodes = ~p\n" - " }\n" - "" -). + """ + node { + name = \"emqx1@127.0.0.1\" + cookie = \"emqxsecretcookie\" + data_dir = \"data\" + } + cluster { + name = emqxcl + discovery_strategy = static + static.seeds = ~p + core_nodes = ~p + } + """). + array_nodes_test() -> ExpectNodes = ['emqx1@127.0.0.1', 'emqx2@127.0.0.1'], lists:foreach( fun(Nodes) -> - ConfFile = iolist_to_binary(io_lib:format(?BASE_CONF, [Nodes, Nodes])), + ConfFile = to_bin(?BASE_CONF, [Nodes, Nodes]), {ok, Conf} = hocon:binary(ConfFile, #{format => richmap}), ConfList = hocon_tconf:generate(emqx_conf_schema, Conf), ClusterDiscovery = proplists:get_value( @@ -47,101 +47,73 @@ array_nodes_test() -> ), ok. +%% erlfmt-ignore +-define(BASE_AUTHN_ARRAY, + """ + authentication = [ + {backend = \"http\" + body {password = \"${password}\", username = \"${username}\"} + connect_timeout = \"15s\" + enable_pipelining = 100 + headers {\"content-type\" = \"application/json\"} + mechanism = \"password_based\" + method = \"~p\" + pool_size = 8 + request_timeout = \"5s\" + ssl {enable = ~p, verify = \"verify_peer\"} + url = \"~ts\" + } + ] + """ +). + +-define(ERROR(Reason), + {emqx_conf_schema, [ + #{ + kind := validation_error, + reason := integrity_validation_failure, + result := _, + validation_name := Reason + } + ]} +). + authn_validations_test() -> - BaseConf = iolist_to_binary(io_lib:format(?BASE_CONF, ["emqx1@127.0.0.1", "emqx1@127.0.0.1"])), - DisableSSLWithHttps = - "" - "\n" - "authentication = [\n" - "{backend = \"http\"\n" - "body {password = \"${password}\", username = \"${username}\"}\n" - "connect_timeout = \"15s\"\n" - "enable_pipelining = 100\n" - "headers {\"content-type\" = \"application/json\"}\n" - "mechanism = \"password_based\"\n" - "method = \"post\"\n" - "pool_size = 8\n" - "request_timeout = \"5s\"\n" - "ssl {enable = false, verify = \"verify_peer\"}\n" - "url = \"https://127.0.0.1:8080\"\n" - "}\n" - "]\n" - "", - Conf = <>, - {ok, ConfMap} = hocon:binary(Conf, #{format => richmap}), - ?assertThrow( - {emqx_conf_schema, [ - #{ - kind := validation_error, - reason := integrity_validation_failure, - result := _, - validation_name := check_http_ssl_opts - } - ]}, - hocon_tconf:generate(emqx_conf_schema, ConfMap) - ), - BadHeader = - "" - "\n" - "authentication = [\n" - "{backend = \"http\"\n" - "body {password = \"${password}\", username = \"${username}\"}\n" - "connect_timeout = \"15s\"\n" - "enable_pipelining = 100\n" - "headers {\"content-type\" = \"application/json\"}\n" - "mechanism = \"password_based\"\n" - "method = \"get\"\n" - "pool_size = 8\n" - "request_timeout = \"5s\"\n" - "ssl {enable = false, verify = \"verify_peer\"}\n" - "url = \"http://127.0.0.1:8080\"\n" - "}\n" - "]\n" - "", - Conf1 = <>, + BaseConf = to_bin(?BASE_CONF, ["emqx1@127.0.0.1", "emqx1@127.0.0.1"]), + + OKHttps = to_bin(?BASE_AUTHN_ARRAY, [post, true, <<"https://127.0.0.1:8080">>]), + Conf0 = <>, + {ok, ConfMap0} = hocon:binary(Conf0, #{format => richmap}), + ?assert(is_list(hocon_tconf:generate(emqx_conf_schema, ConfMap0))), + + OKHttp = to_bin(?BASE_AUTHN_ARRAY, [post, false, <<"http://127.0.0.1:8080">>]), + Conf1 = <>, {ok, ConfMap1} = hocon:binary(Conf1, #{format => richmap}), - ?assertThrow( - {emqx_conf_schema, [ - #{ - kind := validation_error, - reason := integrity_validation_failure, - result := _, - validation_name := check_http_headers - } - ]}, - hocon_tconf:generate(emqx_conf_schema, ConfMap1) - ), - BadHeader2 = - "" - "\n" - "authentication = \n" - "{backend = \"http\"\n" - "body {password = \"${password}\", username = \"${username}\"}\n" - "connect_timeout = \"15s\"\n" - "enable_pipelining = 100\n" - "headers {\"content-type\" = \"application/json\"}\n" - "mechanism = \"password_based\"\n" - "method = \"get\"\n" - "pool_size = 8\n" - "request_timeout = \"5s\"\n" - "ssl {enable = false, verify = \"verify_peer\"}\n" - "url = \"http://127.0.0.1:8080\"\n" - "}\n" - "\n" - "", - Conf2 = <>, + ?assert(is_list(hocon_tconf:generate(emqx_conf_schema, ConfMap1))), + + DisableSSLWithHttps = to_bin(?BASE_AUTHN_ARRAY, [post, false, <<"https://127.0.0.1:8080">>]), + Conf2 = <>, {ok, ConfMap2} = hocon:binary(Conf2, #{format => richmap}), ?assertThrow( - {emqx_conf_schema, [ - #{ - kind := validation_error, - reason := integrity_validation_failure, - result := _, - validation_name := check_http_headers - } - ]}, + ?ERROR(check_http_ssl_opts), hocon_tconf:generate(emqx_conf_schema, ConfMap2) ), + + BadHeader = to_bin(?BASE_AUTHN_ARRAY, [get, true, <<"https://127.0.0.1:8080">>]), + Conf3 = <>, + {ok, ConfMap3} = hocon:binary(Conf3, #{format => richmap}), + ?assertThrow( + ?ERROR(check_http_headers), + hocon_tconf:generate(emqx_conf_schema, ConfMap3) + ), + + BadHeaderWithTuple = binary:replace(BadHeader, [<<"[">>, <<"]">>], <<"">>, [global]), + Conf4 = <>, + {ok, ConfMap4} = hocon:binary(Conf4, #{format => richmap}), + ?assertThrow( + ?ERROR(check_http_headers), + hocon_tconf:generate(emqx_conf_schema, ConfMap4) + ), ok. doc_gen_test() -> @@ -164,3 +136,6 @@ doc_gen_test() -> ok end }. + +to_bin(Format, Args) -> + iolist_to_binary(io_lib:format(Format, Args)). diff --git a/apps/emqx_prometheus/TODO b/apps/emqx_prometheus/TODO deleted file mode 100644 index a868fba7e..000000000 --- a/apps/emqx_prometheus/TODO +++ /dev/null @@ -1,2 +0,0 @@ -1. Add more VM Metrics -2. Add more emqx Metrics From ffa60d0aa49db0ebbc7d0405ed82b7b7d1fb1044 Mon Sep 17 00:00:00 2001 From: Kinplemelon Date: Fri, 21 Apr 2023 19:26:07 +0800 Subject: [PATCH 10/33] chore: upgrade dashboard to e1.0.6-beta.1 for ee --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 7c0a133f4..a941af3b8 100644 --- a/Makefile +++ b/Makefile @@ -7,7 +7,7 @@ export EMQX_DEFAULT_RUNNER = debian:11-slim export OTP_VSN ?= $(shell $(CURDIR)/scripts/get-otp-vsn.sh) export ELIXIR_VSN ?= $(shell $(CURDIR)/scripts/get-elixir-vsn.sh) export EMQX_DASHBOARD_VERSION ?= v1.2.1 -export EMQX_EE_DASHBOARD_VERSION ?= e1.0.6-beta.1 +export EMQX_EE_DASHBOARD_VERSION ?= e1.0.6-beta.2 export EMQX_REL_FORM ?= tgz export QUICER_DOWNLOAD_FROM_RELEASE = 1 ifeq ($(OS),Windows_NT) From df31a8a3424b38794fae620a6db459afb72651b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9F=90=E6=96=87?= Date: Fri, 21 Apr 2023 20:29:14 +0800 Subject: [PATCH 11/33] test: conf_schema_tests failed --- apps/emqx_conf/test/emqx_conf_schema_tests.erl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/apps/emqx_conf/test/emqx_conf_schema_tests.erl b/apps/emqx_conf/test/emqx_conf_schema_tests.erl index 667d1766f..192aa0e93 100644 --- a/apps/emqx_conf/test/emqx_conf_schema_tests.erl +++ b/apps/emqx_conf/test/emqx_conf_schema_tests.erl @@ -25,8 +25,8 @@ array_nodes_test() -> ExpectNodes = ['emqx1@127.0.0.1', 'emqx2@127.0.0.1'], lists:foreach( - fun(Nodes) -> - ConfFile = to_bin(?BASE_CONF, [Nodes, Nodes]), + fun({Seeds, Nodes}) -> + ConfFile = to_bin(?BASE_CONF, [Seeds, Nodes]), {ok, Conf} = hocon:binary(ConfFile, #{format => richmap}), ConfList = hocon_tconf:generate(emqx_conf_schema, Conf), ClusterDiscovery = proplists:get_value( @@ -43,7 +43,7 @@ array_nodes_test() -> Nodes ) end, - [["emqx1@127.0.0.1", "emqx2@127.0.0.1"], "emqx1@127.0.0.1, emqx2@127.0.0.1"] + [{["emqx1@127.0.0.1", "emqx2@127.0.0.1"], "emqx1@127.0.0.1, emqx2@127.0.0.1"}] ), ok. @@ -79,7 +79,7 @@ array_nodes_test() -> ). authn_validations_test() -> - BaseConf = to_bin(?BASE_CONF, ["emqx1@127.0.0.1", "emqx1@127.0.0.1"]), + BaseConf = to_bin(?BASE_CONF, [["emqx1@127.0.0.1"], "emqx1@127.0.0.1,emqx1@127.0.0.1"]), OKHttps = to_bin(?BASE_AUTHN_ARRAY, [post, true, <<"https://127.0.0.1:8080">>]), Conf0 = <>, From 6beb9e00b6fe83a82a477e5eaef50ce789bb0724 Mon Sep 17 00:00:00 2001 From: Ivan Dyachkov Date: Sat, 22 Apr 2023 20:09:40 +0200 Subject: [PATCH 12/33] chore: e5.0.3-alpha.2 --- apps/emqx/include/emqx_release.hrl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx/include/emqx_release.hrl b/apps/emqx/include/emqx_release.hrl index ea79dcd0e..d1f1a93ae 100644 --- a/apps/emqx/include/emqx_release.hrl +++ b/apps/emqx/include/emqx_release.hrl @@ -35,7 +35,7 @@ -define(EMQX_RELEASE_CE, "5.0.22"). %% Enterprise edition --define(EMQX_RELEASE_EE, "5.0.3-alpha.1"). +-define(EMQX_RELEASE_EE, "5.0.3-alpha.2"). %% the HTTP API version -define(EMQX_API_VERSION, "5.0"). From d826b0921dba154007683940577a4a35ea00a350 Mon Sep 17 00:00:00 2001 From: firest Date: Fri, 21 Apr 2023 18:16:40 +0800 Subject: [PATCH 13/33] fix(dynamo): separate the implementation of connector and client of Dynamo bridge --- .../test/emqx_ee_bridge_dynamo_SUITE.erl | 2 +- .../src/emqx_ee_connector_dynamo.erl | 129 ++----------- .../src/emqx_ee_connector_dynamo_client.erl | 180 ++++++++++++++++++ 3 files changed, 196 insertions(+), 115 deletions(-) create mode 100644 lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo_client.erl diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_dynamo_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_dynamo_SUITE.erl index 9cf7eb8f4..3b07acbe0 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_dynamo_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_dynamo_SUITE.erl @@ -251,7 +251,7 @@ directly_setup_dynamo() -> directly_query(Query) -> directly_setup_dynamo(), - emqx_ee_connector_dynamo:execute(Query, ?TABLE_BIN). + emqx_ee_connector_dynamo_client:execute(Query, ?TABLE_BIN). directly_get_payload(Key) -> case directly_query({get_item, {<<"id">>, Key}}) of diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo.erl index 9a149b6f7..2170827d6 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo.erl @@ -28,10 +28,7 @@ ]). -export([ - connect/1, - do_get_status/1, - do_async_reply/2, - worker_do_query/4 + connect/1 ]). -import(hoconsc, [mk/2, enum/1, ref/2]). @@ -40,10 +37,6 @@ default_port => 8000 }). --ifdef(TEST). --export([execute/2]). --endif. - %%===================================================================== %% Hocon schema roots() -> @@ -126,45 +119,39 @@ on_stop(InstanceId, #{poolname := PoolName} = _State) -> emqx_plugin_libs_pool:stop_pool(PoolName). on_query(InstanceId, Query, State) -> - do_query(InstanceId, Query, handover, State). + do_query(InstanceId, Query, sync, State). -on_query_async(InstanceId, Query, Reply, State) -> +on_query_async(InstanceId, Query, ReplyCtx, State) -> do_query( InstanceId, Query, - {handover_async, {?MODULE, do_async_reply, [Reply]}}, + {async, ReplyCtx}, State ). %% we only support batch insert on_batch_query(InstanceId, [{send_message, _} | _] = Query, State) -> - do_query(InstanceId, Query, handover, State); + do_query(InstanceId, Query, sync, State); on_batch_query(_InstanceId, Query, _State) -> {error, {unrecoverable_error, {invalid_request, Query}}}. %% we only support batch insert -on_batch_query_async(InstanceId, [{send_message, _} | _] = Query, Reply, State) -> +on_batch_query_async(InstanceId, [{send_message, _} | _] = Query, ReplyCtx, State) -> do_query( InstanceId, Query, - {handover_async, {?MODULE, do_async_reply, [Reply]}}, + {async, ReplyCtx}, State ); on_batch_query_async(_InstanceId, Query, _Reply, _State) -> {error, {unrecoverable_error, {invalid_request, Query}}}. on_get_status(_InstanceId, #{poolname := Pool}) -> - Health = emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1), + Health = emqx_plugin_libs_pool:health_check_ecpool_workers( + Pool, {emqx_ee_connector_dynamo_client, is_connected, []} + ), status_result(Health). -do_get_status(_Conn) -> - %% because the dynamodb driver connection process is the ecpool worker self - %% so we must call the checker function inside the worker - case erlcloud_ddb2:list_tables() of - {ok, _} -> true; - _ -> false - end. - status_result(_Status = true) -> connected; status_result(_Status = false) -> connecting. @@ -185,8 +172,8 @@ do_query( ), Result = ecpool:pick_and_do( PoolName, - {?MODULE, worker_do_query, [Table, Query, Templates]}, - ApplyMode + {emqx_ee_connector_dynamo_client, query, [ApplyMode, Table, Query, Templates]}, + no_handover ), case Result of @@ -210,47 +197,10 @@ do_query( Result end. -worker_do_query(_Client, Table, Query0, Templates) -> - try - Query = apply_template(Query0, Templates), - execute(Query, Table) - catch - _Type:Reason -> - {error, {unrecoverable_error, {invalid_request, Reason}}} - end. - -%% some simple query commands for authn/authz or test -execute({insert_item, Msg}, Table) -> - Item = convert_to_item(Msg), - erlcloud_ddb2:put_item(Table, Item); -execute({delete_item, Key}, Table) -> - erlcloud_ddb2:delete_item(Table, Key); -execute({get_item, Key}, Table) -> - erlcloud_ddb2:get_item(Table, Key); -%% commands for data bridge query or batch query -execute({send_message, Msg}, Table) -> - Item = convert_to_item(Msg), - erlcloud_ddb2:put_item(Table, Item); -execute([{put, _} | _] = Msgs, Table) -> - %% type of batch_write_item argument :: batch_write_item_request_items() - %% batch_write_item_request_items() :: maybe_list(batch_write_item_request_item()) - %% batch_write_item_request_item() :: {table_name(), list(batch_write_item_request())} - %% batch_write_item_request() :: {put, item()} | {delete, key()} - erlcloud_ddb2:batch_write_item({Table, Msgs}). - connect(Opts) -> - #{ - aws_access_key_id := AccessKeyID, - aws_secret_access_key := SecretAccessKey, - host := Host, - port := Port, - schema := Schema - } = proplists:get_value(config, Opts), - erlcloud_ddb2:configure(AccessKeyID, SecretAccessKey, Host, Port, Schema), - - %% The dynamodb driver uses caller process as its connection process - %% so at here, the connection process is the ecpool worker self - {ok, self()}. + Options = proplists:get_value(config, Opts), + {ok, _Pid} = Result = emqx_ee_connector_dynamo_client:start_link(Options), + Result. parse_template(Config) -> Templates = @@ -283,54 +233,5 @@ get_host_schema("https://" ++ Server) -> get_host_schema(Server) -> {"http://", Server}. -apply_template({Key, Msg} = Req, Templates) -> - case maps:get(Key, Templates, undefined) of - undefined -> - Req; - Template -> - {Key, emqx_plugin_libs_rule:proc_tmpl(Template, Msg)} - end; -%% now there is no batch delete, so -%% 1. we can simply replace the `send_message` to `put` -%% 2. convert the message to in_item() here, not at the time when calling `batch_write_items`, -%% so we can reduce some list map cost -apply_template([{send_message, _Msg} | _] = Msgs, Templates) -> - lists:map( - fun(Req) -> - {_, Msg} = apply_template(Req, Templates), - {put, convert_to_item(Msg)} - end, - Msgs - ). - -convert_to_item(Msg) when is_map(Msg), map_size(Msg) > 0 -> - maps:fold( - fun - (_K, <<>>, AccIn) -> - AccIn; - (K, V, AccIn) -> - [{convert2binary(K), convert2binary(V)} | AccIn] - end, - [], - Msg - ); -convert_to_item(MsgBin) when is_binary(MsgBin) -> - Msg = emqx_utils_json:decode(MsgBin), - convert_to_item(Msg); -convert_to_item(Item) -> - erlang:throw({invalid_item, Item}). - -convert2binary(Value) when is_atom(Value) -> - erlang:atom_to_binary(Value, utf8); -convert2binary(Value) when is_binary(Value); is_number(Value) -> - Value; -convert2binary(Value) when is_list(Value) -> - unicode:characters_to_binary(Value); -convert2binary(Value) when is_map(Value) -> - emqx_utils_json:encode(Value). - -do_async_reply(Result, {ReplyFun, [Context]}) -> - ReplyFun(Context, Result). - redact(Data) -> emqx_utils:redact(Data, fun(Any) -> Any =:= aws_secret_access_key end). diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo_client.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo_client.erl new file mode 100644 index 000000000..e0d8ee4bf --- /dev/null +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo_client.erl @@ -0,0 +1,180 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_ee_connector_dynamo_client). + +-behaviour(gen_server). + +%% API +-export([ + start_link/1, + is_connected/1, + query/5, + query/4 +]). + +%% gen_server callbacks +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3, + format_status/2 +]). + +-ifdef(TEST). +-export([execute/2]). +-endif. + +%%%=================================================================== +%%% API +%%%=================================================================== +is_connected(Pid) -> + try + gen_server:call(Pid, is_connected) + catch + _:_ -> + false + end. + +query(Pid, sync, Table, Query, Templates) -> + query(Pid, Table, Query, Templates); +query(Pid, {async, ReplyCtx}, Table, Query, Templates) -> + gen_server:cast(Pid, {query, Table, Query, Templates, ReplyCtx}). + +query(Pid, Table, Query, Templates) -> + gen_server:call(Pid, {query, Table, Query, Templates}, infinity). + +%%-------------------------------------------------------------------- +%% @doc +%% Starts Bridge which transfer data to DynamoDB +%% @endn +%%-------------------------------------------------------------------- +start_link(Options) -> + gen_server:start_link(?MODULE, Options, []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% Initialize dynamodb data bridge +init(#{ + aws_access_key_id := AccessKeyID, + aws_secret_access_key := SecretAccessKey, + host := Host, + port := Port, + schema := Schema +}) -> + erlcloud_ddb2:configure(AccessKeyID, SecretAccessKey, Host, Port, Schema), + {ok, #{}}. + +handle_call(is_connected, _From, State) -> + _ = erlcloud_ddb2:list_tables(), + {reply, true, State}; +handle_call({query, Table, Query, Templates}, _From, State) -> + Result = do_query(Table, Query, Templates), + {reply, Result, State}; +handle_call(_Request, _From, State) -> + {reply, ok, State}. + +handle_cast({query, Table, Query, Templates, {ReplyFun, [Context]}}, State) -> + Result = do_query(Table, Query, Templates), + ReplyFun(Context, Result), + {noreply, State}; +handle_cast(_Request, State) -> + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +-spec format_status( + Opt :: normal | terminate, + Status :: list() +) -> Status :: term(). +format_status(_Opt, Status) -> + Status. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +do_query(Table, Query0, Templates) -> + try + Query = apply_template(Query0, Templates), + execute(Query, Table) + catch + _Type:Reason -> + {error, {unrecoverable_error, {invalid_request, Reason}}} + end. + +%% some simple query commands for authn/authz or test +execute({insert_item, Msg}, Table) -> + Item = convert_to_item(Msg), + erlcloud_ddb2:put_item(Table, Item); +execute({delete_item, Key}, Table) -> + erlcloud_ddb2:delete_item(Table, Key); +execute({get_item, Key}, Table) -> + erlcloud_ddb2:get_item(Table, Key); +%% commands for data bridge query or batch query +execute({send_message, Msg}, Table) -> + Item = convert_to_item(Msg), + erlcloud_ddb2:put_item(Table, Item); +execute([{put, _} | _] = Msgs, Table) -> + %% type of batch_write_item argument :: batch_write_item_request_items() + %% batch_write_item_request_items() :: maybe_list(batch_write_item_request_item()) + %% batch_write_item_request_item() :: {table_name(), list(batch_write_item_request())} + %% batch_write_item_request() :: {put, item()} | {delete, key()} + erlcloud_ddb2:batch_write_item({Table, Msgs}). + +apply_template({Key, Msg} = Req, Templates) -> + case maps:get(Key, Templates, undefined) of + undefined -> + Req; + Template -> + {Key, emqx_plugin_libs_rule:proc_tmpl(Template, Msg)} + end; +%% now there is no batch delete, so +%% 1. we can simply replace the `send_message` to `put` +%% 2. convert the message to in_item() here, not at the time when calling `batch_write_items`, +%% so we can reduce some list map cost +apply_template([{send_message, _Msg} | _] = Msgs, Templates) -> + lists:map( + fun(Req) -> + {_, Msg} = apply_template(Req, Templates), + {put, convert_to_item(Msg)} + end, + Msgs + ). + +convert_to_item(Msg) when is_map(Msg), map_size(Msg) > 0 -> + maps:fold( + fun + (_K, <<>>, AccIn) -> + AccIn; + (K, V, AccIn) -> + [{convert2binary(K), convert2binary(V)} | AccIn] + end, + [], + Msg + ); +convert_to_item(MsgBin) when is_binary(MsgBin) -> + Msg = emqx_utils_json:decode(MsgBin), + convert_to_item(Msg); +convert_to_item(Item) -> + erlang:throw({invalid_item, Item}). + +convert2binary(Value) when is_atom(Value) -> + erlang:atom_to_binary(Value, utf8); +convert2binary(Value) when is_binary(Value); is_number(Value) -> + Value; +convert2binary(Value) when is_list(Value) -> + unicode:characters_to_binary(Value); +convert2binary(Value) when is_map(Value) -> + emqx_utils_json:encode(Value). From 5ab08876873d40908e034be609a70afe786362a7 Mon Sep 17 00:00:00 2001 From: firest Date: Sun, 23 Apr 2023 15:17:51 +0800 Subject: [PATCH 14/33] chore: add examples for RocketMQ template --- rel/i18n/emqx_ee_bridge_rocketmq.hocon | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/rel/i18n/emqx_ee_bridge_rocketmq.hocon b/rel/i18n/emqx_ee_bridge_rocketmq.hocon index 2e33e6c07..0e5b64bb1 100644 --- a/rel/i18n/emqx_ee_bridge_rocketmq.hocon +++ b/rel/i18n/emqx_ee_bridge_rocketmq.hocon @@ -16,8 +16,14 @@ NOTE: if the bridge is used as a rule action, `local_topic` should be left empty template { desc { - en: """Template, the default value is empty. When this value is empty the whole message will be stored in the RocketMQ""" - zh: """模板, 默认为空,为空时将会将整个消息转发给 RocketMQ""" + en: """Template, the default value is empty. When this value is empty the whole message will be stored in the RocketMQ.
+ The template can be any valid string with placeholders, example:
+ - ${id}, ${username}, ${clientid}, ${timestamp}
+ - {\"id\" : ${id}, \"username\" : ${username}}""" + zh: """模板, 默认为空,为空时将会将整个消息转发给 RocketMQ。
+ 模板可以是任意带有占位符的合法字符串, 例如:
+ - ${id}, ${username}, ${clientid}, ${timestamp}
+ - {\"id\" : ${id}, \"username\" : ${username}}""" } label { en: "Template" From 7d2c336ab7427e6b909b66f2e9f1452c3ef3baf3 Mon Sep 17 00:00:00 2001 From: firest Date: Sun, 23 Apr 2023 15:31:08 +0800 Subject: [PATCH 15/33] fix(resource): make sure resource will not crash when stopping --- apps/emqx_resource/src/emqx_resource_buffer_worker_sup.erl | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker_sup.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker_sup.erl index a00dcdcd2..ae30c3927 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker_sup.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker_sup.erl @@ -141,9 +141,5 @@ ensure_disk_queue_dir_absent(ResourceId, Index) -> ok. ensure_worker_pool_removed(ResId) -> - try - gproc_pool:delete(ResId) - catch - error:badarg -> ok - end, + gproc_pool:force_delete(ResId), ok. From 4e7472090ba69208be2f5b462f3835ed1bfaf9b5 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Tue, 18 Apr 2023 10:37:05 +0800 Subject: [PATCH 16/33] fix: refine default sql and driver name for mssql bridge --- lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_sqlserver.erl | 6 +++--- scripts/ct/run.sh | 4 ++-- .../{install-odbc-driver.sh => install-msodbc-driver.sh} | 0 3 files changed, 5 insertions(+), 5 deletions(-) rename scripts/{install-odbc-driver.sh => install-msodbc-driver.sh} (100%) diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_sqlserver.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_sqlserver.erl index e216299c2..49db815a6 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_sqlserver.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_sqlserver.erl @@ -22,11 +22,11 @@ ]). -define(DEFAULT_SQL, << - "insert into t_mqtt_msg(msgid, topic, qos, payload)" - "values (${id}, ${topic}, ${qos}, ${payload})" + "insert into t_mqtt_msg(msgid, topic, qos, payload) " + "values ( ${id}, ${topic}, ${qos}, ${payload} )" >>). --define(DEFAULT_DRIVER, <<"ms-sqlserver-18">>). +-define(DEFAULT_DRIVER, <<"ms-sql">>). conn_bridge_examples(Method) -> [ diff --git a/scripts/ct/run.sh b/scripts/ct/run.sh index 4e79476e0..a85aa36af 100755 --- a/scripts/ct/run.sh +++ b/scripts/ct/run.sh @@ -193,9 +193,9 @@ for dep in ${CT_DEPS}; do done if [ "$ODBC_REQUEST" = 'yes' ]; then - INSTALL_ODBC="./scripts/install-odbc-driver.sh" + INSTALL_ODBC="./scripts/install-msodbc-driver.sh" else - INSTALL_ODBC="echo 'Driver msodbcsql driver not requested'" + INSTALL_ODBC="echo 'msodbc driver not requested'" fi F_OPTIONS="" diff --git a/scripts/install-odbc-driver.sh b/scripts/install-msodbc-driver.sh similarity index 100% rename from scripts/install-odbc-driver.sh rename to scripts/install-msodbc-driver.sh From d505b65ba81534e6cfcf2c091fb32435270ad459 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Sun, 23 Apr 2023 15:45:58 +0800 Subject: [PATCH 17/33] fix: use default health check timeout for sqlserver --- .../src/emqx_ee_connector_sqlserver.erl | 22 +++++++++++++------ 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_sqlserver.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_sqlserver.erl index f11441a3b..6cbd9de4e 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_sqlserver.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_sqlserver.erl @@ -43,7 +43,7 @@ -export([connect/1]). %% Internal exports used to execute code with ecpool worker --export([do_get_status/2, worker_do_insert/3, do_async_reply/2]). +-export([do_get_status/1, worker_do_insert/3, do_async_reply/2]). -import(emqx_plugin_libs_rule, [str/1]). -import(hoconsc, [mk/2, enum/1, ref/2]). @@ -306,10 +306,9 @@ on_batch_query_async(InstanceId, Requests, ReplyFunAndArgs, State) -> ), do_query(InstanceId, Requests, ?ASYNC_QUERY_MODE(ReplyFunAndArgs), State). -on_get_status(_InstanceId, #{poolname := Pool, resource_opts := ResourceOpts} = _State) -> - RequestTimeout = ?REQUEST_TIMEOUT(ResourceOpts), +on_get_status(_InstanceId, #{poolname := Pool} = _State) -> Health = emqx_plugin_libs_pool:health_check_ecpool_workers( - Pool, {?MODULE, do_get_status, [RequestTimeout]}, RequestTimeout + Pool, {?MODULE, do_get_status, []} ), status_result(Health). @@ -328,9 +327,9 @@ connect(Options) -> Opts = proplists:get_value(options, Options, []), odbc:connect(ConnectStr, Opts). --spec do_get_status(connection_reference(), time_out()) -> Result :: boolean(). -do_get_status(Conn, RequestTimeout) -> - case execute(Conn, <<"SELECT 1">>, RequestTimeout) of +-spec do_get_status(connection_reference()) -> Result :: boolean(). +do_get_status(Conn) -> + case execute(Conn, <<"SELECT 1">>) of {selected, [[]], [{1}]} -> true; _ -> false end. @@ -444,6 +443,15 @@ worker_do_insert( {error, {unrecoverable_error, {invalid_request, Reason}}} end. +-spec execute(pid(), sql()) -> + updated_tuple() + | selected_tuple() + | [updated_tuple()] + | [selected_tuple()] + | {error, common_reason()}. +execute(Conn, SQL) -> + odbc:sql_query(Conn, str(SQL)). + -spec execute(pid(), sql(), time_out()) -> updated_tuple() | selected_tuple() From 5593e38ed306b941d3214b96ca067ccdacad6d95 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9F=90=E6=96=87?= Date: Sun, 23 Apr 2023 15:43:18 +0800 Subject: [PATCH 18/33] fix: copy cluster-override.conf from old version --- apps/emqx_conf/src/emqx_conf.app.src | 2 +- apps/emqx_conf/src/emqx_conf_app.erl | 15 ++++++++++++++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/apps/emqx_conf/src/emqx_conf.app.src b/apps/emqx_conf/src/emqx_conf.app.src index 234690374..03cd36522 100644 --- a/apps/emqx_conf/src/emqx_conf.app.src +++ b/apps/emqx_conf/src/emqx_conf.app.src @@ -1,6 +1,6 @@ {application, emqx_conf, [ {description, "EMQX configuration management"}, - {vsn, "0.1.17"}, + {vsn, "0.1.18"}, {registered, []}, {mod, {emqx_conf_app, []}}, {applications, [kernel, stdlib, emqx_ctl]}, diff --git a/apps/emqx_conf/src/emqx_conf_app.erl b/apps/emqx_conf/src/emqx_conf_app.erl index 35a79ea6e..9d6bb35d7 100644 --- a/apps/emqx_conf/src/emqx_conf_app.erl +++ b/apps/emqx_conf/src/emqx_conf_app.erl @@ -175,7 +175,7 @@ copy_override_conf_from_core_node() -> _ -> [{ok, Info} | _] = lists:sort(fun conf_sort/2, Ready), #{node := Node, conf := RawOverrideConf, tnx_id := TnxId} = Info, - HasDeprecatedFile = maps:get(has_deprecated_file, Info, false), + HasDeprecatedFile = has_deprecated_file(Info), ?SLOG(debug, #{ msg => "copy_cluster_conf_from_core_node_success", node => Node, @@ -227,3 +227,16 @@ sync_data_from_node(Node) -> ?SLOG(emergency, #{node => Node, msg => "sync_data_from_node_failed", reason => Error}), error(Error) end. + +has_deprecated_file(#{node := Node} = Info) -> + case maps:find(has_deprecated_file, Info) of + {ok, HasDeprecatedFile} -> + HasDeprecatedFile; + error -> + %% The old version don't have emqx_config:has_deprecated_file/0 + Timeout = 5000, + {ok, File} = rpc:call( + Node, application, get_env, [emqx, cluster_override_conf_file], Timeout + ), + rpc:call(Node, filelib, is_regular, [File], Timeout) + end. From e0fd861863835de4e4df7cb49f8c673fd78d2aa9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9F=90=E6=96=87?= Date: Sun, 23 Apr 2023 17:20:54 +0800 Subject: [PATCH 19/33] chore: make static_check happy --- apps/emqx_conf/src/emqx_conf_app.erl | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/apps/emqx_conf/src/emqx_conf_app.erl b/apps/emqx_conf/src/emqx_conf_app.erl index 9d6bb35d7..f7ce797b8 100644 --- a/apps/emqx_conf/src/emqx_conf_app.erl +++ b/apps/emqx_conf/src/emqx_conf_app.erl @@ -234,9 +234,7 @@ has_deprecated_file(#{node := Node} = Info) -> HasDeprecatedFile; error -> %% The old version don't have emqx_config:has_deprecated_file/0 - Timeout = 5000, - {ok, File} = rpc:call( - Node, application, get_env, [emqx, cluster_override_conf_file], Timeout - ), - rpc:call(Node, filelib, is_regular, [File], Timeout) + DataDir = emqx_conf_proto_v2:get_config(Node, [node, data_dir]), + File = filename:join([DataDir, "configs", "cluster-override.conf"]), + rpc:call(Node, filelib, is_regular, [File], 5000) end. From f96c1630e1271fac4c34cbf08951984fd7360beb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9F=90=E6=96=87?= Date: Sun, 23 Apr 2023 17:30:07 +0800 Subject: [PATCH 20/33] chore: pin emqx_conf to 0.17.0 --- apps/emqx_conf/src/emqx_conf.app.src | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_conf/src/emqx_conf.app.src b/apps/emqx_conf/src/emqx_conf.app.src index 03cd36522..234690374 100644 --- a/apps/emqx_conf/src/emqx_conf.app.src +++ b/apps/emqx_conf/src/emqx_conf.app.src @@ -1,6 +1,6 @@ {application, emqx_conf, [ {description, "EMQX configuration management"}, - {vsn, "0.1.18"}, + {vsn, "0.1.17"}, {registered, []}, {mod, {emqx_conf_app, []}}, {applications, [kernel, stdlib, emqx_ctl]}, From b4c16d37c74ce90bf29a4137a3f98781c761f3d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9F=90=E6=96=87?= Date: Sun, 23 Apr 2023 17:20:54 +0800 Subject: [PATCH 21/33] chore: make static_check happy --- apps/emqx_conf/src/emqx_conf_app.erl | 2 +- .../src/proto/emqx_conf_proto_v3.erl | 114 ++++++++++++++++++ 2 files changed, 115 insertions(+), 1 deletion(-) create mode 100644 apps/emqx_conf/src/proto/emqx_conf_proto_v3.erl diff --git a/apps/emqx_conf/src/emqx_conf_app.erl b/apps/emqx_conf/src/emqx_conf_app.erl index f7ce797b8..fd0a56853 100644 --- a/apps/emqx_conf/src/emqx_conf_app.erl +++ b/apps/emqx_conf/src/emqx_conf_app.erl @@ -236,5 +236,5 @@ has_deprecated_file(#{node := Node} = Info) -> %% The old version don't have emqx_config:has_deprecated_file/0 DataDir = emqx_conf_proto_v2:get_config(Node, [node, data_dir]), File = filename:join([DataDir, "configs", "cluster-override.conf"]), - rpc:call(Node, filelib, is_regular, [File], 5000) + emqx_conf_proto_v3:file_exist(Node, File) end. diff --git a/apps/emqx_conf/src/proto/emqx_conf_proto_v3.erl b/apps/emqx_conf/src/proto/emqx_conf_proto_v3.erl new file mode 100644 index 000000000..802436f98 --- /dev/null +++ b/apps/emqx_conf/src/proto/emqx_conf_proto_v3.erl @@ -0,0 +1,114 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_conf_proto_v3). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + sync_data_from_node/1, + get_config/2, + get_config/3, + get_all/1, + + update/3, + update/4, + remove_config/2, + remove_config/3, + + reset/2, + reset/3, + + get_override_config_file/1, + file_exist/2 +]). + +-include_lib("emqx/include/bpapi.hrl"). + +introduced_in() -> + "5.0.24". + +-spec sync_data_from_node(node()) -> {ok, binary()} | emqx_rpc:badrpc(). +sync_data_from_node(Node) -> + rpc:call(Node, emqx_conf_app, sync_data_from_node, [], 20000). +-type update_config_key_path() :: [emqx_utils_maps:config_key(), ...]. + +-spec get_config(node(), emqx_utils_maps:config_key_path()) -> + term() | emqx_rpc:badrpc(). +get_config(Node, KeyPath) -> + rpc:call(Node, emqx, get_config, [KeyPath]). + +-spec get_config(node(), emqx_utils_maps:config_key_path(), _Default) -> + term() | emqx_rpc:badrpc(). +get_config(Node, KeyPath, Default) -> + rpc:call(Node, emqx, get_config, [KeyPath, Default]). + +-spec get_all(emqx_utils_maps:config_key_path()) -> emqx_rpc:multicall_result(). +get_all(KeyPath) -> + rpc:multicall(emqx_conf, get_node_and_config, [KeyPath], 5000). + +-spec update( + update_config_key_path(), + emqx_config:update_request(), + emqx_config:update_opts() +) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. +update(KeyPath, UpdateReq, Opts) -> + emqx_cluster_rpc:multicall(emqx, update_config, [KeyPath, UpdateReq, Opts]). + +-spec update( + node(), + update_config_key_path(), + emqx_config:update_request(), + emqx_config:update_opts() +) -> + {ok, emqx_config:update_result()} + | {error, emqx_config:update_error()} + | emqx_rpc:badrpc(). +update(Node, KeyPath, UpdateReq, Opts) -> + rpc:call(Node, emqx, update_config, [KeyPath, UpdateReq, Opts], 5000). + +-spec remove_config(update_config_key_path(), emqx_config:update_opts()) -> + {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. +remove_config(KeyPath, Opts) -> + emqx_cluster_rpc:multicall(emqx, remove_config, [KeyPath, Opts]). + +-spec remove_config(node(), update_config_key_path(), emqx_config:update_opts()) -> + {ok, emqx_config:update_result()} + | {error, emqx_config:update_error()} + | emqx_rpc:badrpc(). +remove_config(Node, KeyPath, Opts) -> + rpc:call(Node, emqx, remove_config, [KeyPath, Opts], 5000). + +-spec reset(update_config_key_path(), emqx_config:update_opts()) -> + {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. +reset(KeyPath, Opts) -> + emqx_cluster_rpc:multicall(emqx, reset_config, [KeyPath, Opts]). + +-spec reset(node(), update_config_key_path(), emqx_config:update_opts()) -> + {ok, emqx_config:update_result()} + | {error, emqx_config:update_error()} + | emqx_rpc:badrpc(). +reset(Node, KeyPath, Opts) -> + rpc:call(Node, emqx, reset_config, [KeyPath, Opts]). + +-spec get_override_config_file([node()]) -> emqx_rpc:multicall_result(). +get_override_config_file(Nodes) -> + rpc:multicall(Nodes, emqx_conf_app, get_override_config_file, [], 20000). + +-spec file_exist(node(), string()) -> emqx_rpc:badrpc() | boolean(). +file_exist(Node, File) -> + rpc:call(Node, filelib, is_regular, [File], 5000). From 8bfee903223780dc5492d289880b4b5aa8f76713 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9F=90=E6=96=87?= Date: Sun, 23 Apr 2023 17:20:54 +0800 Subject: [PATCH 22/33] chore: make static_check happy --- apps/emqx/priv/bpapi.versions | 1 + apps/emqx_conf/src/proto/emqx_conf_proto_v2.erl | 4 ++++ 2 files changed, 5 insertions(+) diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index db4765e3f..11bd4aa77 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -11,6 +11,7 @@ {emqx_cm,1}. {emqx_conf,1}. {emqx_conf,2}. +{emqx_conf,3}. {emqx_dashboard,1}. {emqx_delayed,1}. {emqx_exhook,1}. diff --git a/apps/emqx_conf/src/proto/emqx_conf_proto_v2.erl b/apps/emqx_conf/src/proto/emqx_conf_proto_v2.erl index 97446ee9f..3bcf532f6 100644 --- a/apps/emqx_conf/src/proto/emqx_conf_proto_v2.erl +++ b/apps/emqx_conf/src/proto/emqx_conf_proto_v2.erl @@ -20,6 +20,7 @@ -export([ introduced_in/0, + deprecated_since/0, sync_data_from_node/1, get_config/2, get_config/3, @@ -41,6 +42,9 @@ introduced_in() -> "5.0.1". +deprecated_since() -> + "5.0.23". + -spec sync_data_from_node(node()) -> {ok, binary()} | emqx_rpc:badrpc(). sync_data_from_node(Node) -> rpc:call(Node, emqx_conf_app, sync_data_from_node, [], 20000). From 275967a49f54dc68d2a63f8023d31dd70ca11239 Mon Sep 17 00:00:00 2001 From: Zhongwen Deng Date: Mon, 24 Apr 2023 16:13:42 +0800 Subject: [PATCH 23/33] chore: remove dashboard's default username from emqx.conf --- apps/emqx_dashboard/etc/emqx_dashboard.conf | 2 -- 1 file changed, 2 deletions(-) diff --git a/apps/emqx_dashboard/etc/emqx_dashboard.conf b/apps/emqx_dashboard/etc/emqx_dashboard.conf index 856779500..67e3f61ec 100644 --- a/apps/emqx_dashboard/etc/emqx_dashboard.conf +++ b/apps/emqx_dashboard/etc/emqx_dashboard.conf @@ -2,6 +2,4 @@ dashboard { listeners.http { bind = 18083 } - default_username = "admin" - default_password = "public" } From 7ce04358c411ba85e9fdb6f5154d56daaf89a8ab Mon Sep 17 00:00:00 2001 From: firest Date: Mon, 24 Apr 2023 17:28:33 +0800 Subject: [PATCH 24/33] fix(Dynamo): fix DynamoDB bridge status check error --- .../emqx_ee_connector/src/emqx_ee_connector_dynamo.erl | 2 +- .../src/emqx_ee_connector_dynamo_client.erl | 10 ++++++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo.erl index 2170827d6..01554f90a 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo.erl @@ -34,7 +34,7 @@ -import(hoconsc, [mk/2, enum/1, ref/2]). -define(DYNAMO_HOST_OPTIONS, #{ - default_port => 8000 + default_port => 80 }). %%===================================================================== diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo_client.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo_client.erl index e0d8ee4bf..0340655b4 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo_client.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo_client.erl @@ -71,8 +71,14 @@ init(#{ {ok, #{}}. handle_call(is_connected, _From, State) -> - _ = erlcloud_ddb2:list_tables(), - {reply, true, State}; + IsConnected = + case erlcloud_ddb2:list_tables([{limit, 1}]) of + {ok, _} -> + true; + _ -> + false + end, + {reply, IsConnected, State}; handle_call({query, Table, Query, Templates}, _From, State) -> Result = do_query(Table, Query, Templates), {reply, Result, State}; From 3f689d0fdf287c38b5f81c9d01f6fe65057f3b7f Mon Sep 17 00:00:00 2001 From: Zhongwen Deng Date: Mon, 24 Apr 2023 15:27:42 +0800 Subject: [PATCH 25/33] feat: don't do rpc call to check deprecated file --- apps/emqx/priv/bpapi.versions | 1 - apps/emqx_conf/src/emqx_conf_app.erl | 12 +- .../src/proto/emqx_conf_proto_v2.erl | 4 - .../src/proto/emqx_conf_proto_v3.erl | 114 ------------------ 4 files changed, 7 insertions(+), 124 deletions(-) delete mode 100644 apps/emqx_conf/src/proto/emqx_conf_proto_v3.erl diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index 11bd4aa77..db4765e3f 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -11,7 +11,6 @@ {emqx_cm,1}. {emqx_conf,1}. {emqx_conf,2}. -{emqx_conf,3}. {emqx_dashboard,1}. {emqx_delayed,1}. {emqx_exhook,1}. diff --git a/apps/emqx_conf/src/emqx_conf_app.erl b/apps/emqx_conf/src/emqx_conf_app.erl index fd0a56853..fbfb97a79 100644 --- a/apps/emqx_conf/src/emqx_conf_app.erl +++ b/apps/emqx_conf/src/emqx_conf_app.erl @@ -66,7 +66,8 @@ get_override_config_file() -> conf => Conf, tnx_id => TnxId, node => Node, - has_deprecated_file => HasDeprecateFile + has_deprecated_file => HasDeprecateFile, + release => emqx_app:get_release() } end, case mria:ro_transaction(?CLUSTER_RPC_SHARD, Fun) of @@ -180,6 +181,8 @@ copy_override_conf_from_core_node() -> msg => "copy_cluster_conf_from_core_node_success", node => Node, has_deprecated_file => HasDeprecatedFile, + local_release => emqx_app:get_release(), + remote_release => maps:get(release, Info, "before_v5.0.24|e5.0.3"), data_dir => emqx:data_dir(), tnx_id => TnxId }), @@ -228,13 +231,12 @@ sync_data_from_node(Node) -> error(Error) end. -has_deprecated_file(#{node := Node} = Info) -> +has_deprecated_file(#{conf := Conf} = Info) -> case maps:find(has_deprecated_file, Info) of {ok, HasDeprecatedFile} -> HasDeprecatedFile; error -> %% The old version don't have emqx_config:has_deprecated_file/0 - DataDir = emqx_conf_proto_v2:get_config(Node, [node, data_dir]), - File = filename:join([DataDir, "configs", "cluster-override.conf"]), - emqx_conf_proto_v3:file_exist(Node, File) + %% Conf is not empty if deprecated file is found. + Conf =/= #{} end. diff --git a/apps/emqx_conf/src/proto/emqx_conf_proto_v2.erl b/apps/emqx_conf/src/proto/emqx_conf_proto_v2.erl index 3bcf532f6..97446ee9f 100644 --- a/apps/emqx_conf/src/proto/emqx_conf_proto_v2.erl +++ b/apps/emqx_conf/src/proto/emqx_conf_proto_v2.erl @@ -20,7 +20,6 @@ -export([ introduced_in/0, - deprecated_since/0, sync_data_from_node/1, get_config/2, get_config/3, @@ -42,9 +41,6 @@ introduced_in() -> "5.0.1". -deprecated_since() -> - "5.0.23". - -spec sync_data_from_node(node()) -> {ok, binary()} | emqx_rpc:badrpc(). sync_data_from_node(Node) -> rpc:call(Node, emqx_conf_app, sync_data_from_node, [], 20000). diff --git a/apps/emqx_conf/src/proto/emqx_conf_proto_v3.erl b/apps/emqx_conf/src/proto/emqx_conf_proto_v3.erl deleted file mode 100644 index 802436f98..000000000 --- a/apps/emqx_conf/src/proto/emqx_conf_proto_v3.erl +++ /dev/null @@ -1,114 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_conf_proto_v3). - --behaviour(emqx_bpapi). - --export([ - introduced_in/0, - sync_data_from_node/1, - get_config/2, - get_config/3, - get_all/1, - - update/3, - update/4, - remove_config/2, - remove_config/3, - - reset/2, - reset/3, - - get_override_config_file/1, - file_exist/2 -]). - --include_lib("emqx/include/bpapi.hrl"). - -introduced_in() -> - "5.0.24". - --spec sync_data_from_node(node()) -> {ok, binary()} | emqx_rpc:badrpc(). -sync_data_from_node(Node) -> - rpc:call(Node, emqx_conf_app, sync_data_from_node, [], 20000). --type update_config_key_path() :: [emqx_utils_maps:config_key(), ...]. - --spec get_config(node(), emqx_utils_maps:config_key_path()) -> - term() | emqx_rpc:badrpc(). -get_config(Node, KeyPath) -> - rpc:call(Node, emqx, get_config, [KeyPath]). - --spec get_config(node(), emqx_utils_maps:config_key_path(), _Default) -> - term() | emqx_rpc:badrpc(). -get_config(Node, KeyPath, Default) -> - rpc:call(Node, emqx, get_config, [KeyPath, Default]). - --spec get_all(emqx_utils_maps:config_key_path()) -> emqx_rpc:multicall_result(). -get_all(KeyPath) -> - rpc:multicall(emqx_conf, get_node_and_config, [KeyPath], 5000). - --spec update( - update_config_key_path(), - emqx_config:update_request(), - emqx_config:update_opts() -) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. -update(KeyPath, UpdateReq, Opts) -> - emqx_cluster_rpc:multicall(emqx, update_config, [KeyPath, UpdateReq, Opts]). - --spec update( - node(), - update_config_key_path(), - emqx_config:update_request(), - emqx_config:update_opts() -) -> - {ok, emqx_config:update_result()} - | {error, emqx_config:update_error()} - | emqx_rpc:badrpc(). -update(Node, KeyPath, UpdateReq, Opts) -> - rpc:call(Node, emqx, update_config, [KeyPath, UpdateReq, Opts], 5000). - --spec remove_config(update_config_key_path(), emqx_config:update_opts()) -> - {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. -remove_config(KeyPath, Opts) -> - emqx_cluster_rpc:multicall(emqx, remove_config, [KeyPath, Opts]). - --spec remove_config(node(), update_config_key_path(), emqx_config:update_opts()) -> - {ok, emqx_config:update_result()} - | {error, emqx_config:update_error()} - | emqx_rpc:badrpc(). -remove_config(Node, KeyPath, Opts) -> - rpc:call(Node, emqx, remove_config, [KeyPath, Opts], 5000). - --spec reset(update_config_key_path(), emqx_config:update_opts()) -> - {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. -reset(KeyPath, Opts) -> - emqx_cluster_rpc:multicall(emqx, reset_config, [KeyPath, Opts]). - --spec reset(node(), update_config_key_path(), emqx_config:update_opts()) -> - {ok, emqx_config:update_result()} - | {error, emqx_config:update_error()} - | emqx_rpc:badrpc(). -reset(Node, KeyPath, Opts) -> - rpc:call(Node, emqx, reset_config, [KeyPath, Opts]). - --spec get_override_config_file([node()]) -> emqx_rpc:multicall_result(). -get_override_config_file(Nodes) -> - rpc:multicall(Nodes, emqx_conf_app, get_override_config_file, [], 20000). - --spec file_exist(node(), string()) -> emqx_rpc:badrpc() | boolean(). -file_exist(Node, File) -> - rpc:call(Node, filelib, is_regular, [File], 5000). From 33c27ac2acf9ff2c6c57b1beef911a360621914b Mon Sep 17 00:00:00 2001 From: firest Date: Tue, 25 Apr 2023 10:50:44 +0800 Subject: [PATCH 26/33] fix(dynamo): use correct default port for different schemas --- .../src/emqx_ee_connector_dynamo.erl | 20 ++++++++----------- 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo.erl index 01554f90a..3cf7322dc 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo.erl @@ -33,10 +33,6 @@ -import(hoconsc, [mk/2, enum/1, ref/2]). --define(DYNAMO_HOST_OPTIONS, #{ - default_port => 80 -}). - %%===================================================================== %% Hocon schema roots() -> @@ -84,8 +80,8 @@ on_start( config => redact(Config) }), - {Schema, Server} = get_host_schema(to_str(Url)), - {Host, Port} = emqx_schema:parse_server(Server, ?DYNAMO_HOST_OPTIONS), + {Schema, Server, DefaultPort} = get_host_info(to_str(Url)), + {Host, Port} = emqx_schema:parse_server(Server, #{default_port => DefaultPort}), Options = [ {config, #{ @@ -226,12 +222,12 @@ to_str(List) when is_list(List) -> to_str(Bin) when is_binary(Bin) -> erlang:binary_to_list(Bin). -get_host_schema("http://" ++ Server) -> - {"http://", Server}; -get_host_schema("https://" ++ Server) -> - {"https://", Server}; -get_host_schema(Server) -> - {"http://", Server}. +get_host_info("http://" ++ Server) -> + {"http://", Server, 80}; +get_host_info("https://" ++ Server) -> + {"https://", Server, 443}; +get_host_info(Server) -> + {"http://", Server, 80}. redact(Data) -> emqx_utils:redact(Data, fun(Any) -> Any =:= aws_secret_access_key end). From f84fc6f8b922cc000a45694b0429609e182b77cf Mon Sep 17 00:00:00 2001 From: Zhongwen Deng Date: Tue, 25 Apr 2023 11:59:06 +0800 Subject: [PATCH 27/33] fix: can't update authentication when cluster-override.conf --- apps/emqx/src/emqx_config.erl | 1 + 1 file changed, 1 insertion(+) diff --git a/apps/emqx/src/emqx_config.erl b/apps/emqx/src/emqx_config.erl index c94f25ead..9561263ca 100644 --- a/apps/emqx/src/emqx_config.erl +++ b/apps/emqx/src/emqx_config.erl @@ -333,6 +333,7 @@ init_load(SchemaMod, Conf, Opts) when is_list(Conf) orelse is_binary(Conf) -> init_load(HasDeprecatedFile, SchemaMod, RawConf, Opts). init_load(true, SchemaMod, RawConf, Opts) when is_map(RawConf) -> + ok = save_schema_mod_and_names(SchemaMod), %% deprecated conf will be removed in 5.1 %% Merge environment variable overrides on top RawConfWithEnvs = merge_envs(SchemaMod, RawConf), From d6208d8847906341046fef7f7f49cb4769a271f5 Mon Sep 17 00:00:00 2001 From: Zhongwen Deng Date: Tue, 25 Apr 2023 14:47:05 +0800 Subject: [PATCH 28/33] test: add test for depreated config file --- apps/emqx/test/emqx_config_SUITE.erl | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/apps/emqx/test/emqx_config_SUITE.erl b/apps/emqx/test/emqx_config_SUITE.erl index 7befd7a16..1704d1476 100644 --- a/apps/emqx/test/emqx_config_SUITE.erl +++ b/apps/emqx/test/emqx_config_SUITE.erl @@ -59,3 +59,22 @@ t_fill_default_values(_) -> %% ensure JSON compatible _ = emqx_utils_json:encode(WithDefaults), ok. + +t_init_load(_Config) -> + ConfFile = "./test_emqx.conf", + ok = file:write_file(ConfFile, <<"">>), + ExpectRootNames = lists:sort(hocon_schema:root_names(emqx_schema)), + emqx_config:erase_schema_mod_and_names(), + {ok, DeprecatedFile} = application:get_env(emqx, cluster_override_conf_file), + ?assertEqual(false, filelib:is_regular(DeprecatedFile), DeprecatedFile), + %% Don't has deprecated file + ok = emqx_config:init_load(emqx_schema, [ConfFile]), + ?assertEqual(ExpectRootNames, lists:sort(emqx_config:get_root_names())), + ?assertMatch({ok, #{raw_config := 256}}, emqx:update_config([mqtt, max_topic_levels], 256)), + emqx_config:erase_schema_mod_and_names(), + %% Has deprecated file + ok = file:write_file(DeprecatedFile, <<"{}">>), + ok = emqx_config:init_load(emqx_schema, [ConfFile]), + ?assertEqual(ExpectRootNames, lists:sort(emqx_config:get_root_names())), + ?assertMatch({ok, #{raw_config := 128}}, emqx:update_config([mqtt, max_topic_levels], 128)), + ok = file:delete(DeprecatedFile). From 3bb50a5751372be09ace7619684f55441cd2bfb0 Mon Sep 17 00:00:00 2001 From: firest Date: Tue, 25 Apr 2023 16:15:28 +0800 Subject: [PATCH 29/33] fix(rocketmq): fix that the update of ACL info not working --- .../src/emqx_ee_connector_rocketmq.erl | 87 +++++++++---------- rel/i18n/emqx_ee_connector_rocketmq.hocon | 22 +++++ 2 files changed, 63 insertions(+), 46 deletions(-) diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl index 29f8ef84d..70a27ef6e 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl @@ -44,6 +44,17 @@ fields(config) -> binary(), #{default => <<"TopicTest">>, desc => ?DESC(topic)} )}, + {access_key, + mk( + binary(), + #{default => <<>>, desc => ?DESC("access_key")} + )}, + {secret_key, + mk( + binary(), + #{default => <<>>, desc => ?DESC("secret_key")} + )}, + {security_token, mk(binary(), #{default => <<>>, desc => ?DESC(security_token)})}, {sync_timeout, mk( emqx_schema:duration(), @@ -59,39 +70,15 @@ fields(config) -> emqx_schema:bytesize(), #{default => <<"1024KB">>, desc => ?DESC(send_buffer)} )}, - {security_token, mk(binary(), #{default => <<>>, desc => ?DESC(security_token)})} - | relational_fields() + + {pool_size, fun emqx_connector_schema_lib:pool_size/1}, + {auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1} ]. -add_default_username(Fields) -> - lists:map( - fun - ({username, OrigUsernameFn}) -> - {username, add_default_fn(OrigUsernameFn, <<"">>)}; - (Field) -> - Field - end, - Fields - ). - -add_default_fn(OrigFn, Default) -> - fun - (default) -> Default; - (Field) -> OrigFn(Field) - end. - servers() -> Meta = #{desc => ?DESC("servers")}, emqx_schema:servers_sc(Meta, ?ROCKETMQ_HOST_OPTIONS). -relational_fields() -> - Fields = [username, password, auto_reconnect], - Values = lists:filter( - fun({E, _}) -> lists:member(E, Fields) end, - emqx_connector_schema_lib:relational_db_fields() - ), - add_default_username(Values). - %%======================================================================================== %% `emqx_resource' API %%======================================================================================== @@ -102,21 +89,20 @@ is_buffer_supported() -> false. on_start( InstanceId, - #{servers := BinServers, topic := Topic, sync_timeout := SyncTimeout} = Config1 + #{servers := BinServers, topic := Topic, sync_timeout := SyncTimeout} = Config ) -> ?SLOG(info, #{ msg => "starting_rocketmq_connector", connector => InstanceId, - config => redact(Config1) + config => redact(Config) }), - Config = maps:merge(default_security_info(), Config1), Servers = emqx_schema:parse_servers(BinServers, ?ROCKETMQ_HOST_OPTIONS), ClientId = client_id(InstanceId), - ClientCfg = #{acl_info => #{}}, TopicTks = emqx_plugin_libs_rule:preproc_tmpl(Topic), - ProducerOpts = make_producer_opts(Config), + #{acl_info := AclInfo} = ProducerOpts = make_producer_opts(Config), + ClientCfg = #{acl_info => AclInfo}, Templates = parse_template(Config), ProducersMapPID = create_producers_map(ClientId), State = #{ @@ -140,11 +126,21 @@ on_start( Error end. -on_stop(InstanceId, #{client_id := ClientId, producers_map_pid := Pid} = _State) -> +on_stop(InstanceId, #{client_id := ClientId, topic := RawTopic, producers_map_pid := Pid} = _State) -> ?SLOG(info, #{ msg => "stopping_rocketmq_connector", connector => InstanceId }), + + Producers = ets:match(ClientId, {{RawTopic, '$1'}, '$2'}), + lists:foreach( + fun([Topic, Producer]) -> + ets:delete(ClientId, {RawTopic, Topic}), + _ = rocketmq:stop_and_delete_supervised_producers(Producer) + end, + Producers + ), + Pid ! ok, ok = rocketmq:stop_and_delete_supervised_client(ClientId). @@ -276,6 +272,8 @@ client_id(InstanceId) -> redact(Msg) -> emqx_utils:redact(Msg, fun is_sensitive_key/1). +is_sensitive_key(secret_key) -> + true; is_sensitive_key(security_token) -> true; is_sensitive_key(_) -> @@ -283,14 +281,14 @@ is_sensitive_key(_) -> make_producer_opts( #{ - username := Username, - password := Password, + access_key := AccessKey, + secret_key := SecretKey, security_token := SecurityToken, send_buffer := SendBuff, refresh_interval := RefreshInterval } ) -> - ACLInfo = acl_info(Username, Password, SecurityToken), + ACLInfo = acl_info(AccessKey, SecretKey, SecurityToken), #{ tcp_opts => [{sndbuf, SendBuff}], ref_topic_route_interval => RefreshInterval, @@ -299,17 +297,17 @@ make_producer_opts( acl_info(<<>>, <<>>, <<>>) -> #{}; -acl_info(Username, Password, <<>>) when is_binary(Username), is_binary(Password) -> +acl_info(AccessKey, SecretKey, <<>>) when is_binary(AccessKey), is_binary(SecretKey) -> #{ - access_key => Username, - secret_key => Password + access_key => AccessKey, + secret_key => SecretKey }; -acl_info(Username, Password, SecurityToken) when - is_binary(Username), is_binary(Password), is_binary(SecurityToken) +acl_info(AccessKey, SecretKey, SecurityToken) when + is_binary(AccessKey), is_binary(SecretKey), is_binary(SecurityToken) -> #{ - access_key => Username, - secret_key => Password, + access_key => AccessKey, + secret_key => SecretKey, security_token => SecurityToken }; acl_info(_, _, _) -> @@ -342,6 +340,3 @@ get_producers(ClientId, {_, Topic1} = TopicKey, ProducerOpts) -> ets:insert(ClientId, {TopicKey, Producers0}), Producers0 end. - -default_security_info() -> - #{username => <<>>, password => <<>>, security_token => <<>>}. diff --git a/rel/i18n/emqx_ee_connector_rocketmq.hocon b/rel/i18n/emqx_ee_connector_rocketmq.hocon index 7f786898e..ddbe3a77b 100644 --- a/rel/i18n/emqx_ee_connector_rocketmq.hocon +++ b/rel/i18n/emqx_ee_connector_rocketmq.hocon @@ -26,6 +26,28 @@ The RocketMQ default port 9876 is used if `[:Port]` is not specified.""" } } + access_key { + desc { + en: """RocketMQ server `accessKey`.""" + zh: """RocketMQ 服务器的 `accessKey`。""" + } + label: { + en: "AccessKey" + zh: "AccessKey" + } + } + + secret_key { + desc { + en: """RocketMQ server `secretKey`.""" + zh: """RocketMQ 服务器的 `secretKey`。""" + } + label: { + en: "SecretKey" + zh: "SecretKey" + } + } + sync_timeout { desc { en: """Timeout of RocketMQ driver synchronous call.""" From 3138e2b3a1cbd772dd1928a111b964fbb1dead23 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 18 Apr 2023 10:19:09 -0300 Subject: [PATCH 30/33] chore: un-hide ocsp stapling config Undoing https://github.com/emqx/emqx/pull/10160 --- apps/emqx/src/emqx_schema.erl | 3 +-- apps/emqx/test/emqx_ocsp_cache_SUITE.erl | 8 ++++++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index b3c5e1778..ba333f111 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -2323,8 +2323,6 @@ server_ssl_opts_schema(Defaults, IsRanchListener) -> ref("ocsp"), #{ required => false, - %% TODO: remove after e5.0.2 - importance => ?IMPORTANCE_HIDDEN, validator => fun ocsp_inner_validator/1 } )}, @@ -2333,6 +2331,7 @@ server_ssl_opts_schema(Defaults, IsRanchListener) -> boolean(), #{ default => false, + importance => ?IMPORTANCE_MEDIUM, desc => ?DESC("server_ssl_opts_schema_enable_crl_check") } )} diff --git a/apps/emqx/test/emqx_ocsp_cache_SUITE.erl b/apps/emqx/test/emqx_ocsp_cache_SUITE.erl index dff8ce5a7..15ca29853 100644 --- a/apps/emqx/test/emqx_ocsp_cache_SUITE.erl +++ b/apps/emqx/test/emqx_ocsp_cache_SUITE.erl @@ -677,8 +677,12 @@ do_t_update_listener(Config) -> %% no ocsp at first ListenerId = "ssl:default", {ok, {{_, 200, _}, _, ListenerData0}} = get_listener_via_api(ListenerId), - ?assertEqual( - undefined, + ?assertMatch( + #{ + <<"enable_ocsp_stapling">> := false, + <<"refresh_http_timeout">> := _, + <<"refresh_interval">> := _ + }, emqx_utils_maps:deep_get([<<"ssl_options">>, <<"ocsp">>], ListenerData0, undefined) ), assert_no_http_get(), From a703707803d1471fb5672a1e022a6e663f49c68e Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 25 Apr 2023 10:51:34 -0300 Subject: [PATCH 31/33] chore: tag e5.0.3-alpha.3 --- apps/emqx/include/emqx_release.hrl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx/include/emqx_release.hrl b/apps/emqx/include/emqx_release.hrl index d1f1a93ae..6d91b2528 100644 --- a/apps/emqx/include/emqx_release.hrl +++ b/apps/emqx/include/emqx_release.hrl @@ -35,7 +35,7 @@ -define(EMQX_RELEASE_CE, "5.0.22"). %% Enterprise edition --define(EMQX_RELEASE_EE, "5.0.3-alpha.2"). +-define(EMQX_RELEASE_EE, "5.0.3-alpha.3"). %% the HTTP API version -define(EMQX_API_VERSION, "5.0"). From bc1bdae55d137cbd47a3b67570e47d7e72345320 Mon Sep 17 00:00:00 2001 From: Zhongwen Deng Date: Wed, 26 Apr 2023 11:27:31 +0800 Subject: [PATCH 32/33] chore: reslove confilt for sync release-50 to master --- .../src/emqx_ee_connector_dynamo.erl | 12 ++++++++---- .../src/emqx_ee_connector_rocketmq.erl | 6 ++++-- .../src/emqx_ee_connector_sqlserver.erl | 5 ++--- 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo.erl index b6270b1b6..5eee882ce 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo.erl @@ -80,8 +80,10 @@ on_start( config => redact(Config) }), - {Schema, Server} = get_host_schema(to_str(Url)), - #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?DYNAMO_HOST_OPTIONS), + {Schema, Server, DefaultPort} = get_host_info(to_str(Url)), + #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, #{ + default_port => DefaultPort + }), Options = [ {config, #{ @@ -142,8 +144,10 @@ on_batch_query_async(InstanceId, [{send_message, _} | _] = Query, ReplyCtx, Stat on_batch_query_async(_InstanceId, Query, _Reply, _State) -> {error, {unrecoverable_error, {invalid_request, Query}}}. -on_get_status(_InstanceId, #{pool_name := PoolName}) -> - Health = emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1), +on_get_status(_InstanceId, #{pool_name := Pool}) -> + Health = emqx_resource_pool:health_check_workers( + Pool, {emqx_ee_connector_dynamo_client, is_connected, []} + ), status_result(Health). status_result(_Status = true) -> connected; diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl index e831b4f2f..2e1730b52 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl @@ -96,8 +96,10 @@ on_start( connector => InstanceId, config => redact(Config) }), - Config = maps:merge(default_security_info(), Config1), - #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?ROCKETMQ_HOST_OPTIONS), + Servers = lists:map( + fun(#{hostname := Host, port := Port}) -> {Host, Port} end, + emqx_schema:parse_servers(BinServers, ?ROCKETMQ_HOST_OPTIONS) + ), ClientId = client_id(InstanceId), TopicTks = emqx_plugin_libs_rule:preproc_tmpl(Topic), diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_sqlserver.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_sqlserver.erl index 180d1271c..90d90cb36 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_sqlserver.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_sqlserver.erl @@ -304,11 +304,10 @@ on_batch_query_async(InstanceId, Requests, ReplyFunAndArgs, State) -> ), do_query(InstanceId, Requests, ?ASYNC_QUERY_MODE(ReplyFunAndArgs), State). -on_get_status(_InstanceId, #{pool_name := PoolName, resource_opts := ResourceOpts} = _State) -> - RequestTimeout = ?REQUEST_TIMEOUT(ResourceOpts), +on_get_status(_InstanceId, #{pool_name := PoolName} = _State) -> Health = emqx_resource_pool:health_check_workers( PoolName, - {?MODULE, do_get_status, []}, + {?MODULE, do_get_status, []} ), status_result(Health). From f0cd5c98c731e76e77816d45267ec76f5d33dac1 Mon Sep 17 00:00:00 2001 From: Zhongwen Deng Date: Wed, 26 Apr 2023 13:57:15 +0800 Subject: [PATCH 33/33] chore: split i18n with script --- rel/i18n/emqx_ee_bridge_rocketmq.hocon | 12 +--- rel/i18n/emqx_ee_connector_rocketmq.hocon | 71 ++++++++++---------- rel/i18n/zh/emqx_ee_bridge_rocketmq.hocon | 5 +- rel/i18n/zh/emqx_ee_connector_rocketmq.hocon | 22 +++++- 4 files changed, 63 insertions(+), 47 deletions(-) diff --git a/rel/i18n/emqx_ee_bridge_rocketmq.hocon b/rel/i18n/emqx_ee_bridge_rocketmq.hocon index 4e4f8a99c..e079220b6 100644 --- a/rel/i18n/emqx_ee_bridge_rocketmq.hocon +++ b/rel/i18n/emqx_ee_bridge_rocketmq.hocon @@ -34,17 +34,11 @@ local_topic.label: template.desc: """Template, the default value is empty. When this value is empty the whole message will be stored in the RocketMQ.
-The template can be any valid string with placeholders, example:
-- ${id}, ${username}, ${clientid}, ${timestamp}
-- {\"id\" : ${id}, \"username\" : ${username}}""" + The template can be any valid string with placeholders, example:
+ - ${id}, ${username}, ${clientid}, ${timestamp}
+ - {"id" : ${id}, "username" : ${username}}""" template.label: """Template""" -config_enable.desc: -"""Enable or disable this bridge""" - -config_enable.label: -"""Enable Or Disable Bridge""" - } diff --git a/rel/i18n/emqx_ee_connector_rocketmq.hocon b/rel/i18n/emqx_ee_connector_rocketmq.hocon index 661cbe249..d3d59a389 100644 --- a/rel/i18n/emqx_ee_connector_rocketmq.hocon +++ b/rel/i18n/emqx_ee_connector_rocketmq.hocon @@ -1,52 +1,53 @@ emqx_ee_connector_rocketmq { +access_key.desc: +"""RocketMQ server `accessKey`.""" + +access_key.label: +"""AccessKey""" + +refresh_interval.desc: +"""RocketMQ Topic Route Refresh Interval.""" + +refresh_interval.label: +"""Topic Route Refresh Interval""" + +secret_key.desc: +"""RocketMQ server `secretKey`.""" + +secret_key.label: +"""SecretKey""" + +security_token.desc: +"""RocketMQ Server Security Token""" + +security_token.label: +"""Security Token""" + +send_buffer.desc: +"""The socket send buffer size of the RocketMQ driver client.""" + +send_buffer.label: +"""Send Buffer Size""" + servers.desc: """The IPv4 or IPv6 address or the hostname to connect to.
A host entry has the following form: `Host[:Port]`.
The RocketMQ default port 9876 is used if `[:Port]` is not specified.""" -servers.label: +servers.label: """Server Host""" -topic.desc: -"""RocketMQ Topic""" - -topic.label: -"""RocketMQ Topic""" - -access_key.desc: -"""RocketMQ server `accessKey`.""" - -access_key.label: -"""AccessKey""" - -secret_key.desc: -"""RocketMQ server `secretKey`.""" -secret_key.label: -"""SecretKey""" - sync_timeout.desc: """Timeout of RocketMQ driver synchronous call.""" -sync_timeout.label: +sync_timeout.label: """Sync Timeout""" - -refresh_interval.desc: -"""RocketMQ Topic Route Refresh Interval.""" - -refresh_interval.label: -"""Topic Route Refresh Interval""" -send_buffer.desc: -"""The socket send buffer size of the RocketMQ driver client.""" +topic.desc: +"""RocketMQ Topic""" -send_buffer.label: -"""Send Buffer Size""" - -security_token.desc: -"""RocketMQ Server Security Token""" - -security_token.label: -"""Security Token""" +topic.label: +"""RocketMQ Topic""" } diff --git a/rel/i18n/zh/emqx_ee_bridge_rocketmq.hocon b/rel/i18n/zh/emqx_ee_bridge_rocketmq.hocon index 924004361..445a54232 100644 --- a/rel/i18n/zh/emqx_ee_bridge_rocketmq.hocon +++ b/rel/i18n/zh/emqx_ee_bridge_rocketmq.hocon @@ -32,7 +32,10 @@ local_topic.label: """本地 Topic""" template.desc: -"""模板, 默认为空,为空时将会将整个消息转发给 RocketMQ""" +"""模板, 默认为空,为空时将会将整个消息转发给 RocketMQ。
+ 模板可以是任意带有占位符的合法字符串, 例如:
+ - ${id}, ${username}, ${clientid}, ${timestamp}
+ - {"id" : ${id}, "username" : ${username}}""" template.label: """模板""" diff --git a/rel/i18n/zh/emqx_ee_connector_rocketmq.hocon b/rel/i18n/zh/emqx_ee_connector_rocketmq.hocon index d32e6ea01..58a1f7ddb 100644 --- a/rel/i18n/zh/emqx_ee_connector_rocketmq.hocon +++ b/rel/i18n/zh/emqx_ee_connector_rocketmq.hocon @@ -1,11 +1,23 @@ emqx_ee_connector_rocketmq { +access_key.desc: +"""RocketMQ 服务器的 `accessKey`。""" + +access_key.label: +"""AccessKey""" + refresh_interval.desc: """RocketMQ 主题路由更新间隔。""" refresh_interval.label: """主题路由更新间隔""" +secret_key.desc: +"""RocketMQ 服务器的 `secretKey`。""" + +secret_key.label: +"""SecretKey""" + security_token.desc: """RocketMQ 服务器安全令牌""" @@ -18,14 +30,20 @@ send_buffer.desc: send_buffer.label: """发送消息的缓冲区大小""" -server.desc: +servers.desc: """将要连接的 IPv4 或 IPv6 地址,或者主机名。
主机名具有以下形式:`Host[:Port]`。
如果未指定 `[:Port]`,则使用 RocketMQ 默认端口 9876。""" -server.label: +servers.label: """服务器地址""" +sync_timeout.desc: +"""RocketMQ 驱动同步调用的超时时间。""" + +sync_timeout.label: +"""同步调用超时时间""" + topic.desc: """RocketMQ 主题"""