From 3a2ff344330e4539badfad8d0b0a7fd354d961e7 Mon Sep 17 00:00:00 2001 From: zmstone Date: Wed, 8 May 2024 11:43:10 +0200 Subject: [PATCH 01/10] chore: add zone in listener config example --- apps/emqx_management/src/emqx_mgmt_api_listeners.erl | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl index 1c581514a..eebbaad08 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl @@ -810,6 +810,7 @@ listener_id_status_example() -> tcp_schema_example() -> #{ + type => tcp, acceptors => 16, access_rules => ["allow all"], bind => <<"0.0.0.0:1884">>, @@ -820,6 +821,7 @@ tcp_schema_example() -> proxy_protocol => false, proxy_protocol_timeout => <<"3s">>, running => true, + zone => default, tcp_options => #{ active_n => 100, backlog => 1024, @@ -829,8 +831,7 @@ tcp_schema_example() -> reuseaddr => true, send_timeout => <<"15s">>, send_timeout_close => true - }, - type => tcp + } }. create_listener(From, Body) -> From 916168c755876648bf9b5faa275b891a57a104de Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Wed, 8 May 2024 15:09:54 +0200 Subject: [PATCH 02/10] fix: make rule engine unescape convert \a to the terminal alarm char The rule engine unescape function should convert the escape sequence \a to the alarm bell symbol (ASCII 7). This bug was created as it was assumed that Erlang convert $\a to 7 but Erlang does not handle the \a escape sequence and instead convert it to 97 (ASCII code for a). Fixes: https://emqx.atlassian.net/browse/EMQX-12313 --- .../test/emqx_rule_funcs_SUITE.erl | 22 +++++++++---------- apps/emqx_utils/src/emqx_variform_bif.erl | 3 ++- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl index 537facd80..e260b04e1 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl @@ -744,18 +744,18 @@ t_regex_replace(_) -> ?assertEqual(<<"a[cc]b[c]d">>, apply_func(regex_replace, [<<"accbcd">>, <<"c+">>, <<"[&]">>])). 
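%% Note (annotation, not part of the committed change): background for the
%% t_unescape changes below. Erlang's scanner has no \a escape, so $\a is
%% read as a plain $a (97) rather than the ASCII BEL character (7), while
%% recognised escapes keep their usual codes, e.g. in a shell:
%%
%%   1> $\n.
%%   10
%%   2> $\a.
%%   97
%%
%% Hence the expected result for unescaping <<"\\a">> is written below as
%% the explicit byte <<7>> rather than <<"\a">>.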
t_unescape(_) -> - ?assertEqual(<<"\n">>, emqx_rule_funcs:unescape(<<"\\n">>)), - ?assertEqual(<<"\t">>, emqx_rule_funcs:unescape(<<"\\t">>)), - ?assertEqual(<<"\r">>, emqx_rule_funcs:unescape(<<"\\r">>)), - ?assertEqual(<<"\b">>, emqx_rule_funcs:unescape(<<"\\b">>)), - ?assertEqual(<<"\f">>, emqx_rule_funcs:unescape(<<"\\f">>)), - ?assertEqual(<<"\v">>, emqx_rule_funcs:unescape(<<"\\v">>)), - ?assertEqual(<<"'">>, emqx_rule_funcs:unescape(<<"\\'">>)), - ?assertEqual(<<"\"">>, emqx_rule_funcs:unescape(<<"\\\"">>)), - ?assertEqual(<<"?">>, emqx_rule_funcs:unescape(<<"\\?">>)), - ?assertEqual(<<"\a">>, emqx_rule_funcs:unescape(<<"\\a">>)), + ?assertEqual(<<"\n">> = <<10>>, emqx_rule_funcs:unescape(<<"\\n">>)), + ?assertEqual(<<"\t">> = <<9>>, emqx_rule_funcs:unescape(<<"\\t">>)), + ?assertEqual(<<"\r">> = <<13>>, emqx_rule_funcs:unescape(<<"\\r">>)), + ?assertEqual(<<"\b">> = <<8>>, emqx_rule_funcs:unescape(<<"\\b">>)), + ?assertEqual(<<"\f">> = <<12>>, emqx_rule_funcs:unescape(<<"\\f">>)), + ?assertEqual(<<"\v">> = <<11>>, emqx_rule_funcs:unescape(<<"\\v">>)), + ?assertEqual(<<"'">> = <<39>>, emqx_rule_funcs:unescape(<<"\\'">>)), + ?assertEqual(<<"\"">> = <<34>>, emqx_rule_funcs:unescape(<<"\\\"">>)), + ?assertEqual(<<"?">> = <<63>>, emqx_rule_funcs:unescape(<<"\\?">>)), + ?assertEqual(<<7>>, emqx_rule_funcs:unescape(<<"\\a">>)), % Test escaping backslash itself - ?assertEqual(<<"\\">>, emqx_rule_funcs:unescape(<<"\\\\">>)), + ?assertEqual(<<"\\">> = <<92>>, emqx_rule_funcs:unescape(<<"\\\\">>)), % Test a string without any escape sequences ?assertEqual(<<"Hello, World!">>, emqx_rule_funcs:unescape(<<"Hello, World!">>)), % Test a string with escape sequences diff --git a/apps/emqx_utils/src/emqx_variform_bif.erl b/apps/emqx_utils/src/emqx_variform_bif.erl index 91bd4f9cf..ed9dfb851 100644 --- a/apps/emqx_utils/src/emqx_variform_bif.erl +++ b/apps/emqx_utils/src/emqx_variform_bif.erl @@ -289,7 +289,8 @@ unescape_string([$\\, $" | Rest], Acc) -> unescape_string([$\\, $? | Rest], Acc) -> unescape_string(Rest, [$\? | Acc]); unescape_string([$\\, $a | Rest], Acc) -> - unescape_string(Rest, [$\a | Acc]); + %% Terminal bell + unescape_string(Rest, [7 | Acc]); %% Start of HEX escape code unescape_string([$\\, $x | [$0 | _] = HexStringStart], Acc) -> unescape_handle_hex_string(HexStringStart, Acc); From 9edbad545968bc3a8a222663c5525af435079d47 Mon Sep 17 00:00:00 2001 From: zmstone Date: Wed, 8 May 2024 15:45:41 +0200 Subject: [PATCH 03/10] fix(listener_api): do not allow update listener with unknown zone name --- apps/emqx/src/config/emqx_config_zones.erl | 24 +++++++++++++++++ apps/emqx/src/emqx_listeners.erl | 8 +++++- .../test/emqx_mgmt_api_listeners_SUITE.erl | 26 +++++++++++++++++-- 3 files changed, 55 insertions(+), 3 deletions(-) diff --git a/apps/emqx/src/config/emqx_config_zones.erl b/apps/emqx/src/config/emqx_config_zones.erl index f4f3c7420..9b2fb224a 100644 --- a/apps/emqx/src/config/emqx_config_zones.erl +++ b/apps/emqx/src/config/emqx_config_zones.erl @@ -20,6 +20,7 @@ %% API -export([add_handler/0, remove_handler/0, pre_config_update/3]). -export([is_olp_enabled/0]). +-export([assert_zone_exists/1]). -define(ZONES, [zones]). @@ -44,3 +45,26 @@ is_olp_enabled() -> false, emqx_config:get([zones], #{}) ). + +-spec assert_zone_exists(binary() | atom()) -> ok. 
+assert_zone_exists(Name0) when is_binary(Name0) -> + %% an existing zone must have already an atom-name + Name = + try + binary_to_existing_atom(Name0) + catch + _:_ -> + throw({unknown_zone, Name0}) + end, + assert_zone_exists(Name); +assert_zone_exists(default) -> + %% there is always a 'default' zone + ok; +assert_zone_exists(Name) when is_atom(Name) -> + try + _ = emqx_config:get([zones, Name]), + ok + catch + error:{config_not_found, _} -> + throw({unknown_zone, Name}) + end. diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index 2b11306eb..dd9024fef 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -124,7 +124,7 @@ format_raw_listeners({Type0, Conf}) -> Bind = parse_bind(LConf0), MaxConn = maps:get(<<"max_connections">>, LConf0, default_max_conn()), Running = is_running(Type, listener_id(Type, LName), LConf0#{bind => Bind}), - LConf1 = maps:without([<<"authentication">>, <<"zone">>], LConf0), + LConf1 = maps:without([<<"authentication">>], LConf0), LConf2 = maps:put(<<"running">>, Running, LConf1), CurrConn = case Running of @@ -526,6 +526,7 @@ pre_config_update([?ROOT_KEY, _Type, _Name], {update, _Request}, undefined) -> pre_config_update([?ROOT_KEY, Type, Name], {update, Request}, RawConf) -> RawConf1 = emqx_utils_maps:deep_merge(RawConf, Request), RawConf2 = ensure_override_limiter_conf(RawConf1, Request), + ok = assert_zone_exists(RawConf2), {ok, convert_certs(Type, Name, RawConf2)}; pre_config_update([?ROOT_KEY, _Type, _Name], {action, _Action, Updated}, RawConf) -> {ok, emqx_utils_maps:deep_merge(RawConf, Updated)}; @@ -884,6 +885,11 @@ convert_certs(Type, Name, Conf) -> filter_stacktrace({Reason, _Stacktrace}) -> Reason; filter_stacktrace(Reason) -> Reason. +assert_zone_exists(#{<<"zone">> := Zone}) -> + emqx_config_zones:assert_zone_exists(Zone); +assert_zone_exists(_) -> + ok. + %% limiter config should override, not merge ensure_override_limiter_conf(Conf, #{<<"limiter">> := Limiter}) -> Conf#{<<"limiter">> => Limiter}; diff --git a/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl index 75c232f23..07885d93d 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl @@ -36,9 +36,12 @@ groups() -> MaxConnTests = [ t_max_connection_default ], + ZoneTests = [ + t_update_listener_zone + ], [ {with_defaults_in_file, AllTests -- MaxConnTests}, - {without_defaults_in_file, AllTests -- MaxConnTests}, + {without_defaults_in_file, AllTests -- (MaxConnTests ++ ZoneTests)}, {max_connections, MaxConnTests} ]. @@ -403,6 +406,21 @@ crud_listeners_by_id(ListenerId, NewListenerId, MinListenerId, BadId, Type, Port ?assertMatch({error, {"HTTP/1.1", 404, _}}, request(delete, NewPath, [], [])), ok. +t_update_listener_zone({init, Config}) -> + %% fake a zone + Config; +t_update_listener_zone({'end', _Config}) -> + ok; +t_update_listener_zone(_Config) -> + ListenerId = <<"tcp:default">>, + Path = emqx_mgmt_api_test_util:api_path(["listeners", ListenerId]), + Conf = request(get, Path, [], []), + %% update + AddConf1 = Conf#{<<"zone">> => <<"unknownzone">>}, + AddConf2 = Conf#{<<"zone">> => <<"zone1">>}, + ?assertMatch({error, {_, 400, _}}, request(put, Path, [], AddConf1)), + ?assertMatch(#{<<"zone">> := <<"zone1">>}, request(put, Path, [], AddConf2)). 
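%% Note (annotation, not part of the committed change): this test exercises
%% emqx_config_zones:assert_zone_exists/1 added earlier in the patch. The
%% check relies on binary_to_existing_atom/1, so a zone name that was never
%% configured (and therefore never interned as an atom) is rejected before
%% any config lookup. Rough shell sketch, assuming only the built-in
%% 'default' zone and the test fixture's 'zone1' exist:
%%
%%   1> emqx_config_zones:assert_zone_exists(<<"default">>).
%%   ok
%%   2> catch emqx_config_zones:assert_zone_exists(<<"unknownzone">>).
%%   {unknown_zone,<<"unknownzone">>}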
+ t_delete_nonexistent_listener(Config) when is_list(Config) -> NonExist = emqx_mgmt_api_test_util:api_path(["listeners", "tcp:nonexistent"]), ?assertMatch( @@ -518,5 +536,9 @@ cert_file(Name) -> default_listeners_hocon_text() -> Sc = #{roots => emqx_schema:listeners()}, Listeners = hocon_tconf:make_serializable(Sc, #{}, #{}), - Config = #{<<"listeners">> => Listeners}, + Zones = #{<<"zone1">> => #{<<"mqtt">> => #{<<"max_inflight">> => 2}}}, + Config = #{ + <<"listeners">> => Listeners, + <<"zones">> => Zones + }, hocon_pp:do(Config, #{}). From c75cee3c04e460c23b4fc2863b027b2861962713 Mon Sep 17 00:00:00 2001 From: zmstone Date: Wed, 8 May 2024 17:04:53 +0200 Subject: [PATCH 04/10] docs: add changelog for PR #12993 --- changes/ce/fix-12993.en.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 changes/ce/fix-12993.en.md diff --git a/changes/ce/fix-12993.en.md b/changes/ce/fix-12993.en.md new file mode 100644 index 000000000..3e565841c --- /dev/null +++ b/changes/ce/fix-12993.en.md @@ -0,0 +1,5 @@ +Fix listener config update API when handling an unknown zone. + +Prior to this fix, when a listener config is updated with an unknown zone, for example `{"zone": "unknown"}`, +the change would be accepted, causing all clients to crash when connect. +After this fix, updating listener with an unknown zone name will get a "Bad request" response. From 3e956e87113f5e61216cf46ee6d72f28f778b1b9 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 8 May 2024 17:40:08 +0200 Subject: [PATCH 05/10] fix(sessds): use milder log level for regular error conditions Especially when such events are emitted in (potentially) tight loops, i.e. pulling messages from iterators pointing to remote shards. --- apps/emqx/src/emqx_persistent_session_ds.erl | 4 ++-- apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 28c370ba9..3989516e5 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -622,7 +622,7 @@ replay_streams(Session0 = #{replay := [{StreamKey, Srs0} | Rest]}, ClientInfo) - replay_streams(Session#{replay := Rest}, ClientInfo); {error, recoverable, Reason} -> RetryTimeout = ?TIMEOUT_RETRY_REPLAY, - ?SLOG(warning, #{ + ?SLOG(debug, #{ msg => "failed_to_fetch_replay_batch", stream => StreamKey, reason => Reason, @@ -925,7 +925,7 @@ new_batch({StreamKey, Srs0}, BatchSize, Session0 = #{s := S0}, ClientInfo) -> Session#{s => S}; {error, Class, Reason} -> %% TODO: Handle unrecoverable error. 
- ?SLOG(info, #{ + ?SLOG(debug, #{ msg => "failed_to_fetch_batch", stream => StreamKey, reason => Reason, diff --git a/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl b/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl index 1be0bdf4a..c6a968a9a 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl @@ -220,7 +220,7 @@ ensure_iterator(TopicFilter, StartTime, SubId, SStateId, {{RankX, RankY}, Stream }, emqx_persistent_session_ds_state:put_stream(Key, NewStreamState, S); {error, recoverable, Reason} -> - ?SLOG(warning, #{ + ?SLOG(debug, #{ msg => "failed_to_initialize_stream_iterator", stream => Stream, class => recoverable, From 57287f07228cf015442950eb2ea91353522bb69a Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Wed, 8 May 2024 19:56:17 +0300 Subject: [PATCH 06/10] fix(retainer): fix qlc cursor cleanup --- apps/emqx_retainer/src/emqx_retainer.app.src | 2 +- apps/emqx_retainer/src/emqx_retainer.erl | 4 +- .../src/emqx_retainer_dispatcher.erl | 163 +++++++++++------- .../src/emqx_retainer_mnesia.erl | 11 +- .../test/emqx_retainer_SUITE.erl | 142 +++++++++++---- changes/ce/fix-12996.en.md | 1 + 6 files changed, 226 insertions(+), 97 deletions(-) create mode 100644 changes/ce/fix-12996.en.md diff --git a/apps/emqx_retainer/src/emqx_retainer.app.src b/apps/emqx_retainer/src/emqx_retainer.app.src index 4a8b3cdc3..7bcde8d50 100644 --- a/apps/emqx_retainer/src/emqx_retainer.app.src +++ b/apps/emqx_retainer/src/emqx_retainer.app.src @@ -2,7 +2,7 @@ {application, emqx_retainer, [ {description, "EMQX Retainer"}, % strict semver, bump manually! - {vsn, "5.0.22"}, + {vsn, "5.0.23"}, {modules, []}, {registered, [emqx_retainer_sup]}, {applications, [kernel, stdlib, emqx, emqx_ctl]}, diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index b375f30ad..743046c80 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -47,6 +47,7 @@ retained_count/0, backend_module/0, backend_module/1, + backend_state/1, enabled/0 ]). @@ -103,6 +104,7 @@ -callback page_read(backend_state(), emqx_maybe:t(topic()), non_neg_integer(), non_neg_integer()) -> {ok, has_next(), list(message())}. -callback match_messages(backend_state(), topic(), cursor()) -> {ok, list(message()), cursor()}. +-callback delete_cursor(backend_state(), cursor()) -> ok. -callback clear_expired(backend_state()) -> ok. -callback clean(backend_state()) -> ok. -callback size(backend_state()) -> non_neg_integer(). @@ -339,7 +341,7 @@ count(Context) -> clear_expired(Context) -> Mod = backend_module(Context), BackendState = backend_state(Context), - Mod:clear_expired(BackendState). + ok = Mod:clear_expired(BackendState). -spec store_retained(context(), message()) -> ok. store_retained(Context, #message{topic = Topic, payload = Payload} = Msg) -> diff --git a/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl index e918d8d52..19ae7bbe9 100644 --- a/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl +++ b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl @@ -46,15 +46,26 @@ -type limiter() :: emqx_htb_limiter:limiter(). -type context() :: emqx_retainer:context(). -type topic() :: emqx_types:topic(). --type cursor() :: emqx_retainer:cursor(). -define(POOL, ?MODULE). +%% For tests +-export([ + dispatch/3 +]). 
+ +%% This module is `emqx_retainer` companion +-elvis([{elvis_style, invalid_dynamic_call, disable}]). + %%%=================================================================== %%% API %%%=================================================================== + dispatch(Context, Topic) -> - cast({?FUNCTION_NAME, Context, self(), Topic}). + dispatch(Context, Topic, self()). + +dispatch(Context, Topic, Pid) -> + cast({dispatch, Context, Pid, Topic}). %% reset the client's limiter after updated the limiter's config refresh_limiter() -> @@ -156,7 +167,7 @@ handle_call(Req, _From, State) -> | {noreply, NewState :: term(), hibernate} | {stop, Reason :: term(), NewState :: term()}. handle_cast({dispatch, Context, Pid, Topic}, #{limiter := Limiter} = State) -> - {ok, Limiter2} = dispatch(Context, Pid, Topic, undefined, Limiter), + {ok, Limiter2} = dispatch(Context, Pid, Topic, Limiter), {noreply, State#{limiter := Limiter2}}; handle_cast({refresh_limiter, Conf}, State) -> BucketCfg = emqx_utils_maps:deep_get([flow_control, batch_deliver_limiter], Conf, undefined), @@ -234,86 +245,120 @@ format_status(_Opt, Status) -> cast(Msg) -> gen_server:cast(worker(), Msg). --spec dispatch(context(), pid(), topic(), cursor(), limiter()) -> {ok, limiter()}. -dispatch(Context, Pid, Topic, Cursor, Limiter) -> +-spec dispatch(context(), pid(), topic(), limiter()) -> {ok, limiter()}. +dispatch(Context, Pid, Topic, Limiter) -> Mod = emqx_retainer:backend_module(Context), - case Cursor =/= undefined orelse emqx_topic:wildcard(Topic) of - false -> - {ok, Result} = erlang:apply(Mod, read_message, [Context, Topic]), - deliver(Result, Context, Pid, Topic, undefined, Limiter); + State = emqx_retainer:backend_state(Context), + case emqx_topic:wildcard(Topic) of true -> - {ok, Result, NewCursor} = erlang:apply(Mod, match_messages, [Context, Topic, Cursor]), - deliver(Result, Context, Pid, Topic, NewCursor, Limiter) + {ok, Messages, Cursor} = Mod:match_messages(State, Topic, undefined), + dispatch_with_cursor(Context, Messages, Cursor, Pid, Topic, Limiter); + false -> + {ok, Messages} = Mod:read_message(State, Topic), + dispatch_at_once(Messages, Pid, Topic, Limiter) end. --spec deliver(list(emqx_types:message()), context(), pid(), topic(), cursor(), limiter()) -> - {ok, limiter()}. -deliver([], _Context, _Pid, _Topic, undefined, Limiter) -> +dispatch_at_once(Messages, Pid, Topic, Limiter0) -> + case deliver(Messages, Pid, Topic, Limiter0) of + {ok, Limiter1} -> + {ok, Limiter1}; + {drop, Limiter1} -> + {ok, Limiter1}; + no_receiver -> + ?tp(debug, retainer_dispatcher_no_receiver, #{topic => Topic}), + {ok, Limiter0} + end. + +dispatch_with_cursor(Context, [], Cursor, _Pid, _Topic, Limiter) -> + ok = delete_cursor(Context, Cursor), {ok, Limiter}; -deliver([], Context, Pid, Topic, Cursor, Limiter) -> - dispatch(Context, Pid, Topic, Cursor, Limiter); -deliver(Result, Context, Pid, Topic, Cursor, Limiter) -> +dispatch_with_cursor(Context, Messages0, Cursor0, Pid, Topic, Limiter0) -> + case deliver(Messages0, Pid, Topic, Limiter0) of + {ok, Limiter1} -> + {ok, Messages1, Cursor1} = match_next(Context, Topic, Cursor0), + dispatch_with_cursor(Context, Messages1, Cursor1, Pid, Topic, Limiter1); + {drop, Limiter1} -> + ok = delete_cursor(Context, Cursor0), + {ok, Limiter1}; + no_receiver -> + ?tp(debug, retainer_dispatcher_no_receiver, #{topic => Topic}), + ok = delete_cursor(Context, Cursor0), + {ok, Limiter0} + end. 
+ +match_next(_Context, _Topic, undefined) -> + {ok, [], undefined}; +match_next(Context, Topic, Cursor) -> + Mod = emqx_retainer:backend_module(Context), + State = emqx_retainer:backend_state(Context), + Mod:match_messages(State, Topic, Cursor). + +delete_cursor(_Context, undefined) -> + ok; +delete_cursor(Context, Cursor) -> + Mod = emqx_retainer:backend_module(Context), + State = emqx_retainer:backend_state(Context), + Mod:delete_cursor(State, Cursor). + +-spec deliver([emqx_types:message()], pid(), topic(), limiter()) -> + {ok, limiter()} | {drop, limiter()} | no_receiver. +deliver(Messages, Pid, Topic, Limiter) -> case erlang:is_process_alive(Pid) of false -> - {ok, Limiter}; + no_receiver; _ -> - DeliverNum = emqx_conf:get([retainer, flow_control, batch_deliver_number], undefined), - case DeliverNum of + BatchSize = emqx_conf:get([retainer, flow_control, batch_deliver_number], undefined), + case BatchSize of 0 -> - do_deliver(Result, Pid, Topic), + deliver_to_client(Messages, Pid, Topic), {ok, Limiter}; _ -> - case do_deliver(Result, DeliverNum, Pid, Topic, Limiter) of - {ok, Limiter2} -> - deliver([], Context, Pid, Topic, Cursor, Limiter2); - {drop, Limiter2} -> - {ok, Limiter2} - end + deliver_in_batches(Messages, BatchSize, Pid, Topic, Limiter) end end. -do_deliver([], _DeliverNum, _Pid, _Topic, Limiter) -> +deliver_in_batches([], _BatchSize, _Pid, _Topic, Limiter) -> {ok, Limiter}; -do_deliver(Msgs, DeliverNum, Pid, Topic, Limiter) -> - {Num, ToDelivers, Msgs2} = safe_split(DeliverNum, Msgs), - case emqx_htb_limiter:consume(Num, Limiter) of - {ok, Limiter2} -> - do_deliver(ToDelivers, Pid, Topic), - do_deliver(Msgs2, DeliverNum, Pid, Topic, Limiter2); - {drop, _} = Drop -> +deliver_in_batches(Msgs, BatchSize, Pid, Topic, Limiter0) -> + {BatchActualSize, Batch, RestMsgs} = take(BatchSize, Msgs), + case emqx_htb_limiter:consume(BatchActualSize, Limiter0) of + {ok, Limiter1} -> + ok = deliver_to_client(Batch, Pid, Topic), + deliver_in_batches(RestMsgs, BatchSize, Pid, Topic, Limiter1); + {drop, _Limiter1} = Drop -> ?SLOG(debug, #{ msg => "retained_message_dropped", reason => "reached_ratelimit", - dropped_count => length(ToDelivers) + dropped_count => BatchActualSize }), Drop end. -do_deliver([Msg | T], Pid, Topic) -> - case emqx_banned:look_up({clientid, Msg#message.from}) of - [] -> - Pid ! {deliver, Topic, Msg}, - ok; - _ -> - ?tp( - notice, - ignore_retained_message_deliver, - #{ - reason => "client is banned", - clientid => Msg#message.from - } - ) - end, - do_deliver(T, Pid, Topic); -do_deliver([], _, _) -> +deliver_to_client([Msg | T], Pid, Topic) -> + _ = + case emqx_banned:look_up({clientid, Msg#message.from}) of + [] -> + Pid ! {deliver, Topic, Msg}; + _ -> + ?tp( + notice, + ignore_retained_message_deliver, + #{ + reason => "client is banned", + clientid => Msg#message.from + } + ) + end, + deliver_to_client(T, Pid, Topic); +deliver_to_client([], _, _) -> ok. -safe_split(N, List) -> - safe_split(N, List, 0, []). +take(N, List) -> + take(N, List, 0, []). -safe_split(0, List, Count, Acc) -> +take(0, List, Count, Acc) -> {Count, lists:reverse(Acc), List}; -safe_split(_N, [], Count, Acc) -> +take(_N, [], Count, Acc) -> {Count, lists:reverse(Acc), []}; -safe_split(N, [H | T], Count, Acc) -> - safe_split(N - 1, T, Count + 1, [H | Acc]). +take(N, [H | T], Count, Acc) -> + take(N - 1, T, Count + 1, [H | Acc]). 
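The dispatcher above now deletes the QLC cursor on every terminal branch (batch exhausted, rate-limit drop, or receiver gone), and the mnesia backend below implements the new delete_cursor/2 callback via qlc:delete_cursor/1. For context, a minimal self-contained sketch of the QLC cursor lifecycle that makes this explicit cleanup necessary (module name is illustrative, not part of the patch):

-module(qlc_cursor_sketch).
-include_lib("stdlib/include/qlc.hrl").
-export([run/0]).

%% qlc:cursor/1 spawns a dedicated cursor process owned by the caller; it
%% keeps running until qlc:delete_cursor/1 is called, so a consumer that
%% stops partway through the answers must delete the cursor explicitly.
run() ->
    QH = qlc:q([X || X <- [1, 2, 3, 4, 5]]),
    Cursor = qlc:cursor(QH),
    Part = qlc:next_answers(Cursor, 2),  %% [1, 2]; cursor process still alive
    ok = qlc:delete_cursor(Cursor),      %% without this the process lingers
    Part.

Such lingering cursor processes are what the t_cursor_cleanup test added later in this patch counts by looking for processes sitting in {qlc, wait_for_request, 3}.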
diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl index 7e2a73a09..daaa776b7 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl @@ -35,6 +35,7 @@ read_message/2, page_read/4, match_messages/3, + delete_cursor/2, clear_expired/1, clean/1, size/1 @@ -205,7 +206,7 @@ delete_message(_State, Topic) -> read_message(_State, Topic) -> {ok, read_messages(Topic)}. -match_messages(_State, Topic, undefined) -> +match_messages(State, Topic, undefined) -> Tokens = topic_to_tokens(Topic), Now = erlang:system_time(millisecond), QH = msg_table(search_table(Tokens, Now)), @@ -214,7 +215,7 @@ match_messages(_State, Topic, undefined) -> {ok, qlc:eval(QH), undefined}; BatchNum when is_integer(BatchNum) -> Cursor = qlc:cursor(QH), - match_messages(undefined, Topic, {Cursor, BatchNum}) + match_messages(State, Topic, {Cursor, BatchNum}) end; match_messages(_State, _Topic, {Cursor, BatchNum}) -> case qlc_next_answers(Cursor, BatchNum) of @@ -224,6 +225,11 @@ match_messages(_State, _Topic, {Cursor, BatchNum}) -> {ok, Rows, {Cursor, BatchNum}} end. +delete_cursor(_State, {Cursor, _}) -> + qlc:delete_cursor(Cursor); +delete_cursor(_State, undefined) -> + ok. + page_read(_State, Topic, Page, Limit) -> Now = erlang:system_time(millisecond), QH = @@ -562,6 +568,7 @@ reindex(NewIndices, Force, StatusFun) when %% Fill index records in batches. QH = qlc:q([Topic || #retained_message{topic = Topic} <- ets:table(?TAB_MESSAGE)]), + ok = reindex_batch(qlc:cursor(QH), 0, StatusFun), %% Enable read indices and unlock reindexing. diff --git a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl index d4ad43907..b29974068 100644 --- a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl @@ -21,6 +21,7 @@ -include("emqx_retainer.hrl"). +-include_lib("emqx/include/asserts.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). @@ -96,14 +97,19 @@ end_per_group(_Group, Config) -> emqx_retainer_mnesia:populate_index_meta(), Config. -init_per_testcase(t_get_basic_usage_info, Config) -> +init_per_testcase(_TestCase, Config) -> mnesia:clear_table(?TAB_INDEX), mnesia:clear_table(?TAB_MESSAGE), emqx_retainer_mnesia:populate_index_meta(), - Config; -init_per_testcase(_TestCase, Config) -> Config. +end_per_testcase(t_flow_control, _Config) -> + restore_delivery(); +end_per_testcase(t_cursor_cleanup, _Config) -> + restore_delivery(); +end_per_testcase(_TestCase, _Config) -> + ok. + app_spec() -> {emqx_retainer, ?BASE_CONF}. @@ -405,19 +411,7 @@ t_stop_publish_clear_msg(_) -> ok = emqtt:disconnect(C1). 
t_flow_control(_) -> - Rate = emqx_ratelimiter_SUITE:to_rate("1/1s"), - LimiterCfg = make_limiter_cfg(Rate), - JsonCfg = make_limiter_json(<<"1/1s">>), - emqx_limiter_server:add_bucket(emqx_retainer, internal, LimiterCfg), - emqx_retainer:update_config(#{ - <<"delivery_rate">> => <<"1/1s">>, - <<"flow_control">> => - #{ - <<"batch_read_number">> => 1, - <<"batch_deliver_number">> => 1, - <<"batch_deliver_limiter">> => JsonCfg - } - }), + setup_slow_delivery(), {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, _} = emqtt:connect(C1), emqtt:publish( @@ -442,23 +436,60 @@ t_flow_control(_) -> {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]), ?assertEqual(3, length(receive_messages(3))), End = erlang:system_time(millisecond), + Diff = End - Begin, ?assert( - Diff > timer:seconds(2.5) andalso Diff < timer:seconds(3.9), + Diff > timer:seconds(2.1) andalso Diff < timer:seconds(3.9), lists:flatten(io_lib:format("Diff is :~p~n", [Diff])) ), ok = emqtt:disconnect(C1), + ok. + +t_cursor_cleanup(_) -> + setup_slow_delivery(), + {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), + {ok, _} = emqtt:connect(C1), + lists:foreach( + fun(I) -> + emqtt:publish( + C1, + <<"retained/", (integer_to_binary(I))/binary>>, + <<"this is a retained message">>, + [{qos, 0}, {retain, true}] + ) + end, + lists:seq(1, 5) + ), + {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]), + + snabbkaffe:start_trace(), + + ?assertWaitEvent( + emqtt:disconnect(C1), + #{?snk_kind := retainer_dispatcher_no_receiver, topic := <<"retained/#">>}, + 2000 + ), + + ?assertEqual(0, qlc_process_count()), + + {Pid, Ref} = spawn_monitor(fun() -> ok end), + receive + {'DOWN', Ref, _, _, _} -> ok + after 1000 -> ct:fail("should receive 'DOWN' message") + end, + + ?assertWaitEvent( + emqx_retainer_dispatcher:dispatch(emqx_retainer:context(), <<"retained/1">>, Pid), + #{?snk_kind := retainer_dispatcher_no_receiver, topic := <<"retained/1">>}, + 2000 + ), + + ?assertEqual(0, qlc_process_count()), + + snabbkaffe:stop(), - emqx_limiter_server:del_bucket(emqx_retainer, internal), - emqx_retainer:update_config(#{ - <<"flow_control">> => - #{ - <<"batch_read_number">> => 1, - <<"batch_deliver_number">> => 1 - } - }), ok. t_clear_expired(_) -> @@ -849,15 +880,21 @@ with_conf(ConfMod, Case) -> end. make_limiter_cfg(Rate) -> - Client = #{ - rate => Rate, - initial => 0, - burst => 0, - low_watermark => 1, - divisible => false, - max_retry_time => timer:seconds(5), - failure_strategy => force - }, + make_limiter_cfg(Rate, #{}). + +make_limiter_cfg(Rate, ClientOpts) -> + Client = maps:merge( + #{ + rate => Rate, + initial => 0, + burst => 0, + low_watermark => 1, + divisible => false, + max_retry_time => timer:seconds(5), + failure_strategy => force + }, + ClientOpts + ), #{client => Client, rate => Rate, initial => 0, burst => 0}. make_limiter_json(Rate) -> @@ -909,3 +946,40 @@ do_publish(Client, Topic, Payload, Opts, {sleep, Time}) -> Res = emqtt:publish(Client, Topic, Payload, Opts), ct:sleep(Time), Res. + +setup_slow_delivery() -> + Rate = emqx_ratelimiter_SUITE:to_rate("1/1s"), + LimiterCfg = make_limiter_cfg(Rate), + JsonCfg = make_limiter_json(<<"1/1s">>), + emqx_limiter_server:add_bucket(emqx_retainer, internal, LimiterCfg), + emqx_retainer:update_config(#{ + <<"delivery_rate">> => <<"1/1s">>, + <<"flow_control">> => + #{ + <<"batch_read_number">> => 1, + <<"batch_deliver_number">> => 1, + <<"batch_deliver_limiter">> => JsonCfg + } + }). 
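%% Note (annotation, not part of the committed change): with the "1/1s"
%% delivery rate and batch_deliver_number = 1 configured by
%% setup_slow_delivery/0 above, the three retained messages in
%% t_flow_control/1 are delivered at roughly one per second, which is what
%% the relaxed 2.1s..3.9s bound on Diff accounts for.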
+ +restore_delivery() -> + emqx_limiter_server:del_bucket(emqx_retainer, internal), + emqx_retainer:update_config(#{ + <<"flow_control">> => + #{ + <<"batch_read_number">> => 1, + <<"batch_deliver_number">> => 1 + } + }). + +qlc_processes() -> + lists:filter( + fun(Pid) -> + {current_function, {qlc, wait_for_request, 3}} =:= + erlang:process_info(Pid, current_function) + end, + erlang:processes() + ). + +qlc_process_count() -> + length(qlc_processes()). diff --git a/changes/ce/fix-12996.en.md b/changes/ce/fix-12996.en.md new file mode 100644 index 000000000..0c3cc872f --- /dev/null +++ b/changes/ce/fix-12996.en.md @@ -0,0 +1 @@ +Fix process leak in `emqx_retainer` application. Previously, client disconnection while receiving retained messages could cause a process leak. From e3a59c4037e5c961cf30fef04fc097b370c96b5e Mon Sep 17 00:00:00 2001 From: firest Date: Thu, 9 May 2024 16:27:58 +0800 Subject: [PATCH 07/10] fix(sysk): fix that the syskeeper forwarding never reconnecting --- .../src/emqx_bridge_syskeeper_connector.erl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl index a6d47229c..898915f56 100644 --- a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl +++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl @@ -213,9 +213,9 @@ on_get_status(_InstanceId, #{pool_name := Pool, ack_timeout := AckTimeout}) -> ), status_result(Health). -status_result(true) -> connected; -status_result(false) -> connecting; -status_result({error, _}) -> connecting. +status_result(true) -> ?status_connected; +status_result(false) -> ?status_disconnected; +status_result({error, _}) -> ?status_disconnected. on_add_channel( _InstanceId, @@ -251,7 +251,7 @@ on_get_channels(InstanceId) -> on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) -> case maps:is_key(ChannelId, Channels) of true -> - connected; + ?status_connected; _ -> {error, not_exists} end. From 7b80a9aa443849b0161fecb2f0621f606e7ef2b8 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 9 May 2024 15:23:58 -0300 Subject: [PATCH 08/10] fix(aggregator): namespace aggregator ids with action type Otherwise, actions of different types but same names will clash when starting the aggregator supervision tree. 
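Supervisor child ids are compared for equality, so before this change two actions that shared a name but had different types mapped to the same child id under the shared aggregator supervisor. A minimal, hypothetical sketch of that failure mode and of the namespaced id that avoids it (module, function and id names below are illustrative only, not part of this change):

-module(aggreg_id_sketch).
-behaviour(supervisor).
-export([demo/0, start_worker/0, init/1]).

init([]) ->
    {ok, {#{strategy => one_for_one}, []}}.

start_worker() ->
    %% trivial long-lived worker standing in for an aggregator process
    {ok, spawn_link(fun() -> receive stop -> ok end end)}.

demo() ->
    {ok, Sup} = supervisor:start_link(?MODULE, []),
    Spec = fun(Id) -> #{id => Id, start => {?MODULE, start_worker, []}} end,
    {ok, _} = supervisor:start_child(Sup, Spec(<<"upload1">>)),
    %% same bare action name again (different action type) -> rejected
    {error, {already_started, _}} = supervisor:start_child(Sup, Spec(<<"upload1">>)),
    %% namespacing the id with the action type keeps the ids distinct
    {ok, _} = supervisor:start_child(Sup, Spec({s3_aggregated_upload, <<"upload1">>})),
    ok.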
--- .../src/emqx_bridge_s3_connector.erl | 37 +++++++++++-------- .../emqx_bridge_s3_aggreg_upload_SUITE.erl | 29 ++++++++++----- .../src/emqx_connector_aggreg_delivery.erl | 28 +++++++------- 3 files changed, 55 insertions(+), 39 deletions(-) diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl index f9d3af478..0a155eefb 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl @@ -209,6 +209,7 @@ start_channel(State, #{ bucket := Bucket } }) -> + AggregId = {Type, Name}, AggregOpts = #{ time_interval => TimeInterval, max_records => MaxRecords, @@ -223,19 +224,21 @@ start_channel(State, #{ client_config => maps:get(client_config, State), uploader_config => maps:with([min_part_size, max_part_size], Parameters) }, - _ = emqx_connector_aggreg_sup:delete_child({Type, Name}), + _ = emqx_connector_aggreg_sup:delete_child(AggregId), {ok, SupPid} = emqx_connector_aggreg_sup:start_child(#{ - id => {Type, Name}, - start => {emqx_connector_aggreg_upload_sup, start_link, [Name, AggregOpts, DeliveryOpts]}, + id => AggregId, + start => + {emqx_connector_aggreg_upload_sup, start_link, [AggregId, AggregOpts, DeliveryOpts]}, type => supervisor, restart => permanent }), #{ type => ?ACTION_AGGREGATED_UPLOAD, name => Name, + aggreg_id => AggregId, bucket => Bucket, supervisor => SupPid, - on_stop => fun() -> emqx_connector_aggreg_sup:delete_child({Type, Name}) end + on_stop => fun() -> emqx_connector_aggreg_sup:delete_child(AggregId) end }. upload_options(Parameters) -> @@ -254,12 +257,14 @@ channel_status(#{type := ?ACTION_UPLOAD}, _State) -> %% Since bucket name may be templated, we can't really provide any additional %% information regarding the channel health. ?status_connected; -channel_status(#{type := ?ACTION_AGGREGATED_UPLOAD, name := Name, bucket := Bucket}, State) -> +channel_status( + #{type := ?ACTION_AGGREGATED_UPLOAD, aggreg_id := AggregId, bucket := Bucket}, State +) -> %% NOTE: This will effectively trigger uploads of buffers yet to be uploaded. Timestamp = erlang:system_time(second), - ok = emqx_connector_aggregator:tick(Name, Timestamp), + ok = emqx_connector_aggregator:tick(AggregId, Timestamp), ok = check_bucket_accessible(Bucket, State), - ok = check_aggreg_upload_errors(Name), + ok = check_aggreg_upload_errors(AggregId), ?status_connected. check_bucket_accessible(Bucket, #{client_config := Config}) -> @@ -278,8 +283,8 @@ check_bucket_accessible(Bucket, #{client_config := Config}) -> end end. -check_aggreg_upload_errors(Name) -> - case emqx_connector_aggregator:take_error(Name) of +check_aggreg_upload_errors(AggregId) -> + case emqx_connector_aggregator:take_error(AggregId) of [Error] -> %% TODO %% This approach means that, for example, 3 upload failures will cause @@ -353,11 +358,11 @@ run_simple_upload( {error, map_error(Reason)} end. 
-run_aggregated_upload(InstId, Records, #{name := Name}) -> +run_aggregated_upload(InstId, Records, #{aggreg_id := AggregId}) -> Timestamp = erlang:system_time(second), - case emqx_connector_aggregator:push_records(Name, Timestamp, Records) of + case emqx_connector_aggregator:push_records(AggregId, Timestamp, Records) of ok -> - ?tp(s3_bridge_aggreg_push_ok, #{instance_id => InstId, name => Name}), + ?tp(s3_bridge_aggreg_push_ok, #{instance_id => InstId, name => AggregId}), ok; {error, Reason} -> {error, {unrecoverable_error, Reason}} @@ -406,8 +411,8 @@ init_transfer_state(BufferMap, Opts) -> Key = mk_object_key(BufferMap, Opts), emqx_s3_upload:new(Client, Key, UploadOpts, UploaderConfig). -mk_object_key(BufferMap, #{action := Name, key := Template}) -> - emqx_template:render_strict(Template, {?MODULE, {Name, BufferMap}}). +mk_object_key(BufferMap, #{action := AggregId, key := Template}) -> + emqx_template:render_strict(Template, {?MODULE, {AggregId, BufferMap}}). process_append(Writes, Upload0) -> {ok, Upload} = emqx_s3_upload:append(Writes, Upload0), @@ -443,9 +448,9 @@ process_terminate(Upload) -> -spec lookup(emqx_template:accessor(), {_Name, buffer_map()}) -> {ok, integer() | string()} | {error, undefined}. -lookup([<<"action">>], {Name, _Buffer}) -> +lookup([<<"action">>], {_AggregId = {_Type, Name}, _Buffer}) -> {ok, mk_fs_safe_string(Name)}; -lookup(Accessor, {_Name, Buffer = #{}}) -> +lookup(Accessor, {_AggregId, Buffer = #{}}) -> lookup_buffer_var(Accessor, Buffer); lookup(_Accessor, _Context) -> {error, undefined}. diff --git a/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl b/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl index 0ae34486f..af121ed8d 100644 --- a/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl +++ b/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl @@ -158,6 +158,7 @@ t_on_get_status(Config) -> t_aggreg_upload(Config) -> Bucket = ?config(s3_bucket, Config), BridgeName = ?config(bridge_name, Config), + AggregId = aggreg_id(BridgeName), BridgeNameString = unicode:characters_to_list(BridgeName), NodeString = atom_to_list(node()), %% Create a bridge with the sample configuration. @@ -170,7 +171,7 @@ t_aggreg_upload(Config) -> ]), ok = send_messages(BridgeName, MessageEvents), %% Wait until the delivery is completed. - ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}), + ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := AggregId}), %% Check the uploaded objects. _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket), ?assertMatch( @@ -196,6 +197,7 @@ t_aggreg_upload(Config) -> t_aggreg_upload_rule(Config) -> Bucket = ?config(s3_bucket, Config), BridgeName = ?config(bridge_name, Config), + AggregId = aggreg_id(BridgeName), ClientID = emqx_utils_conv:bin(?FUNCTION_NAME), %% Create a bridge with the sample configuration and a simple SQL rule. ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)), @@ -217,7 +219,7 @@ t_aggreg_upload_rule(Config) -> emqx_message:make(?FUNCTION_NAME, T3 = <<"s3/empty">>, P3 = <<>>), emqx_message:make(?FUNCTION_NAME, <<"not/s3">>, <<"should not be here">>) ]), - ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}), + ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := AggregId}), %% Check the uploaded objects. 
_Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket), _CSV = [Header | Rows] = fetch_parse_csv(Bucket, Key), @@ -249,6 +251,7 @@ t_aggreg_upload_restart(Config) -> %% after a restart. Bucket = ?config(s3_bucket, Config), BridgeName = ?config(bridge_name, Config), + AggregId = aggreg_id(BridgeName), %% Create a bridge with the sample configuration. ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)), %% Send some sample messages that look like Rule SQL productions. @@ -258,15 +261,15 @@ t_aggreg_upload_restart(Config) -> {<<"C3">>, T3 = <<"t/42">>, P3 = <<"">>} ]), ok = send_messages(BridgeName, MessageEvents), - {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_records_written, action := BridgeName}), + {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_records_written, action := AggregId}), %% Restart the bridge. {ok, _} = emqx_bridge_v2:disable_enable(disable, ?BRIDGE_TYPE, BridgeName), {ok, _} = emqx_bridge_v2:disable_enable(enable, ?BRIDGE_TYPE, BridgeName), %% Send some more messages. ok = send_messages(BridgeName, MessageEvents), - {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_records_written, action := BridgeName}), + {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_records_written, action := AggregId}), %% Wait until the delivery is completed. - {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}), + {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := AggregId}), %% Check there's still only one upload. _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket), _Upload = #{content := Content} = emqx_bridge_s3_test_helpers:get_object(Bucket, Key), @@ -289,6 +292,7 @@ t_aggreg_upload_restart_corrupted(Config) -> %% and does so while preserving uncompromised data. Bucket = ?config(s3_bucket, Config), BridgeName = ?config(bridge_name, Config), + AggregId = aggreg_id(BridgeName), BatchSize = ?CONF_MAX_RECORDS div 2, %% Create a bridge with the sample configuration. ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)), @@ -300,13 +304,13 @@ t_aggreg_upload_restart_corrupted(Config) -> %% Ensure that they span multiple batch queries. ok = send_messages_delayed(BridgeName, lists:map(fun mk_message_event/1, Messages1), 1), {ok, _} = ?block_until( - #{?snk_kind := connector_aggreg_records_written, action := BridgeName}, + #{?snk_kind := connector_aggreg_records_written, action := AggregId}, infinity, 0 ), %% Find out the buffer file. {ok, #{filename := Filename}} = ?block_until( - #{?snk_kind := connector_aggreg_buffer_allocated, action := BridgeName} + #{?snk_kind := connector_aggreg_buffer_allocated, action := AggregId} ), %% Stop the bridge, corrupt the buffer file, and restart the bridge. {ok, _} = emqx_bridge_v2:disable_enable(disable, ?BRIDGE_TYPE, BridgeName), @@ -320,7 +324,7 @@ t_aggreg_upload_restart_corrupted(Config) -> ], ok = send_messages_delayed(BridgeName, lists:map(fun mk_message_event/1, Messages2), 0), %% Wait until the delivery is completed. - {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}), + {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := AggregId}), %% Check that upload contains part of the first batch and all of the second batch. 
_Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket), CSV = [_Header | Rows] = fetch_parse_csv(Bucket, Key), @@ -341,6 +345,7 @@ t_aggreg_pending_upload_restart(Config) -> %% a restart. Bucket = ?config(s3_bucket, Config), BridgeName = ?config(bridge_name, Config), + AggregId = aggreg_id(BridgeName), %% Create a bridge with the sample configuration. ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)), %% Send few large messages that will require multipart upload. @@ -362,7 +367,7 @@ t_aggreg_pending_upload_restart(Config) -> %% Restart the bridge. {ok, _} = emqx_bridge_v2:disable_enable(enable, ?BRIDGE_TYPE, BridgeName), %% Wait until the delivery is completed. - {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}), + {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := AggregId}), %% Check that delivery contains all the messages. _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket), [_Header | Rows] = fetch_parse_csv(Bucket, Key), @@ -377,6 +382,7 @@ t_aggreg_next_rotate(Config) -> %% and windowing work correctly under high rate, high concurrency conditions. Bucket = ?config(s3_bucket, Config), BridgeName = ?config(bridge_name, Config), + AggregId = aggreg_id(BridgeName), NSenders = 4, %% Create a bridge with the sample configuration. ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)), @@ -393,7 +399,7 @@ t_aggreg_next_rotate(Config) -> %% Wait for the last delivery to complete. ok = timer:sleep(round(?CONF_TIME_INTERVAL * 0.5)), ?block_until( - #{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}, infinity, 0 + #{?snk_kind := connector_aggreg_delivery_completed, action := AggregId}, infinity, 0 ), %% There should be at least 2 time windows of aggregated records. Uploads = [K || #{key := K} <- emqx_bridge_s3_test_helpers:list_objects(Bucket)], @@ -465,3 +471,6 @@ fetch_parse_csv(Bucket, Key) -> #{content := Content} = emqx_bridge_s3_test_helpers:get_object(Bucket, Key), {ok, CSV} = erl_csv:decode(Content), CSV. + +aggreg_id(BridgeName) -> + {?BRIDGE_TYPE, BridgeName}. diff --git a/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_delivery.erl b/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_delivery.erl index 071c28ee5..c2b4549c1 100644 --- a/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_delivery.erl +++ b/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_delivery.erl @@ -25,7 +25,7 @@ ]). -record(delivery, { - name :: _Name, + id :: id(), callback_module :: module(), container :: emqx_connector_aggreg_csv:container(), reader :: emqx_connector_aggreg_buffer:reader(), @@ -33,6 +33,8 @@ empty :: boolean() }). +-type id() :: term(). + -type state() :: #delivery{}. -type init_opts() :: #{ @@ -59,22 +61,22 @@ %% -start_link(Name, Buffer, Opts) -> - proc_lib:start_link(?MODULE, init, [self(), Name, Buffer, Opts]). +start_link(Id, Buffer, Opts) -> + proc_lib:start_link(?MODULE, init, [self(), Id, Buffer, Opts]). %% --spec init(pid(), _Name, buffer(), init_opts()) -> no_return(). -init(Parent, Name, Buffer, Opts) -> - ?tp(connector_aggreg_delivery_started, #{action => Name, buffer => Buffer}), +-spec init(pid(), id(), buffer(), init_opts()) -> no_return(). 
+init(Parent, Id, Buffer, Opts) -> + ?tp(connector_aggreg_delivery_started, #{action => Id, buffer => Buffer}), Reader = open_buffer(Buffer), - Delivery = init_delivery(Name, Reader, Buffer, Opts#{action => Name}), + Delivery = init_delivery(Id, Reader, Buffer, Opts#{action => Id}), _ = erlang:process_flag(trap_exit, true), ok = proc_lib:init_ack({ok, self()}), loop(Delivery, Parent, []). init_delivery( - Name, + Id, Reader, Buffer, Opts = #{ @@ -84,7 +86,7 @@ init_delivery( ) -> BufferMap = emqx_connector_aggregator:buffer_to_map(Buffer), #delivery{ - name = Name, + id = Id, callback_module = Mod, container = mk_container(ContainerOpts), reader = Reader, @@ -158,16 +160,16 @@ process_write(Delivery = #delivery{callback_module = Mod, transfer = Transfer0}) error({transfer_failed, Reason}) end. -process_complete(#delivery{name = Name, empty = true}) -> - ?tp(connector_aggreg_delivery_completed, #{action => Name, transfer => empty}), +process_complete(#delivery{id = Id, empty = true}) -> + ?tp(connector_aggreg_delivery_completed, #{action => Id, transfer => empty}), exit({shutdown, {skipped, empty}}); process_complete(#delivery{ - name = Name, callback_module = Mod, container = Container, transfer = Transfer0 + id = Id, callback_module = Mod, container = Container, transfer = Transfer0 }) -> Trailer = emqx_connector_aggreg_csv:close(Container), Transfer = Mod:process_append(Trailer, Transfer0), {ok, Completed} = Mod:process_complete(Transfer), - ?tp(connector_aggreg_delivery_completed, #{action => Name, transfer => Completed}), + ?tp(connector_aggreg_delivery_completed, #{action => Id, transfer => Completed}), ok. %% From 1f3b640a3d65d2216653c85428870631009b5a32 Mon Sep 17 00:00:00 2001 From: firest Date: Thu, 9 May 2024 16:53:31 +0800 Subject: [PATCH 09/10] chore: update change --- apps/emqx_bridge_syskeeper/test/emqx_bridge_syskeeper_SUITE.erl | 2 +- changes/ee/fix-13001.en.md | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) create mode 100644 changes/ee/fix-13001.en.md diff --git a/apps/emqx_bridge_syskeeper/test/emqx_bridge_syskeeper_SUITE.erl b/apps/emqx_bridge_syskeeper/test/emqx_bridge_syskeeper_SUITE.erl index 273afffab..9eb882a43 100644 --- a/apps/emqx_bridge_syskeeper/test/emqx_bridge_syskeeper_SUITE.erl +++ b/apps/emqx_bridge_syskeeper/test/emqx_bridge_syskeeper_SUITE.erl @@ -347,7 +347,7 @@ t_get_status(Config) -> _Sleep = 500, _Attempts = 10, ?assertMatch( - #{status := connecting}, + #{status := disconnected}, emqx_bridge_v2:health_check(syskeeper_forwarder, ?SYSKEEPER_NAME) ) ). diff --git a/changes/ee/fix-13001.en.md b/changes/ee/fix-13001.en.md new file mode 100644 index 000000000..5d431e0f5 --- /dev/null +++ b/changes/ee/fix-13001.en.md @@ -0,0 +1 @@ +Fixed an issue where the syskeeper forwarder would never reconnect when the connection was lost. From 3a29696a48ee035a544412bfd3ba496ecf33f2f0 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 9 May 2024 16:55:54 -0300 Subject: [PATCH 10/10] fix(aggregator): refactor supervision tree Instead of using a aggregator supervisor with a fixed local name, we should allow specifying different names so each action app will spawn its own aggregator supervisor. 
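In practice this means every action app now owns the supervisor its aggregator workers run under: the diff below adds emqx_bridge_s3_app and emqx_bridge_s3_sup for S3 and removes the shared, globally named emqx_connector_aggreg_sup. The key OTP wiring is the {mod, ...} entry in the .app.src, which makes the application controller call the app module's start/2. A rough sketch of the same wiring for some other action app (all names hypothetical, mirroring the S3 modules below):

%% in other_bridge.app.src: {mod, {other_bridge_app, []}}
-module(other_bridge_app).
-behaviour(application).
-export([start/2, stop/1]).

start(_StartType, _StartArgs) ->
    %% start this bridge's own aggregator supervisor (a module shaped like
    %% emqx_bridge_s3_sup below) instead of relying on a shared one
    other_bridge_sup:start_link().

stop(_State) ->
    ok.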
--- .../emqx_bridge_s3/src/emqx_bridge_s3.app.src | 1 + .../src/emqx_bridge_s3_app.erl} | 4 +- .../src/emqx_bridge_s3_connector.erl | 8 +-- .../emqx_bridge_s3/src/emqx_bridge_s3_sup.erl | 54 +++++++++++++++++++ .../src/emqx_connector_aggreg_sup.erl | 42 --------------- .../src/emqx_connector_aggregator.app.src | 1 - 6 files changed, 62 insertions(+), 48 deletions(-) rename apps/{emqx_connector_aggregator/src/emqx_connector_aggreg_app.erl => emqx_bridge_s3/src/emqx_bridge_s3_app.erl} (91%) create mode 100644 apps/emqx_bridge_s3/src/emqx_bridge_s3_sup.erl delete mode 100644 apps/emqx_connector_aggregator/src/emqx_connector_aggreg_sup.erl diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src b/apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src index 46f8db64b..05c8592d8 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src @@ -19,6 +19,7 @@ emqx_bridge_s3_connector_info ]} ]}, + {mod, {emqx_bridge_s3_app, []}}, {modules, []}, {links, []} ]}. diff --git a/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_app.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_app.erl similarity index 91% rename from apps/emqx_connector_aggregator/src/emqx_connector_aggreg_app.erl rename to apps/emqx_bridge_s3/src/emqx_bridge_s3_app.erl index ce6e11ab2..42cfaa83e 100644 --- a/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_app.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_app.erl @@ -1,7 +1,7 @@ %%-------------------------------------------------------------------- %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_connector_aggreg_app). +-module(emqx_bridge_s3_app). -behaviour(application). -export([start/2, stop/1]). @@ -15,7 +15,7 @@ %%------------------------------------------------------------------------------ start(_StartType, _StartArgs) -> - emqx_connector_aggreg_sup:start_link(). + emqx_bridge_s3_sup:start_link(). stop(_State) -> ok. diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl index 0a155eefb..ce1bee8d1 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl @@ -87,6 +87,8 @@ channels := #{channel_id() => channel_state()} }. +-define(AGGREG_SUP, emqx_bridge_s3_sup). + %% -spec callback_mode() -> callback_mode(). @@ -224,8 +226,8 @@ start_channel(State, #{ client_config => maps:get(client_config, State), uploader_config => maps:with([min_part_size, max_part_size], Parameters) }, - _ = emqx_connector_aggreg_sup:delete_child(AggregId), - {ok, SupPid} = emqx_connector_aggreg_sup:start_child(#{ + _ = ?AGGREG_SUP:delete_child(AggregId), + {ok, SupPid} = ?AGGREG_SUP:start_child(#{ id => AggregId, start => {emqx_connector_aggreg_upload_sup, start_link, [AggregId, AggregOpts, DeliveryOpts]}, @@ -238,7 +240,7 @@ start_channel(State, #{ aggreg_id => AggregId, bucket => Bucket, supervisor => SupPid, - on_stop => fun() -> emqx_connector_aggreg_sup:delete_child(AggregId) end + on_stop => fun() -> ?AGGREG_SUP:delete_child(AggregId) end }. 
upload_options(Parameters) -> diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_sup.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_sup.erl new file mode 100644 index 000000000..875f09fda --- /dev/null +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_sup.erl @@ -0,0 +1,54 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_s3_sup). + +%% API +-export([ + start_link/0, + start_child/1, + delete_child/1 +]). + +%% `supervisor' API +-export([init/1]). + +%%------------------------------------------------------------------------------ +%% Type declarations +%%------------------------------------------------------------------------------ + +%%------------------------------------------------------------------------------ +%% API +%%------------------------------------------------------------------------------ + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +start_child(ChildSpec) -> + supervisor:start_child(?MODULE, ChildSpec). + +delete_child(ChildId) -> + case supervisor:terminate_child(?MODULE, ChildId) of + ok -> + supervisor:delete_child(?MODULE, ChildId); + Error -> + Error + end. + +%%------------------------------------------------------------------------------ +%% `supervisor' API +%%------------------------------------------------------------------------------ + +init([]) -> + SupFlags = #{ + strategy => one_for_one, + intensity => 1, + period => 1 + }, + ChildSpecs = [], + {ok, {SupFlags, ChildSpecs}}. + +%%------------------------------------------------------------------------------ +%% Internal fns +%%------------------------------------------------------------------------------ diff --git a/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_sup.erl b/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_sup.erl deleted file mode 100644 index e80652542..000000000 --- a/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_sup.erl +++ /dev/null @@ -1,42 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. -%%-------------------------------------------------------------------- - --module(emqx_connector_aggreg_sup). - --export([ - start_link/0, - start_child/1, - delete_child/1 -]). - --behaviour(supervisor). --export([init/1]). - --define(SUPREF, ?MODULE). - -%% - -start_link() -> - supervisor:start_link({local, ?SUPREF}, ?MODULE, root). - -start_child(ChildSpec) -> - supervisor:start_child(?SUPREF, ChildSpec). - -delete_child(ChildId) -> - case supervisor:terminate_child(?SUPREF, ChildId) of - ok -> - supervisor:delete_child(?SUPREF, ChildId); - Error -> - Error - end. - -%% - -init(root) -> - SupFlags = #{ - strategy => one_for_one, - intensity => 1, - period => 1 - }, - {ok, {SupFlags, []}}. diff --git a/apps/emqx_connector_aggregator/src/emqx_connector_aggregator.app.src b/apps/emqx_connector_aggregator/src/emqx_connector_aggregator.app.src index 870219807..6562958ee 100644 --- a/apps/emqx_connector_aggregator/src/emqx_connector_aggregator.app.src +++ b/apps/emqx_connector_aggregator/src/emqx_connector_aggregator.app.src @@ -7,7 +7,6 @@ stdlib ]}, {env, []}, - {mod, {emqx_connector_aggreg_app, []}}, {modules, []}, {links, []} ]}.