diff --git a/apps/emqx/src/config/emqx_config_zones.erl b/apps/emqx/src/config/emqx_config_zones.erl index f4f3c7420..9b2fb224a 100644 --- a/apps/emqx/src/config/emqx_config_zones.erl +++ b/apps/emqx/src/config/emqx_config_zones.erl @@ -20,6 +20,7 @@ %% API -export([add_handler/0, remove_handler/0, pre_config_update/3]). -export([is_olp_enabled/0]). +-export([assert_zone_exists/1]). -define(ZONES, [zones]). @@ -44,3 +45,26 @@ is_olp_enabled() -> false, emqx_config:get([zones], #{}) ). + +-spec assert_zone_exists(binary() | atom()) -> ok. +assert_zone_exists(Name0) when is_binary(Name0) -> + %% an existing zone must have already an atom-name + Name = + try + binary_to_existing_atom(Name0) + catch + _:_ -> + throw({unknown_zone, Name0}) + end, + assert_zone_exists(Name); +assert_zone_exists(default) -> + %% there is always a 'default' zone + ok; +assert_zone_exists(Name) when is_atom(Name) -> + try + _ = emqx_config:get([zones, Name]), + ok + catch + error:{config_not_found, _} -> + throw({unknown_zone, Name}) + end. diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index 5ea39d5e8..122118c6d 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -124,7 +124,7 @@ format_raw_listeners({Type0, Conf}) -> Bind = parse_bind(LConf0), MaxConn = maps:get(<<"max_connections">>, LConf0, default_max_conn()), Running = is_running(Type, listener_id(Type, LName), LConf0#{bind => Bind}), - LConf1 = maps:without([<<"authentication">>, <<"zone">>], LConf0), + LConf1 = maps:without([<<"authentication">>], LConf0), LConf2 = maps:put(<<"running">>, Running, LConf1), CurrConn = case Running of @@ -526,6 +526,7 @@ pre_config_update([?ROOT_KEY, _Type, _Name], {update, _Request}, undefined) -> pre_config_update([?ROOT_KEY, Type, Name], {update, Request}, RawConf) -> RawConf1 = emqx_utils_maps:deep_merge(RawConf, Request), RawConf2 = ensure_override_limiter_conf(RawConf1, Request), + ok = assert_zone_exists(RawConf2), {ok, convert_certs(Type, Name, RawConf2)}; pre_config_update([?ROOT_KEY, _Type, _Name], {action, _Action, Updated}, RawConf) -> {ok, emqx_utils_maps:deep_merge(RawConf, Updated)}; @@ -896,6 +897,11 @@ convert_certs(Type, Name, Conf) -> filter_stacktrace({Reason, _Stacktrace}) -> Reason; filter_stacktrace(Reason) -> Reason. +assert_zone_exists(#{<<"zone">> := Zone}) -> + emqx_config_zones:assert_zone_exists(Zone); +assert_zone_exists(_) -> + ok. + %% limiter config should override, not merge ensure_override_limiter_conf(Conf, #{<<"limiter">> := Limiter}) -> Conf#{<<"limiter">> => Limiter}; diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 28c370ba9..3989516e5 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -622,7 +622,7 @@ replay_streams(Session0 = #{replay := [{StreamKey, Srs0} | Rest]}, ClientInfo) - replay_streams(Session#{replay := Rest}, ClientInfo); {error, recoverable, Reason} -> RetryTimeout = ?TIMEOUT_RETRY_REPLAY, - ?SLOG(warning, #{ + ?SLOG(debug, #{ msg => "failed_to_fetch_replay_batch", stream => StreamKey, reason => Reason, @@ -925,7 +925,7 @@ new_batch({StreamKey, Srs0}, BatchSize, Session0 = #{s := S0}, ClientInfo) -> Session#{s => S}; {error, Class, Reason} -> %% TODO: Handle unrecoverable error. - ?SLOG(info, #{ + ?SLOG(debug, #{ msg => "failed_to_fetch_batch", stream => StreamKey, reason => Reason, diff --git a/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl b/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl index 1be0bdf4a..c6a968a9a 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl @@ -220,7 +220,7 @@ ensure_iterator(TopicFilter, StartTime, SubId, SStateId, {{RankX, RankY}, Stream }, emqx_persistent_session_ds_state:put_stream(Key, NewStreamState, S); {error, recoverable, Reason} -> - ?SLOG(warning, #{ + ?SLOG(debug, #{ msg => "failed_to_initialize_stream_iterator", stream => Stream, class => recoverable, diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src b/apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src index 46f8db64b..05c8592d8 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src @@ -19,6 +19,7 @@ emqx_bridge_s3_connector_info ]} ]}, + {mod, {emqx_bridge_s3_app, []}}, {modules, []}, {links, []} ]}. diff --git a/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_app.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_app.erl similarity index 91% rename from apps/emqx_connector_aggregator/src/emqx_connector_aggreg_app.erl rename to apps/emqx_bridge_s3/src/emqx_bridge_s3_app.erl index ce6e11ab2..42cfaa83e 100644 --- a/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_app.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_app.erl @@ -1,7 +1,7 @@ %%-------------------------------------------------------------------- %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_connector_aggreg_app). +-module(emqx_bridge_s3_app). -behaviour(application). -export([start/2, stop/1]). @@ -15,7 +15,7 @@ %%------------------------------------------------------------------------------ start(_StartType, _StartArgs) -> - emqx_connector_aggreg_sup:start_link(). + emqx_bridge_s3_sup:start_link(). stop(_State) -> ok. diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl index f9d3af478..ce1bee8d1 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl @@ -87,6 +87,8 @@ channels := #{channel_id() => channel_state()} }. +-define(AGGREG_SUP, emqx_bridge_s3_sup). + %% -spec callback_mode() -> callback_mode(). @@ -209,6 +211,7 @@ start_channel(State, #{ bucket := Bucket } }) -> + AggregId = {Type, Name}, AggregOpts = #{ time_interval => TimeInterval, max_records => MaxRecords, @@ -223,19 +226,21 @@ start_channel(State, #{ client_config => maps:get(client_config, State), uploader_config => maps:with([min_part_size, max_part_size], Parameters) }, - _ = emqx_connector_aggreg_sup:delete_child({Type, Name}), - {ok, SupPid} = emqx_connector_aggreg_sup:start_child(#{ - id => {Type, Name}, - start => {emqx_connector_aggreg_upload_sup, start_link, [Name, AggregOpts, DeliveryOpts]}, + _ = ?AGGREG_SUP:delete_child(AggregId), + {ok, SupPid} = ?AGGREG_SUP:start_child(#{ + id => AggregId, + start => + {emqx_connector_aggreg_upload_sup, start_link, [AggregId, AggregOpts, DeliveryOpts]}, type => supervisor, restart => permanent }), #{ type => ?ACTION_AGGREGATED_UPLOAD, name => Name, + aggreg_id => AggregId, bucket => Bucket, supervisor => SupPid, - on_stop => fun() -> emqx_connector_aggreg_sup:delete_child({Type, Name}) end + on_stop => fun() -> ?AGGREG_SUP:delete_child(AggregId) end }. upload_options(Parameters) -> @@ -254,12 +259,14 @@ channel_status(#{type := ?ACTION_UPLOAD}, _State) -> %% Since bucket name may be templated, we can't really provide any additional %% information regarding the channel health. ?status_connected; -channel_status(#{type := ?ACTION_AGGREGATED_UPLOAD, name := Name, bucket := Bucket}, State) -> +channel_status( + #{type := ?ACTION_AGGREGATED_UPLOAD, aggreg_id := AggregId, bucket := Bucket}, State +) -> %% NOTE: This will effectively trigger uploads of buffers yet to be uploaded. Timestamp = erlang:system_time(second), - ok = emqx_connector_aggregator:tick(Name, Timestamp), + ok = emqx_connector_aggregator:tick(AggregId, Timestamp), ok = check_bucket_accessible(Bucket, State), - ok = check_aggreg_upload_errors(Name), + ok = check_aggreg_upload_errors(AggregId), ?status_connected. check_bucket_accessible(Bucket, #{client_config := Config}) -> @@ -278,8 +285,8 @@ check_bucket_accessible(Bucket, #{client_config := Config}) -> end end. -check_aggreg_upload_errors(Name) -> - case emqx_connector_aggregator:take_error(Name) of +check_aggreg_upload_errors(AggregId) -> + case emqx_connector_aggregator:take_error(AggregId) of [Error] -> %% TODO %% This approach means that, for example, 3 upload failures will cause @@ -353,11 +360,11 @@ run_simple_upload( {error, map_error(Reason)} end. -run_aggregated_upload(InstId, Records, #{name := Name}) -> +run_aggregated_upload(InstId, Records, #{aggreg_id := AggregId}) -> Timestamp = erlang:system_time(second), - case emqx_connector_aggregator:push_records(Name, Timestamp, Records) of + case emqx_connector_aggregator:push_records(AggregId, Timestamp, Records) of ok -> - ?tp(s3_bridge_aggreg_push_ok, #{instance_id => InstId, name => Name}), + ?tp(s3_bridge_aggreg_push_ok, #{instance_id => InstId, name => AggregId}), ok; {error, Reason} -> {error, {unrecoverable_error, Reason}} @@ -406,8 +413,8 @@ init_transfer_state(BufferMap, Opts) -> Key = mk_object_key(BufferMap, Opts), emqx_s3_upload:new(Client, Key, UploadOpts, UploaderConfig). -mk_object_key(BufferMap, #{action := Name, key := Template}) -> - emqx_template:render_strict(Template, {?MODULE, {Name, BufferMap}}). +mk_object_key(BufferMap, #{action := AggregId, key := Template}) -> + emqx_template:render_strict(Template, {?MODULE, {AggregId, BufferMap}}). process_append(Writes, Upload0) -> {ok, Upload} = emqx_s3_upload:append(Writes, Upload0), @@ -443,9 +450,9 @@ process_terminate(Upload) -> -spec lookup(emqx_template:accessor(), {_Name, buffer_map()}) -> {ok, integer() | string()} | {error, undefined}. -lookup([<<"action">>], {Name, _Buffer}) -> +lookup([<<"action">>], {_AggregId = {_Type, Name}, _Buffer}) -> {ok, mk_fs_safe_string(Name)}; -lookup(Accessor, {_Name, Buffer = #{}}) -> +lookup(Accessor, {_AggregId, Buffer = #{}}) -> lookup_buffer_var(Accessor, Buffer); lookup(_Accessor, _Context) -> {error, undefined}. diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_sup.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_sup.erl new file mode 100644 index 000000000..875f09fda --- /dev/null +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_sup.erl @@ -0,0 +1,54 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_s3_sup). + +%% API +-export([ + start_link/0, + start_child/1, + delete_child/1 +]). + +%% `supervisor' API +-export([init/1]). + +%%------------------------------------------------------------------------------ +%% Type declarations +%%------------------------------------------------------------------------------ + +%%------------------------------------------------------------------------------ +%% API +%%------------------------------------------------------------------------------ + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +start_child(ChildSpec) -> + supervisor:start_child(?MODULE, ChildSpec). + +delete_child(ChildId) -> + case supervisor:terminate_child(?MODULE, ChildId) of + ok -> + supervisor:delete_child(?MODULE, ChildId); + Error -> + Error + end. + +%%------------------------------------------------------------------------------ +%% `supervisor' API +%%------------------------------------------------------------------------------ + +init([]) -> + SupFlags = #{ + strategy => one_for_one, + intensity => 1, + period => 1 + }, + ChildSpecs = [], + {ok, {SupFlags, ChildSpecs}}. + +%%------------------------------------------------------------------------------ +%% Internal fns +%%------------------------------------------------------------------------------ diff --git a/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl b/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl index 0ae34486f..af121ed8d 100644 --- a/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl +++ b/apps/emqx_bridge_s3/test/emqx_bridge_s3_aggreg_upload_SUITE.erl @@ -158,6 +158,7 @@ t_on_get_status(Config) -> t_aggreg_upload(Config) -> Bucket = ?config(s3_bucket, Config), BridgeName = ?config(bridge_name, Config), + AggregId = aggreg_id(BridgeName), BridgeNameString = unicode:characters_to_list(BridgeName), NodeString = atom_to_list(node()), %% Create a bridge with the sample configuration. @@ -170,7 +171,7 @@ t_aggreg_upload(Config) -> ]), ok = send_messages(BridgeName, MessageEvents), %% Wait until the delivery is completed. - ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}), + ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := AggregId}), %% Check the uploaded objects. _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket), ?assertMatch( @@ -196,6 +197,7 @@ t_aggreg_upload(Config) -> t_aggreg_upload_rule(Config) -> Bucket = ?config(s3_bucket, Config), BridgeName = ?config(bridge_name, Config), + AggregId = aggreg_id(BridgeName), ClientID = emqx_utils_conv:bin(?FUNCTION_NAME), %% Create a bridge with the sample configuration and a simple SQL rule. ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)), @@ -217,7 +219,7 @@ t_aggreg_upload_rule(Config) -> emqx_message:make(?FUNCTION_NAME, T3 = <<"s3/empty">>, P3 = <<>>), emqx_message:make(?FUNCTION_NAME, <<"not/s3">>, <<"should not be here">>) ]), - ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}), + ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := AggregId}), %% Check the uploaded objects. _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket), _CSV = [Header | Rows] = fetch_parse_csv(Bucket, Key), @@ -249,6 +251,7 @@ t_aggreg_upload_restart(Config) -> %% after a restart. Bucket = ?config(s3_bucket, Config), BridgeName = ?config(bridge_name, Config), + AggregId = aggreg_id(BridgeName), %% Create a bridge with the sample configuration. ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)), %% Send some sample messages that look like Rule SQL productions. @@ -258,15 +261,15 @@ t_aggreg_upload_restart(Config) -> {<<"C3">>, T3 = <<"t/42">>, P3 = <<"">>} ]), ok = send_messages(BridgeName, MessageEvents), - {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_records_written, action := BridgeName}), + {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_records_written, action := AggregId}), %% Restart the bridge. {ok, _} = emqx_bridge_v2:disable_enable(disable, ?BRIDGE_TYPE, BridgeName), {ok, _} = emqx_bridge_v2:disable_enable(enable, ?BRIDGE_TYPE, BridgeName), %% Send some more messages. ok = send_messages(BridgeName, MessageEvents), - {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_records_written, action := BridgeName}), + {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_records_written, action := AggregId}), %% Wait until the delivery is completed. - {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}), + {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := AggregId}), %% Check there's still only one upload. _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket), _Upload = #{content := Content} = emqx_bridge_s3_test_helpers:get_object(Bucket, Key), @@ -289,6 +292,7 @@ t_aggreg_upload_restart_corrupted(Config) -> %% and does so while preserving uncompromised data. Bucket = ?config(s3_bucket, Config), BridgeName = ?config(bridge_name, Config), + AggregId = aggreg_id(BridgeName), BatchSize = ?CONF_MAX_RECORDS div 2, %% Create a bridge with the sample configuration. ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)), @@ -300,13 +304,13 @@ t_aggreg_upload_restart_corrupted(Config) -> %% Ensure that they span multiple batch queries. ok = send_messages_delayed(BridgeName, lists:map(fun mk_message_event/1, Messages1), 1), {ok, _} = ?block_until( - #{?snk_kind := connector_aggreg_records_written, action := BridgeName}, + #{?snk_kind := connector_aggreg_records_written, action := AggregId}, infinity, 0 ), %% Find out the buffer file. {ok, #{filename := Filename}} = ?block_until( - #{?snk_kind := connector_aggreg_buffer_allocated, action := BridgeName} + #{?snk_kind := connector_aggreg_buffer_allocated, action := AggregId} ), %% Stop the bridge, corrupt the buffer file, and restart the bridge. {ok, _} = emqx_bridge_v2:disable_enable(disable, ?BRIDGE_TYPE, BridgeName), @@ -320,7 +324,7 @@ t_aggreg_upload_restart_corrupted(Config) -> ], ok = send_messages_delayed(BridgeName, lists:map(fun mk_message_event/1, Messages2), 0), %% Wait until the delivery is completed. - {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}), + {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := AggregId}), %% Check that upload contains part of the first batch and all of the second batch. _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket), CSV = [_Header | Rows] = fetch_parse_csv(Bucket, Key), @@ -341,6 +345,7 @@ t_aggreg_pending_upload_restart(Config) -> %% a restart. Bucket = ?config(s3_bucket, Config), BridgeName = ?config(bridge_name, Config), + AggregId = aggreg_id(BridgeName), %% Create a bridge with the sample configuration. ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)), %% Send few large messages that will require multipart upload. @@ -362,7 +367,7 @@ t_aggreg_pending_upload_restart(Config) -> %% Restart the bridge. {ok, _} = emqx_bridge_v2:disable_enable(enable, ?BRIDGE_TYPE, BridgeName), %% Wait until the delivery is completed. - {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}), + {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := AggregId}), %% Check that delivery contains all the messages. _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket), [_Header | Rows] = fetch_parse_csv(Bucket, Key), @@ -377,6 +382,7 @@ t_aggreg_next_rotate(Config) -> %% and windowing work correctly under high rate, high concurrency conditions. Bucket = ?config(s3_bucket, Config), BridgeName = ?config(bridge_name, Config), + AggregId = aggreg_id(BridgeName), NSenders = 4, %% Create a bridge with the sample configuration. ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)), @@ -393,7 +399,7 @@ t_aggreg_next_rotate(Config) -> %% Wait for the last delivery to complete. ok = timer:sleep(round(?CONF_TIME_INTERVAL * 0.5)), ?block_until( - #{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}, infinity, 0 + #{?snk_kind := connector_aggreg_delivery_completed, action := AggregId}, infinity, 0 ), %% There should be at least 2 time windows of aggregated records. Uploads = [K || #{key := K} <- emqx_bridge_s3_test_helpers:list_objects(Bucket)], @@ -465,3 +471,6 @@ fetch_parse_csv(Bucket, Key) -> #{content := Content} = emqx_bridge_s3_test_helpers:get_object(Bucket, Key), {ok, CSV} = erl_csv:decode(Content), CSV. + +aggreg_id(BridgeName) -> + {?BRIDGE_TYPE, BridgeName}. diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl index a6d47229c..898915f56 100644 --- a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl +++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl @@ -213,9 +213,9 @@ on_get_status(_InstanceId, #{pool_name := Pool, ack_timeout := AckTimeout}) -> ), status_result(Health). -status_result(true) -> connected; -status_result(false) -> connecting; -status_result({error, _}) -> connecting. +status_result(true) -> ?status_connected; +status_result(false) -> ?status_disconnected; +status_result({error, _}) -> ?status_disconnected. on_add_channel( _InstanceId, @@ -251,7 +251,7 @@ on_get_channels(InstanceId) -> on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) -> case maps:is_key(ChannelId, Channels) of true -> - connected; + ?status_connected; _ -> {error, not_exists} end. diff --git a/apps/emqx_bridge_syskeeper/test/emqx_bridge_syskeeper_SUITE.erl b/apps/emqx_bridge_syskeeper/test/emqx_bridge_syskeeper_SUITE.erl index 273afffab..9eb882a43 100644 --- a/apps/emqx_bridge_syskeeper/test/emqx_bridge_syskeeper_SUITE.erl +++ b/apps/emqx_bridge_syskeeper/test/emqx_bridge_syskeeper_SUITE.erl @@ -347,7 +347,7 @@ t_get_status(Config) -> _Sleep = 500, _Attempts = 10, ?assertMatch( - #{status := connecting}, + #{status := disconnected}, emqx_bridge_v2:health_check(syskeeper_forwarder, ?SYSKEEPER_NAME) ) ). diff --git a/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_delivery.erl b/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_delivery.erl index 071c28ee5..c2b4549c1 100644 --- a/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_delivery.erl +++ b/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_delivery.erl @@ -25,7 +25,7 @@ ]). -record(delivery, { - name :: _Name, + id :: id(), callback_module :: module(), container :: emqx_connector_aggreg_csv:container(), reader :: emqx_connector_aggreg_buffer:reader(), @@ -33,6 +33,8 @@ empty :: boolean() }). +-type id() :: term(). + -type state() :: #delivery{}. -type init_opts() :: #{ @@ -59,22 +61,22 @@ %% -start_link(Name, Buffer, Opts) -> - proc_lib:start_link(?MODULE, init, [self(), Name, Buffer, Opts]). +start_link(Id, Buffer, Opts) -> + proc_lib:start_link(?MODULE, init, [self(), Id, Buffer, Opts]). %% --spec init(pid(), _Name, buffer(), init_opts()) -> no_return(). -init(Parent, Name, Buffer, Opts) -> - ?tp(connector_aggreg_delivery_started, #{action => Name, buffer => Buffer}), +-spec init(pid(), id(), buffer(), init_opts()) -> no_return(). +init(Parent, Id, Buffer, Opts) -> + ?tp(connector_aggreg_delivery_started, #{action => Id, buffer => Buffer}), Reader = open_buffer(Buffer), - Delivery = init_delivery(Name, Reader, Buffer, Opts#{action => Name}), + Delivery = init_delivery(Id, Reader, Buffer, Opts#{action => Id}), _ = erlang:process_flag(trap_exit, true), ok = proc_lib:init_ack({ok, self()}), loop(Delivery, Parent, []). init_delivery( - Name, + Id, Reader, Buffer, Opts = #{ @@ -84,7 +86,7 @@ init_delivery( ) -> BufferMap = emqx_connector_aggregator:buffer_to_map(Buffer), #delivery{ - name = Name, + id = Id, callback_module = Mod, container = mk_container(ContainerOpts), reader = Reader, @@ -158,16 +160,16 @@ process_write(Delivery = #delivery{callback_module = Mod, transfer = Transfer0}) error({transfer_failed, Reason}) end. -process_complete(#delivery{name = Name, empty = true}) -> - ?tp(connector_aggreg_delivery_completed, #{action => Name, transfer => empty}), +process_complete(#delivery{id = Id, empty = true}) -> + ?tp(connector_aggreg_delivery_completed, #{action => Id, transfer => empty}), exit({shutdown, {skipped, empty}}); process_complete(#delivery{ - name = Name, callback_module = Mod, container = Container, transfer = Transfer0 + id = Id, callback_module = Mod, container = Container, transfer = Transfer0 }) -> Trailer = emqx_connector_aggreg_csv:close(Container), Transfer = Mod:process_append(Trailer, Transfer0), {ok, Completed} = Mod:process_complete(Transfer), - ?tp(connector_aggreg_delivery_completed, #{action => Name, transfer => Completed}), + ?tp(connector_aggreg_delivery_completed, #{action => Id, transfer => Completed}), ok. %% diff --git a/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_sup.erl b/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_sup.erl deleted file mode 100644 index e80652542..000000000 --- a/apps/emqx_connector_aggregator/src/emqx_connector_aggreg_sup.erl +++ /dev/null @@ -1,42 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. -%%-------------------------------------------------------------------- - --module(emqx_connector_aggreg_sup). - --export([ - start_link/0, - start_child/1, - delete_child/1 -]). - --behaviour(supervisor). --export([init/1]). - --define(SUPREF, ?MODULE). - -%% - -start_link() -> - supervisor:start_link({local, ?SUPREF}, ?MODULE, root). - -start_child(ChildSpec) -> - supervisor:start_child(?SUPREF, ChildSpec). - -delete_child(ChildId) -> - case supervisor:terminate_child(?SUPREF, ChildId) of - ok -> - supervisor:delete_child(?SUPREF, ChildId); - Error -> - Error - end. - -%% - -init(root) -> - SupFlags = #{ - strategy => one_for_one, - intensity => 1, - period => 1 - }, - {ok, {SupFlags, []}}. diff --git a/apps/emqx_connector_aggregator/src/emqx_connector_aggregator.app.src b/apps/emqx_connector_aggregator/src/emqx_connector_aggregator.app.src index 870219807..6562958ee 100644 --- a/apps/emqx_connector_aggregator/src/emqx_connector_aggregator.app.src +++ b/apps/emqx_connector_aggregator/src/emqx_connector_aggregator.app.src @@ -7,7 +7,6 @@ stdlib ]}, {env, []}, - {mod, {emqx_connector_aggreg_app, []}}, {modules, []}, {links, []} ]}. diff --git a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl index 1c581514a..eebbaad08 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl @@ -810,6 +810,7 @@ listener_id_status_example() -> tcp_schema_example() -> #{ + type => tcp, acceptors => 16, access_rules => ["allow all"], bind => <<"0.0.0.0:1884">>, @@ -820,6 +821,7 @@ tcp_schema_example() -> proxy_protocol => false, proxy_protocol_timeout => <<"3s">>, running => true, + zone => default, tcp_options => #{ active_n => 100, backlog => 1024, @@ -829,8 +831,7 @@ tcp_schema_example() -> reuseaddr => true, send_timeout => <<"15s">>, send_timeout_close => true - }, - type => tcp + } }. create_listener(From, Body) -> diff --git a/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl index 75c232f23..07885d93d 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl @@ -36,9 +36,12 @@ groups() -> MaxConnTests = [ t_max_connection_default ], + ZoneTests = [ + t_update_listener_zone + ], [ {with_defaults_in_file, AllTests -- MaxConnTests}, - {without_defaults_in_file, AllTests -- MaxConnTests}, + {without_defaults_in_file, AllTests -- (MaxConnTests ++ ZoneTests)}, {max_connections, MaxConnTests} ]. @@ -403,6 +406,21 @@ crud_listeners_by_id(ListenerId, NewListenerId, MinListenerId, BadId, Type, Port ?assertMatch({error, {"HTTP/1.1", 404, _}}, request(delete, NewPath, [], [])), ok. +t_update_listener_zone({init, Config}) -> + %% fake a zone + Config; +t_update_listener_zone({'end', _Config}) -> + ok; +t_update_listener_zone(_Config) -> + ListenerId = <<"tcp:default">>, + Path = emqx_mgmt_api_test_util:api_path(["listeners", ListenerId]), + Conf = request(get, Path, [], []), + %% update + AddConf1 = Conf#{<<"zone">> => <<"unknownzone">>}, + AddConf2 = Conf#{<<"zone">> => <<"zone1">>}, + ?assertMatch({error, {_, 400, _}}, request(put, Path, [], AddConf1)), + ?assertMatch(#{<<"zone">> := <<"zone1">>}, request(put, Path, [], AddConf2)). + t_delete_nonexistent_listener(Config) when is_list(Config) -> NonExist = emqx_mgmt_api_test_util:api_path(["listeners", "tcp:nonexistent"]), ?assertMatch( @@ -518,5 +536,9 @@ cert_file(Name) -> default_listeners_hocon_text() -> Sc = #{roots => emqx_schema:listeners()}, Listeners = hocon_tconf:make_serializable(Sc, #{}, #{}), - Config = #{<<"listeners">> => Listeners}, + Zones = #{<<"zone1">> => #{<<"mqtt">> => #{<<"max_inflight">> => 2}}}, + Config = #{ + <<"listeners">> => Listeners, + <<"zones">> => Zones + }, hocon_pp:do(Config, #{}). diff --git a/apps/emqx_retainer/src/emqx_retainer.app.src b/apps/emqx_retainer/src/emqx_retainer.app.src index 4a8b3cdc3..7bcde8d50 100644 --- a/apps/emqx_retainer/src/emqx_retainer.app.src +++ b/apps/emqx_retainer/src/emqx_retainer.app.src @@ -2,7 +2,7 @@ {application, emqx_retainer, [ {description, "EMQX Retainer"}, % strict semver, bump manually! - {vsn, "5.0.22"}, + {vsn, "5.0.23"}, {modules, []}, {registered, [emqx_retainer_sup]}, {applications, [kernel, stdlib, emqx, emqx_ctl]}, diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index b375f30ad..743046c80 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -47,6 +47,7 @@ retained_count/0, backend_module/0, backend_module/1, + backend_state/1, enabled/0 ]). @@ -103,6 +104,7 @@ -callback page_read(backend_state(), emqx_maybe:t(topic()), non_neg_integer(), non_neg_integer()) -> {ok, has_next(), list(message())}. -callback match_messages(backend_state(), topic(), cursor()) -> {ok, list(message()), cursor()}. +-callback delete_cursor(backend_state(), cursor()) -> ok. -callback clear_expired(backend_state()) -> ok. -callback clean(backend_state()) -> ok. -callback size(backend_state()) -> non_neg_integer(). @@ -339,7 +341,7 @@ count(Context) -> clear_expired(Context) -> Mod = backend_module(Context), BackendState = backend_state(Context), - Mod:clear_expired(BackendState). + ok = Mod:clear_expired(BackendState). -spec store_retained(context(), message()) -> ok. store_retained(Context, #message{topic = Topic, payload = Payload} = Msg) -> diff --git a/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl index e918d8d52..19ae7bbe9 100644 --- a/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl +++ b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl @@ -46,15 +46,26 @@ -type limiter() :: emqx_htb_limiter:limiter(). -type context() :: emqx_retainer:context(). -type topic() :: emqx_types:topic(). --type cursor() :: emqx_retainer:cursor(). -define(POOL, ?MODULE). +%% For tests +-export([ + dispatch/3 +]). + +%% This module is `emqx_retainer` companion +-elvis([{elvis_style, invalid_dynamic_call, disable}]). + %%%=================================================================== %%% API %%%=================================================================== + dispatch(Context, Topic) -> - cast({?FUNCTION_NAME, Context, self(), Topic}). + dispatch(Context, Topic, self()). + +dispatch(Context, Topic, Pid) -> + cast({dispatch, Context, Pid, Topic}). %% reset the client's limiter after updated the limiter's config refresh_limiter() -> @@ -156,7 +167,7 @@ handle_call(Req, _From, State) -> | {noreply, NewState :: term(), hibernate} | {stop, Reason :: term(), NewState :: term()}. handle_cast({dispatch, Context, Pid, Topic}, #{limiter := Limiter} = State) -> - {ok, Limiter2} = dispatch(Context, Pid, Topic, undefined, Limiter), + {ok, Limiter2} = dispatch(Context, Pid, Topic, Limiter), {noreply, State#{limiter := Limiter2}}; handle_cast({refresh_limiter, Conf}, State) -> BucketCfg = emqx_utils_maps:deep_get([flow_control, batch_deliver_limiter], Conf, undefined), @@ -234,86 +245,120 @@ format_status(_Opt, Status) -> cast(Msg) -> gen_server:cast(worker(), Msg). --spec dispatch(context(), pid(), topic(), cursor(), limiter()) -> {ok, limiter()}. -dispatch(Context, Pid, Topic, Cursor, Limiter) -> +-spec dispatch(context(), pid(), topic(), limiter()) -> {ok, limiter()}. +dispatch(Context, Pid, Topic, Limiter) -> Mod = emqx_retainer:backend_module(Context), - case Cursor =/= undefined orelse emqx_topic:wildcard(Topic) of - false -> - {ok, Result} = erlang:apply(Mod, read_message, [Context, Topic]), - deliver(Result, Context, Pid, Topic, undefined, Limiter); + State = emqx_retainer:backend_state(Context), + case emqx_topic:wildcard(Topic) of true -> - {ok, Result, NewCursor} = erlang:apply(Mod, match_messages, [Context, Topic, Cursor]), - deliver(Result, Context, Pid, Topic, NewCursor, Limiter) + {ok, Messages, Cursor} = Mod:match_messages(State, Topic, undefined), + dispatch_with_cursor(Context, Messages, Cursor, Pid, Topic, Limiter); + false -> + {ok, Messages} = Mod:read_message(State, Topic), + dispatch_at_once(Messages, Pid, Topic, Limiter) end. --spec deliver(list(emqx_types:message()), context(), pid(), topic(), cursor(), limiter()) -> - {ok, limiter()}. -deliver([], _Context, _Pid, _Topic, undefined, Limiter) -> +dispatch_at_once(Messages, Pid, Topic, Limiter0) -> + case deliver(Messages, Pid, Topic, Limiter0) of + {ok, Limiter1} -> + {ok, Limiter1}; + {drop, Limiter1} -> + {ok, Limiter1}; + no_receiver -> + ?tp(debug, retainer_dispatcher_no_receiver, #{topic => Topic}), + {ok, Limiter0} + end. + +dispatch_with_cursor(Context, [], Cursor, _Pid, _Topic, Limiter) -> + ok = delete_cursor(Context, Cursor), {ok, Limiter}; -deliver([], Context, Pid, Topic, Cursor, Limiter) -> - dispatch(Context, Pid, Topic, Cursor, Limiter); -deliver(Result, Context, Pid, Topic, Cursor, Limiter) -> +dispatch_with_cursor(Context, Messages0, Cursor0, Pid, Topic, Limiter0) -> + case deliver(Messages0, Pid, Topic, Limiter0) of + {ok, Limiter1} -> + {ok, Messages1, Cursor1} = match_next(Context, Topic, Cursor0), + dispatch_with_cursor(Context, Messages1, Cursor1, Pid, Topic, Limiter1); + {drop, Limiter1} -> + ok = delete_cursor(Context, Cursor0), + {ok, Limiter1}; + no_receiver -> + ?tp(debug, retainer_dispatcher_no_receiver, #{topic => Topic}), + ok = delete_cursor(Context, Cursor0), + {ok, Limiter0} + end. + +match_next(_Context, _Topic, undefined) -> + {ok, [], undefined}; +match_next(Context, Topic, Cursor) -> + Mod = emqx_retainer:backend_module(Context), + State = emqx_retainer:backend_state(Context), + Mod:match_messages(State, Topic, Cursor). + +delete_cursor(_Context, undefined) -> + ok; +delete_cursor(Context, Cursor) -> + Mod = emqx_retainer:backend_module(Context), + State = emqx_retainer:backend_state(Context), + Mod:delete_cursor(State, Cursor). + +-spec deliver([emqx_types:message()], pid(), topic(), limiter()) -> + {ok, limiter()} | {drop, limiter()} | no_receiver. +deliver(Messages, Pid, Topic, Limiter) -> case erlang:is_process_alive(Pid) of false -> - {ok, Limiter}; + no_receiver; _ -> - DeliverNum = emqx_conf:get([retainer, flow_control, batch_deliver_number], undefined), - case DeliverNum of + BatchSize = emqx_conf:get([retainer, flow_control, batch_deliver_number], undefined), + case BatchSize of 0 -> - do_deliver(Result, Pid, Topic), + deliver_to_client(Messages, Pid, Topic), {ok, Limiter}; _ -> - case do_deliver(Result, DeliverNum, Pid, Topic, Limiter) of - {ok, Limiter2} -> - deliver([], Context, Pid, Topic, Cursor, Limiter2); - {drop, Limiter2} -> - {ok, Limiter2} - end + deliver_in_batches(Messages, BatchSize, Pid, Topic, Limiter) end end. -do_deliver([], _DeliverNum, _Pid, _Topic, Limiter) -> +deliver_in_batches([], _BatchSize, _Pid, _Topic, Limiter) -> {ok, Limiter}; -do_deliver(Msgs, DeliverNum, Pid, Topic, Limiter) -> - {Num, ToDelivers, Msgs2} = safe_split(DeliverNum, Msgs), - case emqx_htb_limiter:consume(Num, Limiter) of - {ok, Limiter2} -> - do_deliver(ToDelivers, Pid, Topic), - do_deliver(Msgs2, DeliverNum, Pid, Topic, Limiter2); - {drop, _} = Drop -> +deliver_in_batches(Msgs, BatchSize, Pid, Topic, Limiter0) -> + {BatchActualSize, Batch, RestMsgs} = take(BatchSize, Msgs), + case emqx_htb_limiter:consume(BatchActualSize, Limiter0) of + {ok, Limiter1} -> + ok = deliver_to_client(Batch, Pid, Topic), + deliver_in_batches(RestMsgs, BatchSize, Pid, Topic, Limiter1); + {drop, _Limiter1} = Drop -> ?SLOG(debug, #{ msg => "retained_message_dropped", reason => "reached_ratelimit", - dropped_count => length(ToDelivers) + dropped_count => BatchActualSize }), Drop end. -do_deliver([Msg | T], Pid, Topic) -> - case emqx_banned:look_up({clientid, Msg#message.from}) of - [] -> - Pid ! {deliver, Topic, Msg}, - ok; - _ -> - ?tp( - notice, - ignore_retained_message_deliver, - #{ - reason => "client is banned", - clientid => Msg#message.from - } - ) - end, - do_deliver(T, Pid, Topic); -do_deliver([], _, _) -> +deliver_to_client([Msg | T], Pid, Topic) -> + _ = + case emqx_banned:look_up({clientid, Msg#message.from}) of + [] -> + Pid ! {deliver, Topic, Msg}; + _ -> + ?tp( + notice, + ignore_retained_message_deliver, + #{ + reason => "client is banned", + clientid => Msg#message.from + } + ) + end, + deliver_to_client(T, Pid, Topic); +deliver_to_client([], _, _) -> ok. -safe_split(N, List) -> - safe_split(N, List, 0, []). +take(N, List) -> + take(N, List, 0, []). -safe_split(0, List, Count, Acc) -> +take(0, List, Count, Acc) -> {Count, lists:reverse(Acc), List}; -safe_split(_N, [], Count, Acc) -> +take(_N, [], Count, Acc) -> {Count, lists:reverse(Acc), []}; -safe_split(N, [H | T], Count, Acc) -> - safe_split(N - 1, T, Count + 1, [H | Acc]). +take(N, [H | T], Count, Acc) -> + take(N - 1, T, Count + 1, [H | Acc]). diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl index 7e2a73a09..daaa776b7 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl @@ -35,6 +35,7 @@ read_message/2, page_read/4, match_messages/3, + delete_cursor/2, clear_expired/1, clean/1, size/1 @@ -205,7 +206,7 @@ delete_message(_State, Topic) -> read_message(_State, Topic) -> {ok, read_messages(Topic)}. -match_messages(_State, Topic, undefined) -> +match_messages(State, Topic, undefined) -> Tokens = topic_to_tokens(Topic), Now = erlang:system_time(millisecond), QH = msg_table(search_table(Tokens, Now)), @@ -214,7 +215,7 @@ match_messages(_State, Topic, undefined) -> {ok, qlc:eval(QH), undefined}; BatchNum when is_integer(BatchNum) -> Cursor = qlc:cursor(QH), - match_messages(undefined, Topic, {Cursor, BatchNum}) + match_messages(State, Topic, {Cursor, BatchNum}) end; match_messages(_State, _Topic, {Cursor, BatchNum}) -> case qlc_next_answers(Cursor, BatchNum) of @@ -224,6 +225,11 @@ match_messages(_State, _Topic, {Cursor, BatchNum}) -> {ok, Rows, {Cursor, BatchNum}} end. +delete_cursor(_State, {Cursor, _}) -> + qlc:delete_cursor(Cursor); +delete_cursor(_State, undefined) -> + ok. + page_read(_State, Topic, Page, Limit) -> Now = erlang:system_time(millisecond), QH = @@ -562,6 +568,7 @@ reindex(NewIndices, Force, StatusFun) when %% Fill index records in batches. QH = qlc:q([Topic || #retained_message{topic = Topic} <- ets:table(?TAB_MESSAGE)]), + ok = reindex_batch(qlc:cursor(QH), 0, StatusFun), %% Enable read indices and unlock reindexing. diff --git a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl index d4ad43907..b29974068 100644 --- a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl @@ -21,6 +21,7 @@ -include("emqx_retainer.hrl"). +-include_lib("emqx/include/asserts.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). @@ -96,14 +97,19 @@ end_per_group(_Group, Config) -> emqx_retainer_mnesia:populate_index_meta(), Config. -init_per_testcase(t_get_basic_usage_info, Config) -> +init_per_testcase(_TestCase, Config) -> mnesia:clear_table(?TAB_INDEX), mnesia:clear_table(?TAB_MESSAGE), emqx_retainer_mnesia:populate_index_meta(), - Config; -init_per_testcase(_TestCase, Config) -> Config. +end_per_testcase(t_flow_control, _Config) -> + restore_delivery(); +end_per_testcase(t_cursor_cleanup, _Config) -> + restore_delivery(); +end_per_testcase(_TestCase, _Config) -> + ok. + app_spec() -> {emqx_retainer, ?BASE_CONF}. @@ -405,19 +411,7 @@ t_stop_publish_clear_msg(_) -> ok = emqtt:disconnect(C1). t_flow_control(_) -> - Rate = emqx_ratelimiter_SUITE:to_rate("1/1s"), - LimiterCfg = make_limiter_cfg(Rate), - JsonCfg = make_limiter_json(<<"1/1s">>), - emqx_limiter_server:add_bucket(emqx_retainer, internal, LimiterCfg), - emqx_retainer:update_config(#{ - <<"delivery_rate">> => <<"1/1s">>, - <<"flow_control">> => - #{ - <<"batch_read_number">> => 1, - <<"batch_deliver_number">> => 1, - <<"batch_deliver_limiter">> => JsonCfg - } - }), + setup_slow_delivery(), {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, _} = emqtt:connect(C1), emqtt:publish( @@ -442,23 +436,60 @@ t_flow_control(_) -> {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]), ?assertEqual(3, length(receive_messages(3))), End = erlang:system_time(millisecond), + Diff = End - Begin, ?assert( - Diff > timer:seconds(2.5) andalso Diff < timer:seconds(3.9), + Diff > timer:seconds(2.1) andalso Diff < timer:seconds(3.9), lists:flatten(io_lib:format("Diff is :~p~n", [Diff])) ), ok = emqtt:disconnect(C1), + ok. + +t_cursor_cleanup(_) -> + setup_slow_delivery(), + {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), + {ok, _} = emqtt:connect(C1), + lists:foreach( + fun(I) -> + emqtt:publish( + C1, + <<"retained/", (integer_to_binary(I))/binary>>, + <<"this is a retained message">>, + [{qos, 0}, {retain, true}] + ) + end, + lists:seq(1, 5) + ), + {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]), + + snabbkaffe:start_trace(), + + ?assertWaitEvent( + emqtt:disconnect(C1), + #{?snk_kind := retainer_dispatcher_no_receiver, topic := <<"retained/#">>}, + 2000 + ), + + ?assertEqual(0, qlc_process_count()), + + {Pid, Ref} = spawn_monitor(fun() -> ok end), + receive + {'DOWN', Ref, _, _, _} -> ok + after 1000 -> ct:fail("should receive 'DOWN' message") + end, + + ?assertWaitEvent( + emqx_retainer_dispatcher:dispatch(emqx_retainer:context(), <<"retained/1">>, Pid), + #{?snk_kind := retainer_dispatcher_no_receiver, topic := <<"retained/1">>}, + 2000 + ), + + ?assertEqual(0, qlc_process_count()), + + snabbkaffe:stop(), - emqx_limiter_server:del_bucket(emqx_retainer, internal), - emqx_retainer:update_config(#{ - <<"flow_control">> => - #{ - <<"batch_read_number">> => 1, - <<"batch_deliver_number">> => 1 - } - }), ok. t_clear_expired(_) -> @@ -849,15 +880,21 @@ with_conf(ConfMod, Case) -> end. make_limiter_cfg(Rate) -> - Client = #{ - rate => Rate, - initial => 0, - burst => 0, - low_watermark => 1, - divisible => false, - max_retry_time => timer:seconds(5), - failure_strategy => force - }, + make_limiter_cfg(Rate, #{}). + +make_limiter_cfg(Rate, ClientOpts) -> + Client = maps:merge( + #{ + rate => Rate, + initial => 0, + burst => 0, + low_watermark => 1, + divisible => false, + max_retry_time => timer:seconds(5), + failure_strategy => force + }, + ClientOpts + ), #{client => Client, rate => Rate, initial => 0, burst => 0}. make_limiter_json(Rate) -> @@ -909,3 +946,40 @@ do_publish(Client, Topic, Payload, Opts, {sleep, Time}) -> Res = emqtt:publish(Client, Topic, Payload, Opts), ct:sleep(Time), Res. + +setup_slow_delivery() -> + Rate = emqx_ratelimiter_SUITE:to_rate("1/1s"), + LimiterCfg = make_limiter_cfg(Rate), + JsonCfg = make_limiter_json(<<"1/1s">>), + emqx_limiter_server:add_bucket(emqx_retainer, internal, LimiterCfg), + emqx_retainer:update_config(#{ + <<"delivery_rate">> => <<"1/1s">>, + <<"flow_control">> => + #{ + <<"batch_read_number">> => 1, + <<"batch_deliver_number">> => 1, + <<"batch_deliver_limiter">> => JsonCfg + } + }). + +restore_delivery() -> + emqx_limiter_server:del_bucket(emqx_retainer, internal), + emqx_retainer:update_config(#{ + <<"flow_control">> => + #{ + <<"batch_read_number">> => 1, + <<"batch_deliver_number">> => 1 + } + }). + +qlc_processes() -> + lists:filter( + fun(Pid) -> + {current_function, {qlc, wait_for_request, 3}} =:= + erlang:process_info(Pid, current_function) + end, + erlang:processes() + ). + +qlc_process_count() -> + length(qlc_processes()). diff --git a/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl index 537facd80..e260b04e1 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl @@ -744,18 +744,18 @@ t_regex_replace(_) -> ?assertEqual(<<"a[cc]b[c]d">>, apply_func(regex_replace, [<<"accbcd">>, <<"c+">>, <<"[&]">>])). t_unescape(_) -> - ?assertEqual(<<"\n">>, emqx_rule_funcs:unescape(<<"\\n">>)), - ?assertEqual(<<"\t">>, emqx_rule_funcs:unescape(<<"\\t">>)), - ?assertEqual(<<"\r">>, emqx_rule_funcs:unescape(<<"\\r">>)), - ?assertEqual(<<"\b">>, emqx_rule_funcs:unescape(<<"\\b">>)), - ?assertEqual(<<"\f">>, emqx_rule_funcs:unescape(<<"\\f">>)), - ?assertEqual(<<"\v">>, emqx_rule_funcs:unescape(<<"\\v">>)), - ?assertEqual(<<"'">>, emqx_rule_funcs:unescape(<<"\\'">>)), - ?assertEqual(<<"\"">>, emqx_rule_funcs:unescape(<<"\\\"">>)), - ?assertEqual(<<"?">>, emqx_rule_funcs:unescape(<<"\\?">>)), - ?assertEqual(<<"\a">>, emqx_rule_funcs:unescape(<<"\\a">>)), + ?assertEqual(<<"\n">> = <<10>>, emqx_rule_funcs:unescape(<<"\\n">>)), + ?assertEqual(<<"\t">> = <<9>>, emqx_rule_funcs:unescape(<<"\\t">>)), + ?assertEqual(<<"\r">> = <<13>>, emqx_rule_funcs:unescape(<<"\\r">>)), + ?assertEqual(<<"\b">> = <<8>>, emqx_rule_funcs:unescape(<<"\\b">>)), + ?assertEqual(<<"\f">> = <<12>>, emqx_rule_funcs:unescape(<<"\\f">>)), + ?assertEqual(<<"\v">> = <<11>>, emqx_rule_funcs:unescape(<<"\\v">>)), + ?assertEqual(<<"'">> = <<39>>, emqx_rule_funcs:unescape(<<"\\'">>)), + ?assertEqual(<<"\"">> = <<34>>, emqx_rule_funcs:unescape(<<"\\\"">>)), + ?assertEqual(<<"?">> = <<63>>, emqx_rule_funcs:unescape(<<"\\?">>)), + ?assertEqual(<<7>>, emqx_rule_funcs:unescape(<<"\\a">>)), % Test escaping backslash itself - ?assertEqual(<<"\\">>, emqx_rule_funcs:unescape(<<"\\\\">>)), + ?assertEqual(<<"\\">> = <<92>>, emqx_rule_funcs:unescape(<<"\\\\">>)), % Test a string without any escape sequences ?assertEqual(<<"Hello, World!">>, emqx_rule_funcs:unescape(<<"Hello, World!">>)), % Test a string with escape sequences diff --git a/apps/emqx_utils/src/emqx_variform_bif.erl b/apps/emqx_utils/src/emqx_variform_bif.erl index 91bd4f9cf..ed9dfb851 100644 --- a/apps/emqx_utils/src/emqx_variform_bif.erl +++ b/apps/emqx_utils/src/emqx_variform_bif.erl @@ -289,7 +289,8 @@ unescape_string([$\\, $" | Rest], Acc) -> unescape_string([$\\, $? | Rest], Acc) -> unescape_string(Rest, [$\? | Acc]); unescape_string([$\\, $a | Rest], Acc) -> - unescape_string(Rest, [$\a | Acc]); + %% Terminal bell + unescape_string(Rest, [7 | Acc]); %% Start of HEX escape code unescape_string([$\\, $x | [$0 | _] = HexStringStart], Acc) -> unescape_handle_hex_string(HexStringStart, Acc); diff --git a/changes/ce/fix-12993.en.md b/changes/ce/fix-12993.en.md new file mode 100644 index 000000000..3e565841c --- /dev/null +++ b/changes/ce/fix-12993.en.md @@ -0,0 +1,5 @@ +Fix listener config update API when handling an unknown zone. + +Prior to this fix, when a listener config is updated with an unknown zone, for example `{"zone": "unknown"}`, +the change would be accepted, causing all clients to crash when connect. +After this fix, updating listener with an unknown zone name will get a "Bad request" response. diff --git a/changes/ce/fix-12996.en.md b/changes/ce/fix-12996.en.md new file mode 100644 index 000000000..0c3cc872f --- /dev/null +++ b/changes/ce/fix-12996.en.md @@ -0,0 +1 @@ +Fix process leak in `emqx_retainer` application. Previously, client disconnection while receiving retained messages could cause a process leak. diff --git a/changes/ee/fix-13001.en.md b/changes/ee/fix-13001.en.md new file mode 100644 index 000000000..5d431e0f5 --- /dev/null +++ b/changes/ee/fix-13001.en.md @@ -0,0 +1 @@ +Fixed an issue where the syskeeper forwarder would never reconnect when the connection was lost.