From 9c48b016a93fbb3c1604703a41f0490d21957920 Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Thu, 18 May 2023 19:33:44 +0300 Subject: [PATCH 01/37] fix(evacuation): handle expire interval correctly --- apps/emqx/include/emqx_channel.hrl | 2 ++ apps/emqx/src/emqx_channel.erl | 2 +- apps/emqx/src/emqx_cm.erl | 1 + .../emqx/src/persistent_session/emqx_persistent_session.erl | 6 ++---- apps/emqx_eviction_agent/src/emqx_eviction_agent.app.src | 2 +- .../emqx_eviction_agent/src/emqx_eviction_agent_channel.erl | 4 ++-- apps/emqx_eviction_agent/test/emqx_eviction_agent_SUITE.erl | 6 +++--- .../test/emqx_eviction_agent_channel_SUITE.erl | 3 ++- 8 files changed, 14 insertions(+), 12 deletions(-) diff --git a/apps/emqx/include/emqx_channel.hrl b/apps/emqx/include/emqx_channel.hrl index d4362633a..be2448a20 100644 --- a/apps/emqx/include/emqx_channel.hrl +++ b/apps/emqx/include/emqx_channel.hrl @@ -40,3 +40,5 @@ session, will_msg ]). + +-define(EXPIRE_INTERVAL_INFINITE, 4294967295000). diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 69e0a55f7..d5a9051f2 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -2064,7 +2064,7 @@ maybe_resume_session(#channel{ maybe_shutdown(Reason, Channel = #channel{conninfo = ConnInfo}) -> case maps:get(expiry_interval, ConnInfo) of - ?UINT_MAX -> + ?EXPIRE_INTERVAL_INFINITE -> {ok, Channel}; I when I > 0 -> {ok, ensure_timer(expire_timer, I, Channel)}; diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index 66c1db36e..f01b29a8c 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -769,6 +769,7 @@ mark_channel_connected(ChanPid) -> mark_channel_disconnected(ChanPid) -> ?tp(emqx_cm_connected_client_count_dec, #{chan_pid => ChanPid}), ets:delete(?CHAN_LIVE_TAB, ChanPid), + ?tp(emqx_cm_connected_client_count_dec_done, #{chan_pid => ChanPid}), ok. get_connected_client_count() -> diff --git a/apps/emqx/src/persistent_session/emqx_persistent_session.erl b/apps/emqx/src/persistent_session/emqx_persistent_session.erl index 68f783283..3e9e00c81 100644 --- a/apps/emqx/src/persistent_session/emqx_persistent_session.erl +++ b/apps/emqx/src/persistent_session/emqx_persistent_session.erl @@ -60,14 +60,12 @@ -export_type([sess_msg_key/0]). -include("emqx.hrl"). +-include("emqx_channel.hrl"). -include("emqx_persistent_session.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -compile({inline, [is_store_enabled/0]}). -%% 16#FFFFFFFF * 1000 --define(MAX_EXPIRY_INTERVAL, 4294967295000). - %% NOTE: Order is significant because of traversal order of the table. -define(MARKER, 3). -define(DELIVERED, 2). @@ -424,7 +422,7 @@ pending(SessionID, MarkerIds) -> %% @private [MQTT-3.1.2-23] persistent_session_status(#session_store{expiry_interval = 0}) -> not_persistent; -persistent_session_status(#session_store{expiry_interval = ?MAX_EXPIRY_INTERVAL}) -> +persistent_session_status(#session_store{expiry_interval = ?EXPIRE_INTERVAL_INFINITE}) -> persistent; persistent_session_status(#session_store{expiry_interval = E, ts = TS}) -> case E + TS > erlang:system_time(millisecond) of diff --git a/apps/emqx_eviction_agent/src/emqx_eviction_agent.app.src b/apps/emqx_eviction_agent/src/emqx_eviction_agent.app.src index 239d9052e..7e6cf5b95 100644 --- a/apps/emqx_eviction_agent/src/emqx_eviction_agent.app.src +++ b/apps/emqx_eviction_agent/src/emqx_eviction_agent.app.src @@ -1,6 +1,6 @@ {application, emqx_eviction_agent, [ {description, "EMQX Eviction Agent"}, - {vsn, "5.0.0"}, + {vsn, "5.0.1"}, {registered, [ emqx_eviction_agent_sup, emqx_eviction_agent, diff --git a/apps/emqx_eviction_agent/src/emqx_eviction_agent_channel.erl b/apps/emqx_eviction_agent/src/emqx_eviction_agent_channel.erl index a6097f03d..1369ee969 100644 --- a/apps/emqx_eviction_agent/src/emqx_eviction_agent_channel.erl +++ b/apps/emqx_eviction_agent/src/emqx_eviction_agent_channel.erl @@ -218,10 +218,10 @@ cancel_expiry_timer(_) -> set_expiry_timer(#{conninfo := ConnInfo} = Channel) -> case maps:get(expiry_interval, ConnInfo) of - ?UINT_MAX -> + ?EXPIRE_INTERVAL_INFINITE -> {ok, Channel}; I when I > 0 -> - Timer = erlang:send_after(timer:seconds(I), self(), expire_session), + Timer = erlang:send_after(I, self(), expire_session), {ok, Channel#{expiry_timer => Timer}}; _ -> {error, should_be_expired} diff --git a/apps/emqx_eviction_agent/test/emqx_eviction_agent_SUITE.erl b/apps/emqx_eviction_agent/test/emqx_eviction_agent_SUITE.erl index 22b694d77..0f1b948c7 100644 --- a/apps/emqx_eviction_agent/test/emqx_eviction_agent_SUITE.erl +++ b/apps/emqx_eviction_agent/test/emqx_eviction_agent_SUITE.erl @@ -177,7 +177,7 @@ t_explicit_session_takeover(Config) -> ?assert(false, "Connection not evicted") end end, - #{?snk_kind := emqx_cm_connected_client_count_dec, chan_pid := ChanPid}, + #{?snk_kind := emqx_cm_connected_client_count_dec_done, chan_pid := ChanPid}, 2000 ), @@ -383,7 +383,7 @@ t_ws_conn(_Config) -> ?assertWaitEvent( ok = emqx_eviction_agent:evict_connections(1), - #{?snk_kind := emqx_cm_connected_client_count_dec}, + #{?snk_kind := emqx_cm_connected_client_count_dec_done}, 1000 ), @@ -418,7 +418,7 @@ t_quic_conn(_Config) -> ?assertWaitEvent( ok = emqx_eviction_agent:evict_connections(1), - #{?snk_kind := emqx_cm_connected_client_count_dec}, + #{?snk_kind := emqx_cm_connected_client_count_dec_done}, 1000 ), diff --git a/apps/emqx_eviction_agent/test/emqx_eviction_agent_channel_SUITE.erl b/apps/emqx_eviction_agent/test/emqx_eviction_agent_channel_SUITE.erl index 3b7ef6672..4ace80893 100644 --- a/apps/emqx_eviction_agent/test/emqx_eviction_agent_channel_SUITE.erl +++ b/apps/emqx_eviction_agent/test/emqx_eviction_agent_channel_SUITE.erl @@ -10,6 +10,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("emqx/include/emqx_channel.hrl"). -define(CLIENT_ID, <<"client_with_session">>). @@ -101,7 +102,7 @@ t_start_infinite_expire(_Config) -> conninfo => #{ clientid => ?CLIENT_ID, receive_maximum => 32, - expiry_interval => ?UINT_MAX + expiry_interval => ?EXPIRE_INTERVAL_INFINITE } }, ?assertMatch( From 65f973044f928dc78a929aaa847c2beecff0fd7a Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 19 May 2023 16:09:15 -0300 Subject: [PATCH 02/37] feat(pulsar): improve authn error check time and add connect timeout Fixes https://emqx.atlassian.net/browse/EMQX-9910 --- apps/emqx_bridge_pulsar/rebar.config | 2 +- .../src/emqx_bridge_pulsar.erl | 8 +++++ .../src/emqx_bridge_pulsar_impl_producer.erl | 29 +++++++++++++++++-- rel/i18n/emqx_bridge_pulsar.hocon | 5 ++++ rel/i18n/zh/emqx_bridge_pulsar.hocon | 5 ++++ 5 files changed, 46 insertions(+), 3 deletions(-) diff --git a/apps/emqx_bridge_pulsar/rebar.config b/apps/emqx_bridge_pulsar/rebar.config index d5a63f320..c77007b93 100644 --- a/apps/emqx_bridge_pulsar/rebar.config +++ b/apps/emqx_bridge_pulsar/rebar.config @@ -2,7 +2,7 @@ {erl_opts, [debug_info]}. {deps, [ - {pulsar, {git, "https://github.com/emqx/pulsar-client-erl.git", {tag, "0.8.1"}}}, + {pulsar, {git, "https://github.com/emqx/pulsar-client-erl.git", {tag, "0.8.2"}}}, {emqx_connector, {path, "../../apps/emqx_connector"}}, {emqx_resource, {path, "../../apps/emqx_resource"}}, {emqx_bridge, {path, "../../apps/emqx_bridge"}} diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl index 7d1b20d24..721937cd2 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl @@ -57,6 +57,14 @@ fields(config) -> sensitive => true, desc => ?DESC("authentication") } + )}, + {connect_timeout, + mk( + emqx_schema:duration_ms(), + #{ + default => <<"5s">>, + desc => ?DESC("connect_timeout") + } )} ] ++ emqx_connector_schema_lib:ssl_fields(); fields(producer_opts) -> diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl index 59956e1b6..5ed706511 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl @@ -48,6 +48,7 @@ memory_overload_protection := boolean() }, compression := compression_mode(), + connect_timeout := emqx_schema:duration_ms(), max_batch_bytes := emqx_schema:bytesize(), message := message_template_raw(), pulsar_topic := binary(), @@ -81,7 +82,9 @@ on_start(InstanceId, Config) -> Servers = format_servers(Servers0), ClientId = make_client_id(InstanceId, BridgeName), SSLOpts = emqx_tls_lib:to_client_opts(SSL), + ConnectTimeout = maps:get(connect_timeout, Config, timer:seconds(5)), ClientOpts = #{ + connect_timeout => ConnectTimeout, ssl_opts => SSLOpts, conn_opts => conn_opts(Config) }, @@ -96,13 +99,19 @@ on_start(InstanceId, Config) -> } ); {error, Reason} -> + RedactedReason = emqx_utils:redact(Reason, fun is_sensitive_key/1), ?SLOG(error, #{ msg => "failed_to_start_pulsar_client", instance_id => InstanceId, pulsar_hosts => Servers, - reason => emqx_utils:redact(Reason, fun is_sensitive_key/1) + reason => RedactedReason }), - throw(failed_to_start_pulsar_client) + Message = + case get_error_message(RedactedReason) of + {ok, Msg} -> Msg; + error -> failed_to_start_pulsar_client + end, + throw(Message) end, start_producer(Config, InstanceId, ClientId, ClientOpts). @@ -422,3 +431,19 @@ partition_strategy(Strategy) -> Strategy. is_sensitive_key(auth_data) -> true; is_sensitive_key(_) -> false. + +get_error_message({BrokerErrorMap, _}) when is_map(BrokerErrorMap) -> + Iter = maps:iterator(BrokerErrorMap), + do_get_error_message(Iter); +get_error_message(_Error) -> + error. + +do_get_error_message(Iter) -> + case maps:next(Iter) of + {{_Broker, _Port}, #{message := Message}, _NIter} -> + {ok, Message}; + {_K, _V, NIter} -> + do_get_error_message(NIter); + none -> + error + end. diff --git a/rel/i18n/emqx_bridge_pulsar.hocon b/rel/i18n/emqx_bridge_pulsar.hocon index 92294bb75..d1f5c8b13 100644 --- a/rel/i18n/emqx_bridge_pulsar.hocon +++ b/rel/i18n/emqx_bridge_pulsar.hocon @@ -67,6 +67,11 @@ emqx_bridge_pulsar { label = "Enable or Disable" } + connect_timeout { + desc = "Maximum wait time for TCP connection establishment (including authentication time if enabled)." + label = "Connect Timeout" + } + desc_name { desc = "Bridge name, used as a human-readable description of the bridge." label = "Bridge Name" diff --git a/rel/i18n/zh/emqx_bridge_pulsar.hocon b/rel/i18n/zh/emqx_bridge_pulsar.hocon index 23643060b..3e0ba95c5 100644 --- a/rel/i18n/zh/emqx_bridge_pulsar.hocon +++ b/rel/i18n/zh/emqx_bridge_pulsar.hocon @@ -20,6 +20,11 @@ emqx_bridge_pulsar { label = "启用或停用" } + connect_timeout { + desc = "建立 TCP 连接时的最大等待时长(若启用认证,这个等待时长也包含完成认证所需时间)。" + label = "连接超时" + } + servers { desc = "以逗号分隔的 scheme://host[:port] 格式的 Pulsar URL 列表," "支持的 scheme 有 pulsar:// (默认)" From 0877e4296aba320d4f0381f6986925916cebc63c Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 22 May 2023 11:35:04 -0300 Subject: [PATCH 03/37] fix(dashboard): add missing function clause Example error: https://github.com/emqx/emqx/actions/runs/5045715277/jobs/9052482682#step:8:294 The previous change was not equivalent to the previous `maps:with/2` behavior. --- apps/emqx_dashboard/src/emqx_dashboard.erl | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/apps/emqx_dashboard/src/emqx_dashboard.erl b/apps/emqx_dashboard/src/emqx_dashboard.erl index aec811e5d..364853eec 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard.erl @@ -192,7 +192,9 @@ ranch_opts(Options) -> RanchOpts#{socket_opts => InetOpts ++ SocketOpts}. proto_opts(#{proxy_header := ProxyHeader}) -> - #{proxy_header => ProxyHeader}. + #{proxy_header => ProxyHeader}; +proto_opts(_Opts) -> + #{}. filter_false(_K, false, S) -> S; filter_false(K, V, S) -> [{K, V} | S]. From 67a30ee9761e83e763b6bcf2e031a1e093c8077e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9F=90=E6=96=87?= Date: Tue, 23 May 2023 10:16:43 +0800 Subject: [PATCH 04/37] test: bridge test init dashboard listener failed --- apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 27c1c779a..49ddd19bd 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -161,7 +161,7 @@ init_node(Type) -> primary -> ok = emqx_config:put( [dashboard, listeners], - #{http => #{enable => true, bind => 18083}} + #{http => #{enable => true, bind => 18083}, proxy_header => false} ), ok = emqx_dashboard:start_listeners(), ready = emqx_dashboard_listener:regenerate_minirest_dispatch(), From d19ddb1832fbc7c15e900e9f047ed6097e369518 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Mon, 22 May 2023 13:32:49 +0200 Subject: [PATCH 05/37] fix: IoTDB bridge incoming payload needs to be parsed as JSON There was an incorrect assumption that the data incoming to the IoTDB bridge has already been parsed. This is fixed by parsing the payload as JSON data if the payload is not already a map. Fixes: https://emqx.atlassian.net/browse/EMQX-9854 --- .../src/emqx_bridge_iotdb.app.src | 2 +- .../src/emqx_bridge_iotdb_impl.erl | 61 +++++++++++++------ 2 files changed, 43 insertions(+), 20 deletions(-) diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src index 9c5108307..cebf60cb1 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge_iotdb, [ {description, "EMQX Enterprise Apache IoTDB Bridge"}, - {vsn, "0.1.0"}, + {vsn, "0.1.1"}, {modules, [ emqx_bridge_iotdb, emqx_bridge_iotdb_impl diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl index 2f8794560..416e19a3a 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl @@ -143,24 +143,42 @@ on_query_async(InstanceId, {send_message, Message}, ReplyFunAndArgs0, State) -> %% Internal Functions %%-------------------------------------------------------------------- -preproc_data(DataList) -> +make_parsed_payload(PayloadUnparsed) when is_binary(PayloadUnparsed) -> + emqx_utils_json:decode(PayloadUnparsed, [return_maps]); +make_parsed_payload(PayloadUnparsed) when is_list(PayloadUnparsed) -> + lists:map(fun make_parsed_payload/1, PayloadUnparsed); +make_parsed_payload( + #{ + measurement := Measurement, + data_type := DataType, + value := Value + } = Data +) -> + Data#{ + <<"measurement">> => Measurement, + <<"data_type">> => DataType, + <<"value">> => Value + }. + +preproc_data( + #{ + <<"measurement">> := Measurement, + <<"data_type">> := DataType, + <<"value">> := Value + } = Data +) -> + #{ + timestamp => emqx_plugin_libs_rule:preproc_tmpl( + maps:get(<<"timestamp">>, Data, <<"now">>) + ), + measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement), + data_type => DataType, + value => emqx_plugin_libs_rule:preproc_tmpl(Value) + }. + +preproc_data_list(DataList) -> lists:map( - fun( - #{ - measurement := Measurement, - data_type := DataType, - value := Value - } = Data - ) -> - #{ - timestamp => emqx_plugin_libs_rule:preproc_tmpl( - maps:get(<<"timestamp">>, Data, <<"now">>) - ), - measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement), - data_type => DataType, - value => emqx_plugin_libs_rule:preproc_tmpl(Value) - } - end, + fun preproc_data/1, DataList ). @@ -258,12 +276,15 @@ convert_float(Str) when is_binary(Str) -> convert_float(undefined) -> null. -make_iotdb_insert_request(Message, State) -> +make_iotdb_insert_request(MessageUnparsedPayload, State) -> + PayloadUnparsed = maps:get(payload, MessageUnparsedPayload), + PayloadParsed = make_parsed_payload(PayloadUnparsed), + Message = MessageUnparsedPayload#{payload => PayloadParsed}, IsAligned = maps:get(is_aligned, State, false), DeviceId = device_id(Message, State), IotDBVsn = maps:get(iotdb_version, State, ?VSN_1_0_X), Payload = make_list(maps:get(payload, Message)), - PreProcessedData = preproc_data(Payload), + PreProcessedData = preproc_data_list(Payload), DataList = proc_data(PreProcessedData, Message), InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []}, Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IotDBVsn), @@ -350,6 +371,8 @@ device_id(Message, State) -> case maps:get(device_id, State, undefined) of undefined -> case maps:get(payload, Message) of + #{<<"device_id">> := DeviceId} -> + DeviceId; #{device_id := DeviceId} -> DeviceId; _NotFound -> From 89ea1646bedacd18e4fe5991874a5db435ab3d7d Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Tue, 23 May 2023 10:57:05 +0200 Subject: [PATCH 06/37] fix: IoTDB name for version option The previous name for the version option was v1.0.x which is clearly wrong since this option was tested against IoTDB version v1.1.0. This commit fixes this by renaming the option to v1.x. Fixes: https://emqx.atlassian.net/browse/EMQX-9926 --- apps/emqx_bridge_iotdb/include/emqx_bridge_iotdb.hrl | 2 +- apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl | 6 +++--- apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl | 8 ++++---- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/apps/emqx_bridge_iotdb/include/emqx_bridge_iotdb.hrl b/apps/emqx_bridge_iotdb/include/emqx_bridge_iotdb.hrl index 5e6bf9ac5..5d693547a 100644 --- a/apps/emqx_bridge_iotdb/include/emqx_bridge_iotdb.hrl +++ b/apps/emqx_bridge_iotdb/include/emqx_bridge_iotdb.hrl @@ -5,7 +5,7 @@ -ifndef(EMQX_BRIDGE_IOTDB_HRL). -define(EMQX_BRIDGE_IOTDB_HRL, true). --define(VSN_1_0_X, 'v1.0.x'). +-define(VSN_1_X, 'v1.x'). -define(VSN_0_13_X, 'v0.13.x'). -endif. diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl index 90e8d18a4..aa2c32589 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl @@ -109,10 +109,10 @@ basic_config() -> )}, {iotdb_version, mk( - hoconsc:enum([?VSN_1_0_X, ?VSN_0_13_X]), + hoconsc:enum([?VSN_1_X, ?VSN_0_13_X]), #{ desc => ?DESC("config_iotdb_version"), - default => ?VSN_1_0_X + default => ?VSN_1_X } )} ] ++ resource_creation_opts() ++ @@ -217,7 +217,7 @@ conn_bridge_example(_Method, Type) -> is_aligned => false, device_id => <<"my_device">>, base_url => <<"http://iotdb.local:18080/">>, - iotdb_version => ?VSN_1_0_X, + iotdb_version => ?VSN_1_X, connect_timeout => <<"15s">>, pool_type => <<"random">>, pool_size => 8, diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl index 416e19a3a..9014fde0b 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl @@ -282,7 +282,7 @@ make_iotdb_insert_request(MessageUnparsedPayload, State) -> Message = MessageUnparsedPayload#{payload => PayloadParsed}, IsAligned = maps:get(is_aligned, State, false), DeviceId = device_id(Message, State), - IotDBVsn = maps:get(iotdb_version, State, ?VSN_1_0_X), + IotDBVsn = maps:get(iotdb_version, State, ?VSN_1_X), Payload = make_list(maps:get(payload, Message)), PreProcessedData = preproc_data_list(Payload), DataList = proc_data(PreProcessedData, Message), @@ -351,15 +351,15 @@ insert_value(1, Data, [Value | Values]) -> insert_value(Index, Data, [Value | Values]) -> [[null | Value] | insert_value(Index - 1, Data, Values)]. -iotdb_field_key(is_aligned, ?VSN_1_0_X) -> +iotdb_field_key(is_aligned, ?VSN_1_X) -> <<"is_aligned">>; iotdb_field_key(is_aligned, ?VSN_0_13_X) -> <<"isAligned">>; -iotdb_field_key(device_id, ?VSN_1_0_X) -> +iotdb_field_key(device_id, ?VSN_1_X) -> <<"device">>; iotdb_field_key(device_id, ?VSN_0_13_X) -> <<"deviceId">>; -iotdb_field_key(data_types, ?VSN_1_0_X) -> +iotdb_field_key(data_types, ?VSN_1_X) -> <<"data_types">>; iotdb_field_key(data_types, ?VSN_0_13_X) -> <<"dataTypes">>. From 29e0e41ec7e862a876cdcc08b24565322aa64ba0 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 23 May 2023 09:08:55 -0300 Subject: [PATCH 07/37] docs: improve descriptions --- rel/i18n/zh/emqx_bridge_pulsar.hocon | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rel/i18n/zh/emqx_bridge_pulsar.hocon b/rel/i18n/zh/emqx_bridge_pulsar.hocon index 3e0ba95c5..4e2fd5c9f 100644 --- a/rel/i18n/zh/emqx_bridge_pulsar.hocon +++ b/rel/i18n/zh/emqx_bridge_pulsar.hocon @@ -22,7 +22,7 @@ emqx_bridge_pulsar { connect_timeout { desc = "建立 TCP 连接时的最大等待时长(若启用认证,这个等待时长也包含完成认证所需时间)。" - label = "连接超时" + label = "连接超时时间" } servers { From 91150e6d831d93ab508078769dc9c789fe7ba2c6 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Mon, 22 May 2023 23:07:47 +0200 Subject: [PATCH 08/37] chore: allow special chars in log dir the current valdiator does not allow space and ':' in log file path which is an unresonable restriction --- apps/emqx_conf/src/emqx_conf.app.src | 2 +- apps/emqx_conf/src/emqx_conf_schema.erl | 8 +------- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/apps/emqx_conf/src/emqx_conf.app.src b/apps/emqx_conf/src/emqx_conf.app.src index e6c3d9cd9..c31a16b9b 100644 --- a/apps/emqx_conf/src/emqx_conf.app.src +++ b/apps/emqx_conf/src/emqx_conf.app.src @@ -1,6 +1,6 @@ {application, emqx_conf, [ {description, "EMQX configuration management"}, - {vsn, "0.1.19"}, + {vsn, "0.1.20"}, {registered, []}, {mod, {emqx_conf_app, []}}, {applications, [kernel, stdlib, emqx_ctl]}, diff --git a/apps/emqx_conf/src/emqx_conf_schema.erl b/apps/emqx_conf/src/emqx_conf_schema.erl index 94cbfb221..76213c690 100644 --- a/apps/emqx_conf/src/emqx_conf_schema.erl +++ b/apps/emqx_conf/src/emqx_conf_schema.erl @@ -893,8 +893,7 @@ fields("log_file_handler") -> #{ desc => ?DESC("log_file_handler_file"), default => <<"${EMQX_LOG_DIR}/emqx.log">>, - converter => fun emqx_schema:naive_env_interpolation/1, - validator => fun validate_file_location/1 + converter => fun emqx_schema:naive_env_interpolation/1 } )}, {"rotation", @@ -1333,11 +1332,6 @@ emqx_schema_high_prio_roots() -> )}, lists:keyreplace("authorization", 1, Roots, Authz). -validate_file_location(File) -> - ValidFile = "^[/\\_a-zA-Z0-9\\.\\-]*$", - Error = "Invalid file name: " ++ ValidFile, - validator_string_re(File, ValidFile, Error). - validate_time_offset(Offset) -> ValidTimeOffset = "^([\\-\\+][0-1][0-9]:[0-6][0-9]|system|utc)$", Error = From 2fa5b511bf90b62ed5cd1ca21c6fb3ac0a4f05fa Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Tue, 23 May 2023 10:36:41 +0200 Subject: [PATCH 09/37] chore: hide stale config --- apps/emqx/src/emqx_schema.erl | 3 ++- rel/i18n/emqx_schema.hocon | 3 --- rel/i18n/zh/emqx_schema.hocon | 3 --- 3 files changed, 2 insertions(+), 7 deletions(-) diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index dfeae6d64..80894319b 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1540,7 +1540,8 @@ fields("broker") -> boolean(), #{ default => true, - desc => ?DESC(broker_route_batch_clean) + desc => "This config is stale since 4.3", + importance => ?IMPORTANCE_HIDDEN } )}, {"perf", diff --git a/rel/i18n/emqx_schema.hocon b/rel/i18n/emqx_schema.hocon index af26d2fdb..5d6977b47 100644 --- a/rel/i18n/emqx_schema.hocon +++ b/rel/i18n/emqx_schema.hocon @@ -213,9 +213,6 @@ pending connections can grow to.""" fields_tcp_opts_backlog.label: """TCP backlog length""" -broker_route_batch_clean.desc: -"""Enable batch clean for deleted routes.""" - fields_mqtt_quic_listener_initial_window_packets.desc: """The size (in packets) of the initial congestion window for a connection. Default: 10""" diff --git a/rel/i18n/zh/emqx_schema.hocon b/rel/i18n/zh/emqx_schema.hocon index 835372868..0e329eac9 100644 --- a/rel/i18n/zh/emqx_schema.hocon +++ b/rel/i18n/zh/emqx_schema.hocon @@ -208,9 +208,6 @@ fields_tcp_opts_backlog.desc: fields_tcp_opts_backlog.label: """TCP 连接队列长度""" -broker_route_batch_clean.desc: -"""是否开启批量清除路由。""" - fields_mqtt_quic_listener_initial_window_packets.desc: """一个连接的初始拥堵窗口的大小(以包为单位)。默认值:10""" From 47a3096776cd301a6278c637a6733d1b08c36b28 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Tue, 23 May 2023 12:55:13 +0200 Subject: [PATCH 10/37] test: add test case for file path validation --- apps/emqx_conf/src/emqx_conf_schema.erl | 27 +++++++++++- .../emqx_conf/test/emqx_conf_schema_tests.erl | 44 +++++++++++++++++++ 2 files changed, 69 insertions(+), 2 deletions(-) diff --git a/apps/emqx_conf/src/emqx_conf_schema.erl b/apps/emqx_conf/src/emqx_conf_schema.erl index 76213c690..b37c3f71e 100644 --- a/apps/emqx_conf/src/emqx_conf_schema.erl +++ b/apps/emqx_conf/src/emqx_conf_schema.erl @@ -508,6 +508,7 @@ fields("node") -> desc => ?DESC(node_crash_dump_file), default => crash_dump_file_default(), importance => ?IMPORTANCE_HIDDEN, + converter => fun ensure_unicode_path/2, 'readOnly' => true } )}, @@ -754,6 +755,7 @@ fields("rpc") -> file(), #{ mapping => "gen_rpc.certfile", + converter => fun ensure_unicode_path/2, desc => ?DESC(rpc_certfile) } )}, @@ -762,6 +764,7 @@ fields("rpc") -> file(), #{ mapping => "gen_rpc.keyfile", + converter => fun ensure_unicode_path/2, desc => ?DESC(rpc_keyfile) } )}, @@ -770,6 +773,7 @@ fields("rpc") -> file(), #{ mapping => "gen_rpc.cacertfile", + converter => fun ensure_unicode_path/2, desc => ?DESC(rpc_cacertfile) } )}, @@ -892,8 +896,10 @@ fields("log_file_handler") -> file(), #{ desc => ?DESC("log_file_handler_file"), - default => <<"${EMQX_LOG_DIR}/emqx.log">>, - converter => fun emqx_schema:naive_env_interpolation/1 + converter => fun(Path, Opts) -> + emqx_schema:naive_env_interpolation(ensure_unicode_path(Path, Opts)) + end, + default => <<"${EMQX_LOG_DIR}/emqx.log">> } )}, {"rotation", @@ -1349,3 +1355,20 @@ validator_string_re(Val, RE, Error) -> node_array() -> hoconsc:union([emqx_schema:comma_separated_atoms(), hoconsc:array(atom())]). + +ensure_unicode_path(undefined, _) -> + undefined; +ensure_unicode_path(Path, #{make_serializable := true}) -> + %% format back to serializable string + unicode:characters_to_binary(Path, utf8); +ensure_unicode_path(Path, Opts) when is_binary(Path) -> + case unicode:characters_to_list(Path, utf8) of + {R, _, _} when R =:= error orelse R =:= incomplete -> + throw({"bad_file_path_string", Path}); + PathStr -> + ensure_unicode_path(PathStr, Opts) + end; +ensure_unicode_path(Path, _) when is_list(Path) -> + Path; +ensure_unicode_path(Path, _) -> + throw({"not_string", Path}). diff --git a/apps/emqx_conf/test/emqx_conf_schema_tests.erl b/apps/emqx_conf/test/emqx_conf_schema_tests.erl index 06784e32d..4eaf3db6b 100644 --- a/apps/emqx_conf/test/emqx_conf_schema_tests.erl +++ b/apps/emqx_conf/test/emqx_conf_schema_tests.erl @@ -232,3 +232,47 @@ ensure_acl_conf() -> true -> ok; false -> file:write_file(File, <<"">>) end. + +log_path_test_() -> + Fh = fun(Path) -> + #{<<"log">> => #{<<"file_handlers">> => #{<<"name1">> => #{<<"file">> => Path}}}} + end, + Assert = fun(Name, Path, Conf) -> + ?assertMatch(#{log := #{file_handlers := #{Name := #{file := Path}}}}, Conf) + end, + + [ + {"default-values", fun() -> Assert(default, "log/emqx.log", check(#{})) end}, + {"file path with space", fun() -> Assert(name1, "a /b", check(Fh(<<"a /b">>))) end}, + {"windows", fun() -> Assert(name1, "c:\\a\\ b\\", check(Fh(<<"c:\\a\\ b\\">>))) end}, + {"unicoded", fun() -> Assert(name1, "路 径", check(Fh(<<"路 径"/utf8>>))) end}, + {"bad utf8", fun() -> + ?assertThrow( + {emqx_conf_schema, [ + #{ + kind := validation_error, + reason := {"bad_file_path_string", _} + } + ]}, + check(Fh(<<239, 32, 132, 47, 117, 116, 102, 56>>)) + ) + end}, + {"not string", fun() -> + ?assertThrow( + {emqx_conf_schema, [ + #{ + kind := validation_error, + reason := {"not_string", _} + } + ]}, + check(Fh(#{<<"foo">> => <<"bar">>})) + ) + end} + ]. + +check(Config) -> + Schema = emqx_conf_schema, + {_, Conf} = hocon_tconf:map(Schema, Config, [log], #{ + atom_key => false, required => false, format => map + }), + emqx_utils_maps:unsafe_atom_key_map(Conf). From ea2fea77d795a8ba0e24cfd37aa59f684b676a15 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Tue, 23 May 2023 15:47:09 +0200 Subject: [PATCH 11/37] fix: RabbitMQ field not marked as required This commit makes sure that the RabbitMQ password filed is marked as required. This ensures that the user provides a password and that the bridge does not throw a function clause exception if the password filed is not set. Fixes: https://emqx.atlassian.net/browse/EMQX-9974 --- .../src/emqx_bridge_rabbitmq.app.src | 2 +- .../src/emqx_bridge_rabbitmq_connector.erl | 2 +- apps/emqx_connector/src/emqx_connector_schema_lib.erl | 9 +++++++++ 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src index 36f47aaf6..2b572a98c 100644 --- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_rabbitmq, [ {description, "EMQX Enterprise RabbitMQ Bridge"}, - {vsn, "0.1.0"}, + {vsn, "0.1.1"}, {registered, []}, {applications, [kernel, stdlib, ecql, rabbit_common, amqp_client]}, {env, []}, diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl index 6f833d659..3e809d99c 100644 --- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl @@ -72,7 +72,7 @@ fields(config) -> desc => ?DESC("username") } )}, - {password, fun emqx_connector_schema_lib:password/1}, + {password, fun emqx_connector_schema_lib:password_required/1}, {pool_size, hoconsc:mk( typerefl:pos_integer(), diff --git a/apps/emqx_connector/src/emqx_connector_schema_lib.erl b/apps/emqx_connector/src/emqx_connector_schema_lib.erl index f64208311..a277fe8c8 100644 --- a/apps/emqx_connector/src/emqx_connector_schema_lib.erl +++ b/apps/emqx_connector/src/emqx_connector_schema_lib.erl @@ -30,6 +30,7 @@ database/1, username/1, password/1, + password_required/1, auto_reconnect/1 ]). @@ -104,6 +105,14 @@ password(sensitive) -> true; password(converter) -> fun emqx_schema:password_converter/2; password(_) -> undefined. +password_required(type) -> binary(); +password_required(desc) -> ?DESC("password"); +password_required(required) -> true; +password_required(format) -> <<"password">>; +password_required(sensitive) -> true; +password_required(converter) -> fun emqx_schema:password_converter/2; +password_required(_) -> undefined. + auto_reconnect(type) -> boolean(); auto_reconnect(desc) -> ?DESC("auto_reconnect"); auto_reconnect(default) -> true; From 63180c87be875e651c3781b742715cbb4c2d9a16 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Tue, 23 May 2023 18:53:14 +0200 Subject: [PATCH 12/37] style: simplify code for better readability Co-authored-by: Thales Macedo Garitezi --- apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl index 9014fde0b..8331e715f 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl @@ -277,9 +277,7 @@ convert_float(undefined) -> null. make_iotdb_insert_request(MessageUnparsedPayload, State) -> - PayloadUnparsed = maps:get(payload, MessageUnparsedPayload), - PayloadParsed = make_parsed_payload(PayloadUnparsed), - Message = MessageUnparsedPayload#{payload => PayloadParsed}, + Message = maps:update_with(payload, fun make_parsed_payload/1, MessageUnparsedPayload), IsAligned = maps:get(is_aligned, State, false), DeviceId = device_id(Message, State), IotDBVsn = maps:get(iotdb_version, State, ?VSN_1_X), From cb14a3e08b051fe291fd71e8b12da84aaaba344a Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 23 May 2023 19:58:05 +0300 Subject: [PATCH 13/37] fix(ft): handle empty filepath in fs exporter API Fixes EMQX-9973 --- .../src/emqx_ft_storage_exporter_fs_api.erl | 2 +- apps/emqx_ft/test/emqx_ft_api_SUITE.erl | 28 +++++++++++++------ 2 files changed, 21 insertions(+), 9 deletions(-) diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter_fs_api.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs_api.erl index abb774f82..40944c0e8 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_exporter_fs_api.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs_api.erl @@ -167,7 +167,7 @@ parse_filepath(PathBin) -> throw({invalid, PathBin}) end, PathComponents = filename:split(PathBin), - case lists:any(fun is_special_component/1, PathComponents) of + case PathComponents == [] orelse lists:any(fun is_special_component/1, PathComponents) of false -> filename:join(PathComponents); true -> diff --git a/apps/emqx_ft/test/emqx_ft_api_SUITE.erl b/apps/emqx_ft/test/emqx_ft_api_SUITE.erl index f69e13a6d..2988e0083 100644 --- a/apps/emqx_ft/test/emqx_ft_api_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_api_SUITE.erl @@ -140,10 +140,7 @@ t_download_transfer(Config) -> request( get, uri(["file_transfer", "file"]) ++ - query(#{ - fileref => FileId, - node => <<"nonode@nohost">> - }) + query(#{fileref => FileId, node => <<"nonode@nohost">>}) ) ), @@ -152,10 +149,25 @@ t_download_transfer(Config) -> request( get, uri(["file_transfer", "file"]) ++ - query(#{ - fileref => <<"unknown_file">>, - node => node() - }) + query(#{fileref => <<"unknown_file">>, node => node()}) + ) + ), + + ?assertMatch( + {ok, 404, #{<<"message">> := <<"Invalid query parameter", _/bytes>>}}, + request_json( + get, + uri(["file_transfer", "file"]) ++ + query(#{fileref => <<>>, node => node()}) + ) + ), + + ?assertMatch( + {ok, 404, #{<<"message">> := <<"Invalid query parameter", _/bytes>>}}, + request_json( + get, + uri(["file_transfer", "file"]) ++ + query(#{fileref => <<"/etc/passwd">>, node => node()}) ) ), From 2dbf84479c14a96991a6dd98500e6761cef14c20 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 23 May 2023 20:00:00 +0300 Subject: [PATCH 14/37] fix(ft): handle wider class of jiffy decode errors With malformed cursors in the File listing API. Fixes EMQX-9965 --- apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl | 2 +- apps/emqx_ft/test/emqx_ft_api_SUITE.erl | 10 ++++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl index 6738d6fef..702bc35ce 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl @@ -422,7 +422,7 @@ decode_cursor(Cursor) -> true = is_list(Name), {Node, #{transfer => {ClientId, FileId}, name => Name}} catch - error:{_, invalid_json} -> + error:{Loc, JsonError} when is_integer(Loc), is_atom(JsonError) -> error({badarg, cursor}); error:{badmatch, _} -> error({badarg, cursor}); diff --git a/apps/emqx_ft/test/emqx_ft_api_SUITE.erl b/apps/emqx_ft/test/emqx_ft_api_SUITE.erl index 2988e0083..18a8e9841 100644 --- a/apps/emqx_ft/test/emqx_ft_api_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_api_SUITE.erl @@ -216,6 +216,16 @@ t_list_files_paging(Config) -> request_json(get, uri(["file_transfer", "files"]) ++ query(#{limit => 0})) ), + ?assertMatch( + {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}}, + request_json(get, uri(["file_transfer", "files"]) ++ query(#{following => <<>>})) + ), + + ?assertMatch( + {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}}, + request_json(get, uri(["file_transfer", "files"]) ++ query(#{following => <<"{\"\":}">>})) + ), + ?assertMatch( {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}}, request_json( From 25437adf02d23eae9c09eb7fb3324d6289bcf598 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 23 May 2023 14:06:40 -0300 Subject: [PATCH 15/37] fix(webhook): fix empty ehttpc worker list clause on health check (r5.0) Fixes https://emqx.atlassian.net/browse/EMQX-9970 --- apps/emqx_connector/src/emqx_connector_http.erl | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index bb822a60a..4a9cb345f 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -416,7 +416,9 @@ resolve_pool_worker(#{pool_name := PoolName} = State, Key) -> on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = State) -> case do_get_status(PoolName, Timeout) of ok -> - {connected, State}; + connected; + {error, still_connecting} -> + connecting; {error, Reason} -> {disconnected, State, Reason} end. @@ -438,7 +440,8 @@ do_get_status(PoolName, Timeout) -> end end, try emqx_utils:pmap(DoPerWorker, Workers, Timeout) of - % we crash in case of non-empty lists since we don't know what to do in that case + [] -> + {error, still_connecting}; [_ | _] = Results -> case [E || {error, _} = E <- Results] of [] -> From f4047d3946aac38d0e38986c2db025e1fc039090 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 23 May 2023 20:46:47 +0300 Subject: [PATCH 16/37] test(ft): add testcase for nasty tranfer filenames * Emoji * Chinese characters --- apps/emqx_ft/test/emqx_ft_SUITE.erl | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/apps/emqx_ft/test/emqx_ft_SUITE.erl b/apps/emqx_ft/test/emqx_ft_SUITE.erl index 7d64f9716..e582db01f 100644 --- a/apps/emqx_ft/test/emqx_ft_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_SUITE.erl @@ -47,6 +47,7 @@ groups() -> t_invalid_topic_format, t_meta_conflict, t_nasty_clientids_fileids, + t_nasty_filenames, t_no_meta, t_no_segment, t_simple_transfer @@ -205,10 +206,6 @@ t_invalid_filename(Config) -> encode_meta(meta(lists:duplicate(1000, $A), <<>>)), 1 ) - ), - ?assertRCName( - success, - emqtt:publish(C, mk_init_topic(<<"f5">>), encode_meta(meta("146%", <<>>)), 1) ). t_simple_transfer(Config) -> @@ -265,6 +262,22 @@ t_nasty_clientids_fileids(_Config) -> Transfers ). +t_nasty_filenames(_Config) -> + Filenames = [ + {<<"nasty1">>, "146%"}, + {<<"nasty2">>, "🌚"}, + {<<"nasty3">>, "中文.txt"} + ], + ok = lists:foreach( + fun({ClientId, Filename}) -> + FileId = unicode:characters_to_binary(Filename), + ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, Filename, FileId), + [Export] = list_files(ClientId), + ?assertEqual({ok, FileId}, read_export(Export)) + end, + Filenames + ). + t_meta_conflict(Config) -> C = ?config(client, Config), From b5f63f4151d69817b942757e173bda56aba5999a Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 23 May 2023 14:50:19 -0300 Subject: [PATCH 17/37] test: fix wrong test listeners option --- apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 49ddd19bd..86f088855 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -161,7 +161,7 @@ init_node(Type) -> primary -> ok = emqx_config:put( [dashboard, listeners], - #{http => #{enable => true, bind => 18083}, proxy_header => false} + #{http => #{enable => true, bind => 18083, proxy_header => false}} ), ok = emqx_dashboard:start_listeners(), ready = emqx_dashboard_listener:regenerate_minirest_dispatch(), From 44ed6a19ee1f41f23d130ec674bb13ca9a51daf6 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 23 May 2023 11:22:48 -0300 Subject: [PATCH 18/37] fix(bridge): pass resource option `request_timeout = infinity` along to buffer workers (r5.0) --- apps/emqx_bridge/src/emqx_bridge.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 3aade0369..1fbd6902e 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -211,7 +211,7 @@ send_message(BridgeId, Message) -> query_opts(Config) -> case emqx_utils_maps:deep_get([resource_opts, request_timeout], Config, false) of - Timeout when is_integer(Timeout) -> + Timeout when is_integer(Timeout) orelse Timeout =:= infinity -> %% request_timeout is configured #{timeout => Timeout}; _ -> From dee21d2ccf7fbcba0bc5b85c95fc0d586ba820c0 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Mon, 8 May 2023 10:55:16 +0200 Subject: [PATCH 19/37] build: order _build/$PROFILE/lib before 'default' profile libs --- build | 29 +++++++++++++++-------------- dev | 2 +- 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/build b/build index 5e396fdd2..4dd3071a1 100755 --- a/build +++ b/build @@ -91,19 +91,18 @@ log() { echo "===< $msg" } +prepare_erl_libs() { + local libs_dir="$1" + local erl_libs="${ERL_LIBS:-}" + for app in "${libs_dir}"/*; do + if [ -d "${app}/ebin" ]; then + erl_libs="${erl_libs}:${app}" + fi + done + export ERL_LIBS="$erl_libs" +} + make_docs() { - local libs_dir1 libs_dir2 libs_dir3 docdir - libs_dir1="$("$FIND" "_build/$PROFILE/lib/" -maxdepth 2 -name ebin -type d)" - if [ -d "_build/default/lib/" ]; then - libs_dir2="$("$FIND" "_build/default/lib/" -maxdepth 2 -name ebin -type d)" - else - libs_dir2='' - fi - if [ -d "_build/$PROFILE/checkouts" ]; then - libs_dir3="$("$FIND" "_build/$PROFILE/checkouts/" -maxdepth 2 -name ebin -type d 2>/dev/null || true)" - else - libs_dir3='' - fi case "$(is_enterprise "$PROFILE")" in 'yes') SCHEMA_MODULE='emqx_enterprise_schema' @@ -112,10 +111,12 @@ make_docs() { SCHEMA_MODULE='emqx_conf_schema' ;; esac - docdir="_build/docgen/$PROFILE" + prepare_erl_libs "_build/$PROFILE/checkouts" + prepare_erl_libs "_build/$PROFILE/lib" + local docdir="_build/docgen/$PROFILE" mkdir -p "$docdir" # shellcheck disable=SC2086 - erl -noshell -pa $libs_dir1 $libs_dir2 $libs_dir3 -eval \ + erl -noshell -eval \ "ok = emqx_conf:dump_schema('$docdir', $SCHEMA_MODULE), \ halt(0)." } diff --git a/dev b/dev index 8cf07cfaf..01bee1269 100755 --- a/dev +++ b/dev @@ -157,7 +157,7 @@ fi prepare_erl_libs() { local profile="$1" local libs_dir="_build/${profile}/lib" - local erl_libs='' + local erl_libs="${ERL_LIBS:-}" if [ $FORCE_COMPILE -eq 1 ] || [ ! -d "$libs_dir" ]; then make "compile-${PROFILE}" else From 8e6da40a311d3e738dbd73d564c2dff8e3af1552 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 23 May 2023 17:51:02 -0300 Subject: [PATCH 20/37] test: fix flaky ocsp test setup (v5.0) --- apps/emqx/test/emqx_ocsp_cache_SUITE.erl | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/apps/emqx/test/emqx_ocsp_cache_SUITE.erl b/apps/emqx/test/emqx_ocsp_cache_SUITE.erl index b0ba4f0e2..8bf965cc3 100644 --- a/apps/emqx/test/emqx_ocsp_cache_SUITE.erl +++ b/apps/emqx/test/emqx_ocsp_cache_SUITE.erl @@ -165,6 +165,7 @@ init_per_testcase(_TestCase, Config) -> {ok, {{"HTTP/1.0", 200, 'OK'}, [], <<"ocsp response">>}} end ), + snabbkaffe:start_trace(), _Heir = spawn_dummy_heir(), {ok, CachePid} = emqx_ocsp_cache:start_link(), DataDir = ?config(data_dir, Config), @@ -187,7 +188,6 @@ init_per_testcase(_TestCase, Config) -> ConfBin = emqx_utils_maps:binary_key_map(Conf), hocon_tconf:check_plain(emqx_schema, ConfBin, #{required => false, atom_keys => false}), emqx_config:put_listener_conf(Type, Name, [], ListenerOpts), - snabbkaffe:start_trace(), [ {cache_pid, CachePid} | Config @@ -231,12 +231,19 @@ end_per_testcase(_TestCase, Config) -> %% In some tests, we don't start the full supervision tree, so we need %% this dummy process. spawn_dummy_heir() -> - spawn_link(fun() -> - true = register(emqx_kernel_sup, self()), - receive - stop -> ok - end - end). + {_, {ok, _}} = + ?wait_async_action( + spawn_link(fun() -> + true = register(emqx_kernel_sup, self()), + ?tp(heir_name_registered, #{}), + receive + stop -> ok + end + end), + #{?snk_kind := heir_name_registered}, + 1_000 + ), + ok. does_module_exist(Mod) -> case erlang:module_loaded(Mod) of From 7da0860d6c6c3e94f7c327d1c2ef1da776a64447 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9F=90=E6=96=87?= Date: Wed, 24 May 2023 09:40:11 +0800 Subject: [PATCH 21/37] chore: upgrade minirest to 1.3.10 for more clear error msg --- apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl | 3 ++- mix.exs | 2 +- rebar.config | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 49ddd19bd..30a888118 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -159,9 +159,10 @@ init_node(Type) -> ok = emqx_common_test_helpers:start_apps(?SUITE_APPS, fun load_suite_config/1), case Type of primary -> + ok = emqx_dashboard_desc_cache:init(), ok = emqx_config:put( [dashboard, listeners], - #{http => #{enable => true, bind => 18083}, proxy_header => false} + #{http => #{enable => true, bind => 18083, proxy_header => false}} ), ok = emqx_dashboard:start_listeners(), ready = emqx_dashboard_listener:regenerate_minirest_dispatch(), diff --git a/mix.exs b/mix.exs index 565f4b259..569acd720 100644 --- a/mix.exs +++ b/mix.exs @@ -58,7 +58,7 @@ defmodule EMQXUmbrella.MixProject do {:ekka, github: "emqx/ekka", tag: "0.15.1", override: true}, {:gen_rpc, github: "emqx/gen_rpc", tag: "2.8.1", override: true}, {:grpc, github: "emqx/grpc-erl", tag: "0.6.7", override: true}, - {:minirest, github: "emqx/minirest", tag: "1.3.9", override: true}, + {:minirest, github: "emqx/minirest", tag: "1.3.10", override: true}, {:ecpool, github: "emqx/ecpool", tag: "0.5.3", override: true}, {:replayq, github: "emqx/replayq", tag: "0.3.7", override: true}, {:pbkdf2, github: "emqx/erlang-pbkdf2", tag: "2.0.4", override: true}, diff --git a/rebar.config b/rebar.config index 4b4838f61..7d66529f5 100644 --- a/rebar.config +++ b/rebar.config @@ -65,7 +65,7 @@ , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.1"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}} , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.7"}}} - , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.9"}}} + , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.10"}}} , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.3"}}} , {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.7"}}} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}} From 492a5912983236eb0107c9d666c54c4f6418e334 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Wed, 24 May 2023 10:53:16 +0200 Subject: [PATCH 22/37] refactor: do not emit debug logs on periodic license checks --- lib-ee/emqx_license/src/emqx_license.erl | 1 + lib-ee/emqx_license/src/emqx_license_checker.erl | 2 +- lib-ee/emqx_license/src/emqx_license_installer.erl | 6 +++--- lib-ee/emqx_license/src/emqx_license_resources.erl | 2 +- lib-ee/emqx_license/test/emqx_license_installer_SUITE.erl | 4 ++-- 5 files changed, 8 insertions(+), 7 deletions(-) diff --git a/lib-ee/emqx_license/src/emqx_license.erl b/lib-ee/emqx_license/src/emqx_license.erl index ef285b937..3e29dcf25 100644 --- a/lib-ee/emqx_license/src/emqx_license.erl +++ b/lib-ee/emqx_license/src/emqx_license.erl @@ -1,6 +1,7 @@ %%-------------------------------------------------------------------- %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- + -module(emqx_license). -include("emqx_license.hrl"). diff --git a/lib-ee/emqx_license/src/emqx_license_checker.erl b/lib-ee/emqx_license/src/emqx_license_checker.erl index 2ebb96004..da777ff84 100644 --- a/lib-ee/emqx_license/src/emqx_license_checker.erl +++ b/lib-ee/emqx_license/src/emqx_license_checker.erl @@ -117,7 +117,7 @@ handle_cast(_Msg, State) -> handle_info(check_license, #{license := License} = State) -> #{} = check_license(License), NewState = ensure_check_license_timer(State), - ?tp(debug, emqx_license_checked, #{}), + ?tp(emqx_license_checked, #{}), {noreply, NewState}; handle_info(check_expiry_alarm, #{license := License} = State) -> ok = expiry_early_alarm(License), diff --git a/lib-ee/emqx_license/src/emqx_license_installer.erl b/lib-ee/emqx_license/src/emqx_license_installer.erl index 58ee6ebcc..61076df4a 100644 --- a/lib-ee/emqx_license/src/emqx_license_installer.erl +++ b/lib-ee/emqx_license/src/emqx_license_installer.erl @@ -74,13 +74,13 @@ ensure_timer(#{interval := Interval} = State) -> check_pid(#{name := Name, pid := OldPid, callback := Callback} = State) -> case whereis(Name) of undefined -> - ?tp(debug, emqx_license_installer_noproc, #{old_pid => OldPid}), + ?tp(emqx_license_installer_noproc, #{old_pid => OldPid}), State; OldPid -> - ?tp(debug, emqx_license_installer_nochange, #{old_pid => OldPid}), + ?tp(emqx_license_installer_nochange, #{old_pid => OldPid}), State; NewPid -> _ = Callback(), - ?tp(debug, emqx_license_installer_called, #{old_pid => OldPid}), + ?tp(info, license_reloaded_after_emqx_app_restart, #{old_pid => OldPid}), State#{pid => NewPid} end. diff --git a/lib-ee/emqx_license/src/emqx_license_resources.erl b/lib-ee/emqx_license/src/emqx_license_resources.erl index 2cc62b8a3..9a63e5e06 100644 --- a/lib-ee/emqx_license/src/emqx_license_resources.erl +++ b/lib-ee/emqx_license/src/emqx_license_resources.erl @@ -76,7 +76,7 @@ handle_cast(_Msg, State) -> handle_info(update_resources, State) -> true = update_resources(), connection_quota_early_alarm(), - ?tp(debug, emqx_license_resources_updated, #{}), + ?tp(emqx_license_resources_updated, #{}), {noreply, ensure_timer(State)}. terminate(_Reason, _State) -> diff --git a/lib-ee/emqx_license/test/emqx_license_installer_SUITE.erl b/lib-ee/emqx_license/test/emqx_license_installer_SUITE.erl index 5d5e27489..21de8b357 100644 --- a/lib-ee/emqx_license/test/emqx_license_installer_SUITE.erl +++ b/lib-ee/emqx_license/test/emqx_license_installer_SUITE.erl @@ -74,12 +74,12 @@ t_update(_Config) -> Pid1 = spawn_link(fun() -> timer:sleep(100) end), register(installer_test, Pid1) end, - #{?snk_kind := emqx_license_installer_called}, + #{?snk_kind := license_reloaded_after_emqx_app_restart}, 1000 ) end, fun(Trace) -> - ?assertMatch([_ | _], ?of_kind(emqx_license_installer_called, Trace)) + ?assertMatch([_ | _], ?of_kind(license_reloaded_after_emqx_app_restart, Trace)) end ). From 798fa8c2c2d6b998d31081f02cf3e8ef2fa3ca0b Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Wed, 24 May 2023 10:55:16 +0200 Subject: [PATCH 23/37] refactor: delete module emqx_license_installer after the previous refactoring, emqx_license app is now restarted after join/rejoin the cluster, so there is no longer a need for the installer process which monitors the 'emqx' name registration changes and then issue license reloading and hook re-adding etc. --- .../src/emqx_license_installer.erl | 86 ------------------ lib-ee/emqx_license/src/emqx_license_sup.erl | 9 -- .../test/emqx_license_installer_SUITE.erl | 89 ------------------- 3 files changed, 184 deletions(-) delete mode 100644 lib-ee/emqx_license/src/emqx_license_installer.erl delete mode 100644 lib-ee/emqx_license/test/emqx_license_installer_SUITE.erl diff --git a/lib-ee/emqx_license/src/emqx_license_installer.erl b/lib-ee/emqx_license/src/emqx_license_installer.erl deleted file mode 100644 index 61076df4a..000000000 --- a/lib-ee/emqx_license/src/emqx_license_installer.erl +++ /dev/null @@ -1,86 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%%-------------------------------------------------------------------- --module(emqx_license_installer). - --include_lib("snabbkaffe/include/snabbkaffe.hrl"). - --behaviour(gen_server). - --export([ - start_link/1, - start_link/4 -]). - -%% gen_server callbacks --export([ - init/1, - handle_call/3, - handle_cast/2, - handle_info/2 -]). - --define(NAME, emqx). --define(INTERVAL, 5000). - -%%------------------------------------------------------------------------------ -%% API -%%------------------------------------------------------------------------------ - -start_link(Callback) -> - start_link(?NAME, ?MODULE, ?INTERVAL, Callback). - -start_link(Name, ServerName, Interval, Callback) -> - gen_server:start_link({local, ServerName}, ?MODULE, [Name, Interval, Callback], []). - -%%------------------------------------------------------------------------------ -%% gen_server callbacks -%%------------------------------------------------------------------------------ - -init([Name, Interval, Callback]) -> - Pid = whereis(Name), - State = #{ - interval => Interval, - name => Name, - pid => Pid, - callback => Callback - }, - {ok, ensure_timer(State)}. - -handle_call(_Req, _From, State) -> - {reply, unknown, State}. - -handle_cast(_Msg, State) -> - {noreply, State}. - -handle_info({timeout, Timer, check_pid}, #{timer := Timer} = State) -> - NewState = check_pid(State), - {noreply, ensure_timer(NewState)}; -handle_info(_Msg, State) -> - {noreply, State}. - -%%------------------------------------------------------------------------------ -%% Private functions -%%------------------------------------------------------------------------------ - -ensure_timer(#{interval := Interval} = State) -> - _ = - case State of - #{timer := Timer} -> erlang:cancel_timer(Timer); - _ -> ok - end, - State#{timer => erlang:start_timer(Interval, self(), check_pid)}. - -check_pid(#{name := Name, pid := OldPid, callback := Callback} = State) -> - case whereis(Name) of - undefined -> - ?tp(emqx_license_installer_noproc, #{old_pid => OldPid}), - State; - OldPid -> - ?tp(emqx_license_installer_nochange, #{old_pid => OldPid}), - State; - NewPid -> - _ = Callback(), - ?tp(info, license_reloaded_after_emqx_app_restart, #{old_pid => OldPid}), - State#{pid => NewPid} - end. diff --git a/lib-ee/emqx_license/src/emqx_license_sup.erl b/lib-ee/emqx_license/src/emqx_license_sup.erl index 6b8f73953..304fac313 100644 --- a/lib-ee/emqx_license/src/emqx_license_sup.erl +++ b/lib-ee/emqx_license/src/emqx_license_sup.erl @@ -41,15 +41,6 @@ init([]) -> shutdown => 5000, type => worker, modules => [emqx_license_resources] - }, - - #{ - id => license_installer, - start => {emqx_license_installer, start_link, [fun emqx_license:load/0]}, - restart => permanent, - shutdown => 5000, - type => worker, - modules => [emqx_license_installer] } ] }}. diff --git a/lib-ee/emqx_license/test/emqx_license_installer_SUITE.erl b/lib-ee/emqx_license/test/emqx_license_installer_SUITE.erl deleted file mode 100644 index 21de8b357..000000000 --- a/lib-ee/emqx_license/test/emqx_license_installer_SUITE.erl +++ /dev/null @@ -1,89 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%%-------------------------------------------------------------------- - --module(emqx_license_installer_SUITE). - --compile(nowarn_export_all). --compile(export_all). - --include_lib("eunit/include/eunit.hrl"). --include_lib("common_test/include/ct.hrl"). --include_lib("snabbkaffe/include/snabbkaffe.hrl"). - -all() -> - emqx_common_test_helpers:all(?MODULE). - -init_per_suite(Config) -> - _ = application:load(emqx_conf), - emqx_common_test_helpers:start_apps([emqx_license], fun set_special_configs/1), - Config. - -end_per_suite(_) -> - emqx_common_test_helpers:stop_apps([emqx_license]), - ok. - -init_per_testcase(_Case, Config) -> - {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), - Config. - -end_per_testcase(_Case, _Config) -> - ok. - -set_special_configs(emqx_license) -> - Config = #{key => emqx_license_test_lib:default_license()}, - emqx_config:put([license], Config); -set_special_configs(_) -> - ok. - -%%------------------------------------------------------------------------------ -%% Tests -%%------------------------------------------------------------------------------ - -t_update(_Config) -> - ?check_trace( - begin - ?wait_async_action( - begin - Pid0 = spawn_link(fun() -> - receive - exit -> ok - end - end), - register(installer_test, Pid0), - - {ok, _} = emqx_license_installer:start_link( - installer_test, - ?MODULE, - 10, - fun() -> ok end - ), - - {ok, _} = ?block_until( - #{?snk_kind := emqx_license_installer_nochange}, - 100 - ), - - Pid0 ! exit, - - {ok, _} = ?block_until( - #{?snk_kind := emqx_license_installer_noproc}, - 100 - ), - - Pid1 = spawn_link(fun() -> timer:sleep(100) end), - register(installer_test, Pid1) - end, - #{?snk_kind := license_reloaded_after_emqx_app_restart}, - 1000 - ) - end, - fun(Trace) -> - ?assertMatch([_ | _], ?of_kind(license_reloaded_after_emqx_app_restart, Trace)) - end - ). - -t_unknown_calls(_Config) -> - ok = gen_server:cast(emqx_license_installer, some_cast), - some_msg = erlang:send(emqx_license_installer, some_msg), - ?assertEqual(unknown, gen_server:call(emqx_license_installer, some_request)). From e38645aa282064073fe48bcc39e3bbd77d860d29 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Wed, 24 May 2023 17:13:18 +0800 Subject: [PATCH 24/37] fix: lookup topic without force percent decode * the minirest handler would do it --- apps/emqx_management/src/emqx_mgmt_api_topics.erl | 4 ++-- changes/ce/fix-10801.en.md | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) create mode 100644 changes/ce/fix-10801.en.md diff --git a/apps/emqx_management/src/emqx_mgmt_api_topics.erl b/apps/emqx_management/src/emqx_mgmt_api_topics.erl index 6b0e1f622..d451261ff 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_topics.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_topics.erl @@ -139,9 +139,9 @@ lookup(#{topic := Topic}) -> %%%============================================================================================== %% internal generate_topic(Params = #{<<"topic">> := Topic}) -> - Params#{<<"topic">> => uri_string:percent_decode(Topic)}; + Params#{<<"topic">> => Topic}; generate_topic(Params = #{topic := Topic}) -> - Params#{topic => uri_string:percent_decode(Topic)}; + Params#{topic => Topic}; generate_topic(Params) -> Params. diff --git a/changes/ce/fix-10801.en.md b/changes/ce/fix-10801.en.md new file mode 100644 index 000000000..4c36bd528 --- /dev/null +++ b/changes/ce/fix-10801.en.md @@ -0,0 +1 @@ +Avoid duplicated percent decode the topic name in API `/topics/{topic}` and `/topics`. From 1ac140312d48a3b1248abcf595b339e26475075e Mon Sep 17 00:00:00 2001 From: JimMoen Date: Wed, 24 May 2023 17:39:25 +0800 Subject: [PATCH 25/37] test: lookup topic with percent encoded topic name --- .../test/emqx_mgmt_api_topics_SUITE.erl | 33 ++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/apps/emqx_management/test/emqx_mgmt_api_topics_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_topics_SUITE.erl index 659ae0d44..e617c6dcb 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_topics_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_topics_SUITE.erl @@ -92,4 +92,35 @@ t_nodes_api(Config) -> #{<<"topic">> := Topic, <<"node">> := Node2} ] = emqx_utils_json:decode(RouteResponse, [return_maps]), - ?assertEqual(lists:usort([Node, atom_to_binary(Slave)]), lists:usort([Node1, Node2])). + ?assertEqual(lists:usort([Node, atom_to_binary(Slave)]), lists:usort([Node1, Node2])), + + ok = emqtt:stop(Client). + +t_percent_topics(_Config) -> + Node = atom_to_binary(node(), utf8), + Topic = <<"test_%%1">>, + {ok, Client} = emqtt:start_link(#{ + username => <<"routes_username">>, clientid => <<"routes_cid">> + }), + {ok, _} = emqtt:connect(Client), + {ok, _, _} = emqtt:subscribe(Client, Topic), + + %% exact match with percent encoded topic + Path = emqx_mgmt_api_test_util:api_path(["topics"]), + QS = uri_string:compose_query([ + {"topic", Topic}, + {"node", atom_to_list(node())} + ]), + Headers = emqx_mgmt_api_test_util:auth_header_(), + {ok, MatchResponse} = emqx_mgmt_api_test_util:request_api(get, Path, QS, Headers), + MatchData = emqx_utils_json:decode(MatchResponse, [return_maps]), + ?assertMatch( + #{<<"count">> := 1, <<"page">> := 1, <<"limit">> := 100}, + maps:get(<<"meta">>, MatchData) + ), + ?assertMatch( + [#{<<"topic">> := Topic, <<"node">> := Node}], + maps:get(<<"data">>, MatchData) + ), + + ok = emqtt:stop(Client). From 6c414ab991825fb0ecd442bcf60fca4274da7a9c Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 23 May 2023 16:25:27 -0300 Subject: [PATCH 26/37] fix(schema_registry): ensure `schema_encode` output in rule engine is a binary (r5.0) Fixes https://emqx.atlassian.net/browse/EMQX-9981 --- apps/emqx_rule_engine/src/emqx_rule_funcs.erl | 6 +++++- .../test/emqx_ee_schema_registry_SUITE.erl | 17 +++++++++++++---- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl index 47017f718..73e2f78e7 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl @@ -1115,7 +1115,11 @@ date_to_unix_ts(TimeUnit, Offset, FormatString, InputString) -> '$handle_undefined_function'(schema_decode, Args) -> error({args_count_error, {schema_decode, Args}}); '$handle_undefined_function'(schema_encode, [SchemaId, Term | MoreArgs]) -> - emqx_ee_schema_registry_serde:encode(SchemaId, Term, MoreArgs); + %% encode outputs iolists, but when the rule actions process those + %% it might wrongly encode them as JSON lists, so we force them to + %% binaries here. + IOList = emqx_ee_schema_registry_serde:encode(SchemaId, Term, MoreArgs), + iolist_to_binary(IOList); '$handle_undefined_function'(schema_encode, Args) -> error({args_count_error, {schema_encode, Args}}); '$handle_undefined_function'(sprintf, [Format | Args]) -> diff --git a/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl b/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl index 7ad01fa06..1f53766e3 100644 --- a/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl +++ b/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl @@ -82,8 +82,12 @@ make_trace_fn_action() -> #{function => Fn, args => #{}}. create_rule_http(RuleParams) -> + create_rule_http(RuleParams, _Overrides = #{}). + +create_rule_http(RuleParams, Overrides) -> RepublishTopic = <<"republish/schema_registry">>, emqx:subscribe(RepublishTopic), + PayloadTemplate = maps:get(payload_template, Overrides, <<>>), DefaultParams = #{ enable => true, actions => [ @@ -93,7 +97,7 @@ create_rule_http(RuleParams) -> <<"args">> => #{ <<"topic">> => RepublishTopic, - <<"payload">> => <<>>, + <<"payload">> => PayloadTemplate, <<"qos">> => 0, <<"retain">> => false, <<"user_properties">> => <<>> @@ -177,10 +181,12 @@ test_params_for(avro, encode1) -> "from t\n" >>, Payload = #{<<"i">> => 10, <<"s">> => <<"text">>}, + PayloadTemplate = <<"${.encoded}">>, ExtraArgs = [], #{ sql => SQL, payload => Payload, + payload_template => PayloadTemplate, extra_args => ExtraArgs }; test_params_for(avro, decode1) -> @@ -251,10 +257,12 @@ test_params_for(protobuf, encode1) -> "from t\n" >>, Payload = #{<<"name">> => <<"some name">>, <<"id">> => 10, <<"email">> => <<"emqx@emqx.io">>}, + PayloadTemplate = <<"${.encoded}">>, ExtraArgs = [<<"Person">>], #{ sql => SQL, payload => Payload, + payload_template => PayloadTemplate, extra_args => ExtraArgs }; test_params_for(protobuf, union1) -> @@ -487,17 +495,18 @@ t_encode(Config) -> #{ sql := SQL, payload := Payload, + payload_template := PayloadTemplate, extra_args := ExtraArgs } = test_params_for(SerdeType, encode1), - {ok, _} = create_rule_http(#{sql => SQL}), + {ok, _} = create_rule_http(#{sql => SQL}, #{payload_template => PayloadTemplate}), PayloadBin = emqx_utils_json:encode(Payload), emqx:publish(emqx_message:make(<<"t">>, PayloadBin)), Published = receive_published(?LINE), ?assertMatch( - #{payload := #{<<"encoded">> := _}}, + #{payload := P} when is_binary(P), Published ), - #{payload := #{<<"encoded">> := Encoded}} = Published, + #{payload := Encoded} = Published, {ok, #{deserializer := Deserializer}} = emqx_ee_schema_registry:get_serde(SerdeName), ?assertEqual(Payload, apply(Deserializer, [Encoded | ExtraArgs])), ok. From ae2398defe62fc622f17a3d76e6e4b81a136ec5b Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Wed, 24 May 2023 15:00:53 +0200 Subject: [PATCH 27/37] docs: add changelogs --- changes/ee/fix-10807.en.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes/ee/fix-10807.en.md diff --git a/changes/ee/fix-10807.en.md b/changes/ee/fix-10807.en.md new file mode 100644 index 000000000..8cd5da0c8 --- /dev/null +++ b/changes/ee/fix-10807.en.md @@ -0,0 +1 @@ +Removed license check debug logs. From 8816da41a6d51b7a46ee3057c05ced8d2353a1b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9F=90=E6=96=87?= Date: Wed, 24 May 2023 21:06:15 +0800 Subject: [PATCH 28/37] test: log test failed --- .../emqx_conf/test/emqx_conf_schema_tests.erl | 22 ++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/apps/emqx_conf/test/emqx_conf_schema_tests.erl b/apps/emqx_conf/test/emqx_conf_schema_tests.erl index 0f0cb3de1..32c66fb90 100644 --- a/apps/emqx_conf/test/emqx_conf_schema_tests.erl +++ b/apps/emqx_conf/test/emqx_conf_schema_tests.erl @@ -444,7 +444,7 @@ log_path_test_() -> #{<<"log">> => #{<<"file_handlers">> => #{<<"name1">> => #{<<"file">> => Path}}}} end, Assert = fun(Name, Path, Conf) -> - ?assertMatch(#{log := #{file_handlers := #{Name := #{file := Path}}}}, Conf) + ?assertMatch(#{log := #{file := #{Name := #{to := Path}}}}, Conf) end, [ @@ -457,7 +457,15 @@ log_path_test_() -> {emqx_conf_schema, [ #{ kind := validation_error, - reason := {"bad_file_path_string", _} + mismatches := + #{ + "handler_name" := + #{ + kind := validation_error, + path := "log.file.name1.to", + reason := {"bad_file_path_string", _} + } + } } ]}, check(Fh(<<239, 32, 132, 47, 117, 116, 102, 56>>)) @@ -468,7 +476,15 @@ log_path_test_() -> {emqx_conf_schema, [ #{ kind := validation_error, - reason := {"not_string", _} + mismatches := + #{ + "handler_name" := + #{ + kind := validation_error, + path := "log.file.name1.to", + reason := {"not_string", _} + } + } } ]}, check(Fh(#{<<"foo">> => <<"bar">>})) From adf71b905ee732e208c9c6c39f4f985425e226b2 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Wed, 24 May 2023 15:10:15 +0200 Subject: [PATCH 29/37] chore: bump emqx_license app vsn --- lib-ee/emqx_license/src/emqx_license.app.src | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib-ee/emqx_license/src/emqx_license.app.src b/lib-ee/emqx_license/src/emqx_license.app.src index fcdcbc05b..354385faf 100644 --- a/lib-ee/emqx_license/src/emqx_license.app.src +++ b/lib-ee/emqx_license/src/emqx_license.app.src @@ -1,6 +1,6 @@ {application, emqx_license, [ {description, "EMQX License"}, - {vsn, "5.0.9"}, + {vsn, "5.0.10"}, {modules, []}, {registered, [emqx_license_sup]}, {applications, [kernel, stdlib, emqx_ctl]}, From 4565acc600f96ad1387e2580323032e1b3e73f43 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 24 May 2023 10:24:21 -0300 Subject: [PATCH 30/37] fix: handle `infinity` timeout option in `ehttpc` (r5.0) Fixes https://emqx.atlassian.net/browse/EMQX-9987 --- .../test/emqx_bridge_gcp_pubsub_SUITE.erl | 24 +++++++++++++++++++ mix.exs | 2 +- rebar.config | 2 +- 3 files changed, 26 insertions(+), 2 deletions(-) diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_SUITE.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_SUITE.erl index 55527bf1f..3945b2eaf 100644 --- a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_SUITE.erl +++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_SUITE.erl @@ -631,6 +631,30 @@ t_publish_success(Config) -> ), ok. +t_publish_success_infinity_timeout(Config) -> + ServiceAccountJSON = ?config(service_account_json, Config), + Topic = <<"t/topic">>, + {ok, _} = create_bridge(Config, #{ + <<"resource_opts">> => #{<<"request_timeout">> => <<"infinity">>} + }), + {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), + on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), + Payload = <<"payload">>, + Message = emqx_message:make(Topic, Payload), + emqx:publish(Message), + DecodedMessages = assert_http_request(ServiceAccountJSON), + ?assertMatch( + [ + #{ + <<"topic">> := Topic, + <<"payload">> := Payload, + <<"metadata">> := #{<<"rule_id">> := RuleId} + } + ], + DecodedMessages + ), + ok. + t_publish_success_local_topic(Config) -> ResourceId = ?config(resource_id, Config), ServiceAccountJSON = ?config(service_account_json, Config), diff --git a/mix.exs b/mix.exs index 1c50970ea..52b49f246 100644 --- a/mix.exs +++ b/mix.exs @@ -49,7 +49,7 @@ defmodule EMQXUmbrella.MixProject do {:redbug, "2.0.8"}, {:covertool, github: "zmstone/covertool", tag: "2.0.4.1", override: true}, {:typerefl, github: "ieQu1/typerefl", tag: "0.9.1", override: true}, - {:ehttpc, github: "emqx/ehttpc", tag: "0.4.8", override: true}, + {:ehttpc, github: "emqx/ehttpc", tag: "0.4.10", override: true}, {:gproc, github: "emqx/gproc", tag: "0.9.0.1", override: true}, {:jiffy, github: "emqx/jiffy", tag: "1.0.5", override: true}, {:cowboy, github: "emqx/cowboy", tag: "2.9.0", override: true}, diff --git a/rebar.config b/rebar.config index bb6bb87fe..bceac3005 100644 --- a/rebar.config +++ b/rebar.config @@ -56,7 +56,7 @@ , {gpb, "4.19.7"} , {typerefl, {git, "https://github.com/ieQu1/typerefl", {tag, "0.9.1"}}} , {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.9"}}} - , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.4.8"}}} + , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.4.10"}}} , {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}} From 3d3dacfcf672dbefc5bc6ef77512803634cfcb59 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Wed, 24 May 2023 15:38:20 +0200 Subject: [PATCH 31/37] chore(ekka): Bump version to 0.15.2 --- apps/emqx/rebar.config | 2 +- changes/ce/fix-10809.en.md | 2 ++ mix.exs | 2 +- rebar.config | 2 +- 4 files changed, 5 insertions(+), 3 deletions(-) create mode 100644 changes/ce/fix-10809.en.md diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index 97a0c0f31..8ba36a87b 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -27,7 +27,7 @@ {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}}, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}}, - {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.1"}}}, + {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.2"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}}, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.6"}}}, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}}, diff --git a/changes/ce/fix-10809.en.md b/changes/ce/fix-10809.en.md new file mode 100644 index 000000000..e7e15e5ca --- /dev/null +++ b/changes/ce/fix-10809.en.md @@ -0,0 +1,2 @@ +Address `** ERROR ** Mnesia post_commit hook failed: error:badarg` error messages happening during node shutdown or restart. +Mria pull request: https://github.com/emqx/mria/pull/142 diff --git a/mix.exs b/mix.exs index 565f4b259..7a8b16785 100644 --- a/mix.exs +++ b/mix.exs @@ -55,7 +55,7 @@ defmodule EMQXUmbrella.MixProject do {:cowboy, github: "emqx/cowboy", tag: "2.9.0", override: true}, {:esockd, github: "emqx/esockd", tag: "5.9.6", override: true}, {:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.7.2-emqx-9", override: true}, - {:ekka, github: "emqx/ekka", tag: "0.15.1", override: true}, + {:ekka, github: "emqx/ekka", tag: "0.15.2", override: true}, {:gen_rpc, github: "emqx/gen_rpc", tag: "2.8.1", override: true}, {:grpc, github: "emqx/grpc-erl", tag: "0.6.7", override: true}, {:minirest, github: "emqx/minirest", tag: "1.3.9", override: true}, diff --git a/rebar.config b/rebar.config index 4b4838f61..0acd04de7 100644 --- a/rebar.config +++ b/rebar.config @@ -62,7 +62,7 @@ , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}} , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}} , {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.7.2-emqx-9"}}} - , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.1"}}} + , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.2"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}} , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.7"}}} , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.9"}}} From fd2940cd77b234005363ad9f8e4a1a442f514acd Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 22 May 2023 17:00:30 -0300 Subject: [PATCH 32/37] feat(pulsar): ensure allocated resources are removed on failures (v5.0) Fixes https://emqx.atlassian.net/browse/EMQX-9937 --- .../src/emqx_bridge_pulsar_impl_producer.erl | 35 ++++-- ...emqx_bridge_pulsar_impl_producer_SUITE.erl | 107 ++++++++++++++++-- apps/emqx_resource/include/emqx_resource.hrl | 2 + apps/emqx_resource/src/emqx_resource.erl | 52 ++++++++- .../src/emqx_resource_manager.erl | 4 +- .../src/emqx_resource_manager_sup.erl | 8 ++ changes/ee/feat-10778.en.md | 1 + 7 files changed, 187 insertions(+), 22 deletions(-) create mode 100644 changes/ee/feat-10778.en.md diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl index 5ed706511..5906cc57a 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl @@ -81,6 +81,7 @@ on_start(InstanceId, Config) -> } = Config, Servers = format_servers(Servers0), ClientId = make_client_id(InstanceId, BridgeName), + ok = emqx_resource:allocate_resource(InstanceId, pulsar_client_id, ClientId), SSLOpts = emqx_tls_lib:to_client_opts(SSL), ConnectTimeout = maps:get(connect_timeout, Config, timer:seconds(5)), ClientOpts = #{ @@ -116,15 +117,29 @@ on_start(InstanceId, Config) -> start_producer(Config, InstanceId, ClientId, ClientOpts). -spec on_stop(resource_id(), state()) -> ok. -on_stop(_InstanceId, State) -> - #{ - pulsar_client_id := ClientId, - producers := Producers - } = State, - stop_producers(ClientId, Producers), - stop_client(ClientId), - ?tp(pulsar_bridge_stopped, #{instance_id => _InstanceId}), - ok. +on_stop(InstanceId, _State) -> + case emqx_resource:get_allocated_resources(InstanceId) of + #{pulsar_client_id := ClientId, pulsar_producers := Producers} -> + stop_producers(ClientId, Producers), + stop_client(ClientId), + ?tp(pulsar_bridge_stopped, #{ + instance_id => InstanceId, + pulsar_client_id => ClientId, + pulsar_producers => Producers + }), + ok; + #{pulsar_client_id := ClientId} -> + stop_client(ClientId), + ?tp(pulsar_bridge_stopped, #{ + instance_id => InstanceId, + pulsar_client_id => ClientId, + pulsar_producers => undefined + }), + ok; + _ -> + ?tp(pulsar_bridge_stopped, #{instance_id => InstanceId}), + ok + end. -spec on_get_status(resource_id(), state()) -> connected | disconnected. on_get_status(_InstanceId, State = #{}) -> @@ -325,6 +340,8 @@ start_producer(Config, InstanceId, ClientId, ClientOpts) -> ?tp(pulsar_producer_about_to_start_producers, #{producer_name => ProducerName}), try pulsar:ensure_supervised_producers(ClientId, PulsarTopic, ProducerOpts) of {ok, Producers} -> + ok = emqx_resource:allocate_resource(InstanceId, pulsar_producers, Producers), + ?tp(pulsar_producer_producers_allocated, #{}), State = #{ pulsar_client_id => ClientId, producers => Producers, diff --git a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl index 76d9f94e1..3605baaab 100644 --- a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl @@ -43,7 +43,9 @@ only_once_tests() -> t_send_when_down, t_send_when_timeout, t_failure_to_start_producer, - t_producer_process_crash + t_producer_process_crash, + t_resource_manager_crash_after_producers_started, + t_resource_manager_crash_before_producers_started ]. init_per_suite(Config) -> @@ -429,7 +431,19 @@ wait_until_producer_connected() -> wait_until_connected(pulsar_producers_sup, pulsar_producer). wait_until_connected(SupMod, Mod) -> - Pids = [ + Pids = get_pids(SupMod, Mod), + ?retry( + _Sleep = 300, + _Attempts0 = 20, + lists:foreach(fun(P) -> {connected, _} = sys:get_state(P) end, Pids) + ), + ok. + +get_pulsar_producers() -> + get_pids(pulsar_producers_sup, pulsar_producer). + +get_pids(SupMod, Mod) -> + [ P || {_Name, SupPid, _Type, _Mods} <- supervisor:which_children(SupMod), P <- element(2, process_info(SupPid, links)), @@ -437,13 +451,7 @@ wait_until_connected(SupMod, Mod) -> {Mod, init, _} -> true; _ -> false end - ], - ?retry( - _Sleep = 300, - _Attempts0 = 20, - lists:foreach(fun(P) -> {connected, _} = sys:get_state(P) end, Pids) - ), - ok. + ]. create_rule_and_action_http(Config) -> PulsarName = ?config(pulsar_name, Config), @@ -528,6 +536,18 @@ start_cluster(Cluster) -> end), Nodes. +kill_resource_managers() -> + ct:pal("gonna kill resource managers"), + lists:foreach( + fun({_, Pid, _, _}) -> + ct:pal("terminating resource manager ~p", [Pid]), + %% sys:terminate(Pid, stop), + exit(Pid, kill), + ok + end, + supervisor:which_children(emqx_resource_manager_sup) + ). + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -921,7 +941,11 @@ t_producer_process_crash(Config) -> ok after 1_000 -> ct:fail("pid didn't die") end, - ?assertEqual({ok, connecting}, emqx_resource_manager:health_check(ResourceId)), + ?retry( + _Sleep0 = 50, + _Attempts0 = 50, + ?assertEqual({ok, connecting}, emqx_resource_manager:health_check(ResourceId)) + ), %% Should recover given enough time. ?retry( _Sleep = 1_000, @@ -952,6 +976,69 @@ t_producer_process_crash(Config) -> ), ok. +t_resource_manager_crash_after_producers_started(Config) -> + ?check_trace( + begin + ?force_ordering( + #{?snk_kind := pulsar_producer_producers_allocated}, + #{?snk_kind := will_kill_resource_manager} + ), + ?force_ordering( + #{?snk_kind := resource_manager_killed}, + #{?snk_kind := pulsar_producer_bridge_started} + ), + spawn_link(fun() -> + ?tp(will_kill_resource_manager, #{}), + kill_resource_managers(), + ?tp(resource_manager_killed, #{}), + ok + end), + %% even if the resource manager is dead, we can still + %% clear the allocated resources. + {{error, {config_update_crashed, {killed, _}}}, {ok, _}} = + ?wait_async_action( + create_bridge(Config), + #{?snk_kind := pulsar_bridge_stopped, pulsar_producers := Producers} when + Producers =/= undefined, + 10_000 + ), + ok + end, + [] + ), + ok. + +t_resource_manager_crash_before_producers_started(Config) -> + ?check_trace( + begin + ?force_ordering( + #{?snk_kind := pulsar_producer_capture_name}, + #{?snk_kind := will_kill_resource_manager} + ), + ?force_ordering( + #{?snk_kind := resource_manager_killed}, + #{?snk_kind := pulsar_producer_about_to_start_producers} + ), + spawn_link(fun() -> + ?tp(will_kill_resource_manager, #{}), + kill_resource_managers(), + ?tp(resource_manager_killed, #{}), + ok + end), + %% even if the resource manager is dead, we can still + %% clear the allocated resources. + {{error, {config_update_crashed, {killed, _}}}, {ok, _}} = + ?wait_async_action( + create_bridge(Config), + #{?snk_kind := pulsar_bridge_stopped, pulsar_producers := undefined}, + 10_000 + ), + ok + end, + [] + ), + ok. + t_cluster(Config) -> MQTTTopic = ?config(mqtt_topic, Config), ResourceId = resource_id(Config), diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 7f3ac580d..ce3ee73a9 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -121,3 +121,5 @@ -define(TEST_ID_PREFIX, "_probe_:"). -define(RES_METRICS, resource_metrics). + +-define(RESOURCE_ALLOCATION_TAB, emqx_resource_allocations). diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 80f270b13..10f1de6c4 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -79,7 +79,13 @@ query/2, query/3, %% query the instance without batching and queuing messages. - simple_sync_query/2 + simple_sync_query/2, + %% functions used by connectors to register resources that must be + %% freed when stopping or even when a resource manager crashes. + allocate_resource/3, + has_allocated_resources/1, + get_allocated_resources/1, + forget_allocated_resources/1 ]). %% Direct calls to the callback module @@ -372,6 +378,9 @@ is_buffer_supported(Module) -> {ok, resource_state()} | {error, Reason :: term()}. call_start(ResId, Mod, Config) -> try + %% If the previous manager process crashed without cleaning up + %% allocated resources, clean them up. + clean_allocated_resources(ResId, Mod), Mod:on_start(ResId, Config) catch throw:Error -> @@ -390,7 +399,16 @@ call_health_check(ResId, Mod, ResourceState) -> -spec call_stop(resource_id(), module(), resource_state()) -> term(). call_stop(ResId, Mod, ResourceState) -> - ?SAFE_CALL(Mod:on_stop(ResId, ResourceState)). + ?SAFE_CALL(begin + Res = Mod:on_stop(ResId, ResourceState), + case Res of + ok -> + emqx_resource:forget_allocated_resources(ResId); + _ -> + ok + end, + Res + end). -spec check_config(resource_type(), raw_resource_config()) -> {ok, resource_config()} | {error, term()}. @@ -486,7 +504,37 @@ apply_reply_fun({F, A}, Result) when is_function(F) -> apply_reply_fun(From, Result) -> gen_server:reply(From, Result). +-spec allocate_resource(resource_id(), any(), term()) -> ok. +allocate_resource(InstanceId, Key, Value) -> + true = ets:insert(?RESOURCE_ALLOCATION_TAB, {InstanceId, Key, Value}), + ok. + +-spec has_allocated_resources(resource_id()) -> boolean(). +has_allocated_resources(InstanceId) -> + ets:member(?RESOURCE_ALLOCATION_TAB, InstanceId). + +-spec get_allocated_resources(resource_id()) -> map(). +get_allocated_resources(InstanceId) -> + Objects = ets:lookup(?RESOURCE_ALLOCATION_TAB, InstanceId), + maps:from_list([{K, V} || {_InstanceId, K, V} <- Objects]). + +-spec forget_allocated_resources(resource_id()) -> ok. +forget_allocated_resources(InstanceId) -> + true = ets:delete(?RESOURCE_ALLOCATION_TAB, InstanceId), + ok. + %% ================================================================================= filter_instances(Filter) -> [Id || #{id := Id, mod := Mod} <- list_instances_verbose(), Filter(Id, Mod)]. + +clean_allocated_resources(ResourceId, ResourceMod) -> + case emqx_resource:has_allocated_resources(ResourceId) of + true -> + %% The resource entries in the ETS table are erased inside + %% `call_stop' if the call is successful. + ok = emqx_resource:call_stop(ResourceId, ResourceMod, _ResourceState = undefined), + ok; + false -> + ok + end. diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 97ac355f4..7a54bfa97 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -500,8 +500,10 @@ stop_resource(#data{state = ResState, id = ResId} = Data) -> %% We don't care the return value of the Mod:on_stop/2. %% The callback mod should make sure the resource is stopped after on_stop/2 %% is returned. - case ResState /= undefined of + HasAllocatedResources = emqx_resource:has_allocated_resources(ResId), + case ResState =/= undefined orelse HasAllocatedResources of true -> + %% we clear the allocated resources after stop is successful emqx_resource:call_stop(Data#data.id, Data#data.mod, ResState); false -> ok diff --git a/apps/emqx_resource/src/emqx_resource_manager_sup.erl b/apps/emqx_resource/src/emqx_resource_manager_sup.erl index 73f1988c6..9e86e6363 100644 --- a/apps/emqx_resource/src/emqx_resource_manager_sup.erl +++ b/apps/emqx_resource/src/emqx_resource_manager_sup.erl @@ -17,6 +17,8 @@ -behaviour(supervisor). +-include("emqx_resource.hrl"). + -export([ensure_child/5, delete_child/1]). -export([start_link/0]). @@ -36,6 +38,12 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> + %% Maps resource_id() to one or more allocated resources. + emqx_utils_ets:new(?RESOURCE_ALLOCATION_TAB, [ + bag, + public, + {read_concurrency, true} + ]), ChildSpecs = [ #{ id => emqx_resource_manager, diff --git a/changes/ee/feat-10778.en.md b/changes/ee/feat-10778.en.md new file mode 100644 index 000000000..3084d2959 --- /dev/null +++ b/changes/ee/feat-10778.en.md @@ -0,0 +1 @@ +Refactored Pulsar Producer bridge to avoid leaking resources during crashes. From 7f885218368b8f83539f4e700bfda10bcfc93a0f Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 24 May 2023 15:41:25 -0300 Subject: [PATCH 33/37] test(pgsql): reduce flakiness Depending on timing, `t_write_timeout` was getting stuck while checking the resource health, and the previous request timeout options were making a response to never be sent if that process took too long. --- .../test/emqx_bridge_pgsql_SUITE.erl | 27 ++++++++++++++----- apps/emqx_resource/src/emqx_resource_pool.erl | 5 ++++ 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl index 9f2011779..e4f17d76a 100644 --- a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl +++ b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl @@ -258,13 +258,18 @@ query_resource(Config, Request) -> emqx_resource:query(ResourceID, Request, #{timeout => 1_000}). query_resource_async(Config, Request) -> + query_resource_async(Config, Request, _Opts = #{}). + +query_resource_async(Config, Request, Opts) -> Name = ?config(pgsql_name, Config), BridgeType = ?config(pgsql_bridge_type, Config), Ref = alias([reply]), AsyncReplyFun = fun(Result) -> Ref ! {result, Ref, Result} end, ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), + Timeout = maps:get(timeout, Opts, 500), Return = emqx_resource:query(ResourceID, Request, #{ - timeout => 500, async_reply_fun => {AsyncReplyFun, []} + timeout => Timeout, + async_reply_fun => {AsyncReplyFun, []} }), {Return, Ref}. @@ -498,9 +503,9 @@ t_write_timeout(Config) -> Config, #{ <<"resource_opts">> => #{ - <<"request_timeout">> => 500, - <<"resume_interval">> => 100, - <<"health_check_interval">> => 100 + <<"auto_restart_interval">> => <<"100ms">>, + <<"resume_interval">> => <<"100ms">>, + <<"health_check_interval">> => <<"100ms">> } } ), @@ -515,7 +520,7 @@ t_write_timeout(Config) -> Res1 = case QueryMode of async -> - query_resource_async(Config, {send_message, SentData}); + query_resource_async(Config, {send_message, SentData}, #{timeout => 60_000}); sync -> query_resource(Config, {send_message, SentData}) end, @@ -526,7 +531,17 @@ t_write_timeout(Config) -> {_, Ref} when is_reference(Ref) -> case receive_result(Ref, 15_000) of {ok, Res} -> - ?assertMatch({error, {unrecoverable_error, _}}, Res); + %% we may receive a successful result depending on + %% timing, if the request is retried after the + %% failure is healed. + case Res of + {error, {unrecoverable_error, _}} -> + ok; + {ok, _} -> + ok; + _ -> + ct:fail("unexpected result: ~p", [Res]) + end; timeout -> ct:pal("mailbox:\n ~p", [process_info(self(), messages)]), ct:fail("no response received") diff --git a/apps/emqx_resource/src/emqx_resource_pool.erl b/apps/emqx_resource/src/emqx_resource_pool.erl index 913b29c86..ea2240efd 100644 --- a/apps/emqx_resource/src/emqx_resource_pool.erl +++ b/apps/emqx_resource/src/emqx_resource_pool.erl @@ -25,7 +25,12 @@ -include_lib("emqx/include/logger.hrl"). +-ifndef(TEST). -define(HEALTH_CHECK_TIMEOUT, 15000). +-else. +%% make tests faster +-define(HEALTH_CHECK_TIMEOUT, 1000). +-endif. start(Name, Mod, Options) -> case ecpool:start_sup_pool(Name, Mod, Options) of From 382ecf9d5c96e131dc931daf89064aeea5158bba Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Wed, 24 May 2023 21:23:04 +0200 Subject: [PATCH 34/37] build: adapt ERL_LIBS dir separator for windows Using ':' in ERL_LIBS environment variable e.g. ERL_LIBS='dir1:dir2' does not work in windows, it has to be ';' --- build | 12 +++++++++++- dev | 13 ++++++++++++- 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/build b/build index 4dd3071a1..7fd171402 100755 --- a/build +++ b/build @@ -94,9 +94,19 @@ log() { prepare_erl_libs() { local libs_dir="$1" local erl_libs="${ERL_LIBS:-}" + local sep + if [ "${SYSTEM}" = 'windows' ]; then + sep=';' + else + sep=':' + fi for app in "${libs_dir}"/*; do if [ -d "${app}/ebin" ]; then - erl_libs="${erl_libs}:${app}" + if [ -n "$erl_libs" ]; then + erl_libs="${erl_libs}${sep}${app}" + else + erl_libs="${app}" + fi fi done export ERL_LIBS="$erl_libs" diff --git a/dev b/dev index 01bee1269..5087cc30f 100755 --- a/dev +++ b/dev @@ -58,6 +58,7 @@ fi export HOCON_ENV_OVERRIDE_PREFIX='EMQX_' export EMQX_LOG__FILE__DEFAULT__ENABLE='false' export EMQX_LOG__CONSOLE__ENABLE='true' +SYSTEM="$(./scripts/get-distro.sh)" EMQX_NODE_NAME="${EMQX_NODE_NAME:-emqx@127.0.0.1}" PROFILE="${PROFILE:-emqx}" FORCE_COMPILE=0 @@ -158,13 +159,23 @@ prepare_erl_libs() { local profile="$1" local libs_dir="_build/${profile}/lib" local erl_libs="${ERL_LIBS:-}" + local sep + if [ "${SYSTEM}" = 'windows' ]; then + sep=';' + else + sep=':' + fi if [ $FORCE_COMPILE -eq 1 ] || [ ! -d "$libs_dir" ]; then make "compile-${PROFILE}" else echo "Running from code in $libs_dir" fi for app in "${libs_dir}"/*; do - erl_libs="${erl_libs}:${app}" + if [ -n "$erl_libs" ]; then + erl_libs="${erl_libs}${sep}${app}" + else + erl_libs="${app}" + fi done export ERL_LIBS="$erl_libs" } From 658160f09a36a53d8d3e80279908144f461a5ece Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 24 May 2023 15:43:07 -0300 Subject: [PATCH 35/37] test(cassandra): attempt to reduce flakiness --- .../test/emqx_bridge_cassandra_SUITE.erl | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl b/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl index 79220321e..8f093ef5c 100644 --- a/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl +++ b/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl @@ -506,7 +506,17 @@ t_write_failure(Config) -> ProxyPort = ?config(proxy_port, Config), ProxyHost = ?config(proxy_host, Config), QueryMode = ?config(query_mode, Config), - {ok, _} = create_bridge(Config), + {ok, _} = create_bridge( + Config, + #{ + <<"resource_opts">> => + #{ + <<"auto_restart_interval">> => <<"100ms">>, + <<"resume_interval">> => <<"100ms">>, + <<"health_check_interval">> => <<"100ms">> + } + } + ), Val = integer_to_binary(erlang:unique_integer()), SentData = #{ topic => atom_to_binary(?FUNCTION_NAME), @@ -523,7 +533,9 @@ t_write_failure(Config) -> async -> send_message(Config, SentData) end, - #{?snk_kind := buffer_worker_flush_nack}, + #{?snk_kind := Evt} when + Evt =:= buffer_worker_flush_nack orelse + Evt =:= buffer_worker_retry_inflight_failed, 10_000 ) end), From a1b8e2a42a296f1541b16bba62f934bb4bad2745 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Wed, 24 May 2023 22:14:27 +0200 Subject: [PATCH 36/37] ci: test windows build with a ping after start --- .github/workflows/build_slim_packages.yaml | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/.github/workflows/build_slim_packages.yaml b/.github/workflows/build_slim_packages.yaml index 836eaf079..7e664c1c7 100644 --- a/.github/workflows/build_slim_packages.yaml +++ b/.github/workflows/build_slim_packages.yaml @@ -111,8 +111,14 @@ jobs: timeout-minutes: 5 run: | ./_build/${{ matrix.profile }}/rel/emqx/bin/emqx start - Start-Sleep -s 5 - echo "EMQX started" + Start-Sleep -s 10 + $pingOutput = ./_build/${{ matrix.profile }}/rel/emqx/bin/emqx ping + if ($pingOutput = 'pong') { + echo "EMQX started OK" + } else { + echo "Failed to ping EMQX $pingOutput" + Exit 1 + } ./_build/${{ matrix.profile }}/rel/emqx/bin/emqx stop echo "EMQX stopped" ./_build/${{ matrix.profile }}/rel/emqx/bin/emqx install From 7eb445707fbdcbbb9f47d0e39c9a79ff51896917 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 25 May 2023 17:10:40 +0800 Subject: [PATCH 37/37] chore: fix schema rename --- rel/i18n/zh/emqx_schema.hocon | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rel/i18n/zh/emqx_schema.hocon b/rel/i18n/zh/emqx_schema.hocon index 0e329eac9..4c8e1f81e 100644 --- a/rel/i18n/zh/emqx_schema.hocon +++ b/rel/i18n/zh/emqx_schema.hocon @@ -843,10 +843,10 @@ sysmon_vm_long_schedule.desc: sysmon_vm_long_schedule.label: """启用长调度监控""" -mqtt_keepalive_backoff.desc: +mqtt_keepalive_multiplier.desc: """EMQX 判定客户端保活超时使用的阈值系数。计算公式为:Keep Alive * Backoff * 2""" -mqtt_keepalive_backoff.label: +mqtt_keepalive_multiplier.label: """保活超时阈值系数""" force_gc_bytes.desc: