Merge pull request #13015 from thalesmg/sync-r57-m-20240510

sync `release-57` to `master`
Thales Macedo Garitezi 2024-05-10 12:18:02 -03:00 committed by GitHub
commit c02701dfa1
26 changed files with 426 additions and 207 deletions

View File

@@ -20,6 +20,7 @@
 %% API
 -export([add_handler/0, remove_handler/0, pre_config_update/3]).
 -export([is_olp_enabled/0]).
+-export([assert_zone_exists/1]).

 -define(ZONES, [zones]).
@@ -44,3 +45,26 @@ is_olp_enabled() ->
         false,
         emqx_config:get([zones], #{})
     ).
+
+-spec assert_zone_exists(binary() | atom()) -> ok.
+assert_zone_exists(Name0) when is_binary(Name0) ->
+    %% an existing zone must already have an atom name
+    Name =
+        try
+            binary_to_existing_atom(Name0)
+        catch
+            _:_ ->
+                throw({unknown_zone, Name0})
+        end,
+    assert_zone_exists(Name);
+assert_zone_exists(default) ->
+    %% there is always a 'default' zone
+    ok;
+assert_zone_exists(Name) when is_atom(Name) ->
+    try
+        _ = emqx_config:get([zones, Name]),
+        ok
+    catch
+        error:{config_not_found, _} ->
+            throw({unknown_zone, Name})
+    end.
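
A minimal sketch of the new check's contract (zone names other than the built-in `default` are assumed to be configured under the `zones` root):

    %% The built-in zone always exists:
    ok = emqx_config_zones:assert_zone_exists(default),
    %% A never-configured name fails fast; a binary name must already
    %% resolve to an existing atom, otherwise the same throw is raised:
    try
        emqx_config_zones:assert_zone_exists(<<"no_such_zone">>)
    catch
        throw:{unknown_zone, <<"no_such_zone">>} -> ok
    end.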

View File

@@ -124,7 +124,7 @@ format_raw_listeners({Type0, Conf}) ->
             Bind = parse_bind(LConf0),
             MaxConn = maps:get(<<"max_connections">>, LConf0, default_max_conn()),
             Running = is_running(Type, listener_id(Type, LName), LConf0#{bind => Bind}),
-            LConf1 = maps:without([<<"authentication">>, <<"zone">>], LConf0),
+            LConf1 = maps:without([<<"authentication">>], LConf0),
             LConf2 = maps:put(<<"running">>, Running, LConf1),
             CurrConn =
                 case Running of
@@ -526,6 +526,7 @@ pre_config_update([?ROOT_KEY, _Type, _Name], {update, _Request}, undefined) ->
 pre_config_update([?ROOT_KEY, Type, Name], {update, Request}, RawConf) ->
     RawConf1 = emqx_utils_maps:deep_merge(RawConf, Request),
     RawConf2 = ensure_override_limiter_conf(RawConf1, Request),
+    ok = assert_zone_exists(RawConf2),
     {ok, convert_certs(Type, Name, RawConf2)};
 pre_config_update([?ROOT_KEY, _Type, _Name], {action, _Action, Updated}, RawConf) ->
     {ok, emqx_utils_maps:deep_merge(RawConf, Updated)};
@@ -896,6 +897,11 @@ convert_certs(Type, Name, Conf) ->
 filter_stacktrace({Reason, _Stacktrace}) -> Reason;
 filter_stacktrace(Reason) -> Reason.

+assert_zone_exists(#{<<"zone">> := Zone}) ->
+    emqx_config_zones:assert_zone_exists(Zone);
+assert_zone_exists(_) ->
+    ok.
+
 %% limiter config should override, not merge
 ensure_override_limiter_conf(Conf, #{<<"limiter">> := Limiter}) ->
     Conf#{<<"limiter">> => Limiter};
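
A hedged sketch of what the new guard does during an update (the config map shape is illustrative; keys are raw binaries, as elsewhere in this module):

    %% No zone key in the merged raw config: the check is a no-op.
    ok = assert_zone_exists(#{<<"bind">> => <<"0.0.0.0:1884">>}),
    %% An unknown zone throws {unknown_zone, <<"unknown">>}, so
    %% pre_config_update/3 fails before convert_certs/3 runs and the
    %% whole listener update is rejected.
    assert_zone_exists(#{<<"zone">> => <<"unknown">>}).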

View File

@@ -622,7 +622,7 @@ replay_streams(Session0 = #{replay := [{StreamKey, Srs0} | Rest]}, ClientInfo) ->
             replay_streams(Session#{replay := Rest}, ClientInfo);
         {error, recoverable, Reason} ->
             RetryTimeout = ?TIMEOUT_RETRY_REPLAY,
-            ?SLOG(warning, #{
+            ?SLOG(debug, #{
                 msg => "failed_to_fetch_replay_batch",
                 stream => StreamKey,
                 reason => Reason,
@@ -925,7 +925,7 @@ new_batch({StreamKey, Srs0}, BatchSize, Session0 = #{s := S0}, ClientInfo) ->
             Session#{s => S};
         {error, Class, Reason} ->
             %% TODO: Handle unrecoverable error.
-            ?SLOG(info, #{
+            ?SLOG(debug, #{
                 msg => "failed_to_fetch_batch",
                 stream => StreamKey,
                 reason => Reason,

View File

@@ -220,7 +220,7 @@ ensure_iterator(TopicFilter, StartTime, SubId, SStateId, {{RankX, RankY}, Stream
             },
             emqx_persistent_session_ds_state:put_stream(Key, NewStreamState, S);
         {error, recoverable, Reason} ->
-            ?SLOG(warning, #{
+            ?SLOG(debug, #{
                 msg => "failed_to_initialize_stream_iterator",
                 stream => Stream,
                 class => recoverable,

View File

@@ -19,6 +19,7 @@
             emqx_bridge_s3_connector_info
         ]}
     ]},
+    {mod, {emqx_bridge_s3_app, []}},
     {modules, []},
     {links, []}
 ]}.

View File

@@ -1,7 +1,7 @@
 %%--------------------------------------------------------------------
 %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%--------------------------------------------------------------------

--module(emqx_connector_aggreg_app).
+-module(emqx_bridge_s3_app).

 -behaviour(application).

 -export([start/2, stop/1]).
@@ -15,7 +15,7 @@
 %%------------------------------------------------------------------------------

 start(_StartType, _StartArgs) ->
-    emqx_connector_aggreg_sup:start_link().
+    emqx_bridge_s3_sup:start_link().

 stop(_State) ->
     ok.

View File

@@ -87,6 +87,8 @@
     channels := #{channel_id() => channel_state()}
 }.

+-define(AGGREG_SUP, emqx_bridge_s3_sup).
+
 %%

 -spec callback_mode() -> callback_mode().
@@ -209,6 +211,7 @@ start_channel(State, #{
         bucket := Bucket
     }
 }) ->
+    AggregId = {Type, Name},
     AggregOpts = #{
         time_interval => TimeInterval,
         max_records => MaxRecords,
@@ -223,19 +226,21 @@ start_channel(State, #{
         client_config => maps:get(client_config, State),
         uploader_config => maps:with([min_part_size, max_part_size], Parameters)
     },
-    _ = emqx_connector_aggreg_sup:delete_child({Type, Name}),
-    {ok, SupPid} = emqx_connector_aggreg_sup:start_child(#{
-        id => {Type, Name},
-        start => {emqx_connector_aggreg_upload_sup, start_link, [Name, AggregOpts, DeliveryOpts]},
+    _ = ?AGGREG_SUP:delete_child(AggregId),
+    {ok, SupPid} = ?AGGREG_SUP:start_child(#{
+        id => AggregId,
+        start =>
+            {emqx_connector_aggreg_upload_sup, start_link, [AggregId, AggregOpts, DeliveryOpts]},
         type => supervisor,
         restart => permanent
     }),
     #{
         type => ?ACTION_AGGREGATED_UPLOAD,
         name => Name,
+        aggreg_id => AggregId,
         bucket => Bucket,
         supervisor => SupPid,
-        on_stop => fun() -> emqx_connector_aggreg_sup:delete_child({Type, Name}) end
+        on_stop => fun() -> ?AGGREG_SUP:delete_child(AggregId) end
     }.

@@ -254,12 +259,14 @@ channel_status(#{type := ?ACTION_UPLOAD}, _State) ->
     %% Since bucket name may be templated, we can't really provide any additional
     %% information regarding the channel health.
     ?status_connected;
-channel_status(#{type := ?ACTION_AGGREGATED_UPLOAD, name := Name, bucket := Bucket}, State) ->
+channel_status(
+    #{type := ?ACTION_AGGREGATED_UPLOAD, aggreg_id := AggregId, bucket := Bucket}, State
+) ->
     %% NOTE: This will effectively trigger uploads of buffers yet to be uploaded.
     Timestamp = erlang:system_time(second),
-    ok = emqx_connector_aggregator:tick(Name, Timestamp),
+    ok = emqx_connector_aggregator:tick(AggregId, Timestamp),
     ok = check_bucket_accessible(Bucket, State),
-    ok = check_aggreg_upload_errors(Name),
+    ok = check_aggreg_upload_errors(AggregId),
     ?status_connected.

@@ -278,8 +285,8 @@ check_bucket_accessible(Bucket, #{client_config := Config}) ->
         end
     end.

-check_aggreg_upload_errors(Name) ->
-    case emqx_connector_aggregator:take_error(Name) of
+check_aggreg_upload_errors(AggregId) ->
+    case emqx_connector_aggregator:take_error(AggregId) of
         [Error] ->
             %% TODO
             %% This approach means that, for example, 3 upload failures will cause
@@ -353,11 +360,11 @@ run_simple_upload(
             {error, map_error(Reason)}
     end.

-run_aggregated_upload(InstId, Records, #{name := Name}) ->
+run_aggregated_upload(InstId, Records, #{aggreg_id := AggregId}) ->
     Timestamp = erlang:system_time(second),
-    case emqx_connector_aggregator:push_records(Name, Timestamp, Records) of
+    case emqx_connector_aggregator:push_records(AggregId, Timestamp, Records) of
         ok ->
-            ?tp(s3_bridge_aggreg_push_ok, #{instance_id => InstId, name => Name}),
+            ?tp(s3_bridge_aggreg_push_ok, #{instance_id => InstId, name => AggregId}),
             ok;
         {error, Reason} ->
             {error, {unrecoverable_error, Reason}}
@@ -406,8 +413,8 @@ init_transfer_state(BufferMap, Opts) ->
     Key = mk_object_key(BufferMap, Opts),
     emqx_s3_upload:new(Client, Key, UploadOpts, UploaderConfig).

-mk_object_key(BufferMap, #{action := Name, key := Template}) ->
-    emqx_template:render_strict(Template, {?MODULE, {Name, BufferMap}}).
+mk_object_key(BufferMap, #{action := AggregId, key := Template}) ->
+    emqx_template:render_strict(Template, {?MODULE, {AggregId, BufferMap}}).

@@ -443,9 +450,9 @@ process_terminate(Upload) ->
 -spec lookup(emqx_template:accessor(), {_Name, buffer_map()}) ->
     {ok, integer() | string()} | {error, undefined}.
-lookup([<<"action">>], {Name, _Buffer}) ->
+lookup([<<"action">>], {_AggregId = {_Type, Name}, _Buffer}) ->
     {ok, mk_fs_safe_string(Name)};
-lookup(Accessor, {_Name, Buffer = #{}}) ->
+lookup(Accessor, {_AggregId, Buffer = #{}}) ->
     lookup_buffer_var(Accessor, Buffer);
 lookup(_Accessor, _Context) ->
     {error, undefined}.

View File

@@ -0,0 +1,54 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_s3_sup).
+
+%% API
+-export([
+    start_link/0,
+    start_child/1,
+    delete_child/1
+]).
+
+%% `supervisor' API
+-export([init/1]).
+
+%%------------------------------------------------------------------------------
+%% Type declarations
+%%------------------------------------------------------------------------------
+
+%%------------------------------------------------------------------------------
+%% API
+%%------------------------------------------------------------------------------
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+start_child(ChildSpec) ->
+    supervisor:start_child(?MODULE, ChildSpec).
+
+delete_child(ChildId) ->
+    case supervisor:terminate_child(?MODULE, ChildId) of
+        ok ->
+            supervisor:delete_child(?MODULE, ChildId);
+        Error ->
+            Error
+    end.
+
+%%------------------------------------------------------------------------------
+%% `supervisor' API
+%%------------------------------------------------------------------------------
+
+init([]) ->
+    SupFlags = #{
+        strategy => one_for_one,
+        intensity => 1,
+        period => 1
+    },
+    ChildSpecs = [],
+    {ok, {SupFlags, ChildSpecs}}.
+
+%%------------------------------------------------------------------------------
+%% Internal fns
+%%------------------------------------------------------------------------------
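
A hedged usage sketch, mirroring how emqx_bridge_s3_connector:start_channel/2 registers aggregated-upload workers under this supervisor (the id values and the AggregOpts/DeliveryOpts bindings are placeholders, not part of this file):

    AggregId = {s3_aggregated_upload, <<"my_action">>},
    {ok, _SupPid} = emqx_bridge_s3_sup:start_child(#{
        id => AggregId,
        start =>
            {emqx_connector_aggreg_upload_sup, start_link, [AggregId, AggregOpts, DeliveryOpts]},
        type => supervisor,
        restart => permanent
    }),
    %% Removal terminates the child before deleting its spec:
    ok = emqx_bridge_s3_sup:delete_child(AggregId).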

View File

@@ -158,6 +158,7 @@ t_on_get_status(Config) ->
 t_aggreg_upload(Config) ->
     Bucket = ?config(s3_bucket, Config),
     BridgeName = ?config(bridge_name, Config),
+    AggregId = aggreg_id(BridgeName),
     BridgeNameString = unicode:characters_to_list(BridgeName),
     NodeString = atom_to_list(node()),
     %% Create a bridge with the sample configuration.
@@ -170,7 +171,7 @@ t_aggreg_upload(Config) ->
     ]),
     ok = send_messages(BridgeName, MessageEvents),
     %% Wait until the delivery is completed.
-    ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}),
+    ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := AggregId}),
     %% Check the uploaded objects.
     _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket),
     ?assertMatch(
@@ -196,6 +197,7 @@ t_aggreg_upload(Config) ->
 t_aggreg_upload_rule(Config) ->
     Bucket = ?config(s3_bucket, Config),
     BridgeName = ?config(bridge_name, Config),
+    AggregId = aggreg_id(BridgeName),
     ClientID = emqx_utils_conv:bin(?FUNCTION_NAME),
     %% Create a bridge with the sample configuration and a simple SQL rule.
     ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)),
@@ -217,7 +219,7 @@ t_aggreg_upload_rule(Config) ->
         emqx_message:make(?FUNCTION_NAME, T3 = <<"s3/empty">>, P3 = <<>>),
         emqx_message:make(?FUNCTION_NAME, <<"not/s3">>, <<"should not be here">>)
     ]),
-    ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}),
+    ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := AggregId}),
     %% Check the uploaded objects.
     _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket),
     _CSV = [Header | Rows] = fetch_parse_csv(Bucket, Key),
@@ -249,6 +251,7 @@ t_aggreg_upload_restart(Config) ->
     %% after a restart.
     Bucket = ?config(s3_bucket, Config),
     BridgeName = ?config(bridge_name, Config),
+    AggregId = aggreg_id(BridgeName),
     %% Create a bridge with the sample configuration.
     ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)),
     %% Send some sample messages that look like Rule SQL productions.
@@ -258,15 +261,15 @@ t_aggreg_upload_restart(Config) ->
         {<<"C3">>, T3 = <<"t/42">>, P3 = <<"">>}
     ]),
     ok = send_messages(BridgeName, MessageEvents),
-    {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_records_written, action := BridgeName}),
+    {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_records_written, action := AggregId}),
     %% Restart the bridge.
     {ok, _} = emqx_bridge_v2:disable_enable(disable, ?BRIDGE_TYPE, BridgeName),
     {ok, _} = emqx_bridge_v2:disable_enable(enable, ?BRIDGE_TYPE, BridgeName),
     %% Send some more messages.
     ok = send_messages(BridgeName, MessageEvents),
-    {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_records_written, action := BridgeName}),
+    {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_records_written, action := AggregId}),
     %% Wait until the delivery is completed.
-    {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}),
+    {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := AggregId}),
     %% Check there's still only one upload.
     _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket),
     _Upload = #{content := Content} = emqx_bridge_s3_test_helpers:get_object(Bucket, Key),
@@ -289,6 +292,7 @@ t_aggreg_upload_restart_corrupted(Config) ->
     %% and does so while preserving uncompromised data.
     Bucket = ?config(s3_bucket, Config),
     BridgeName = ?config(bridge_name, Config),
+    AggregId = aggreg_id(BridgeName),
     BatchSize = ?CONF_MAX_RECORDS div 2,
     %% Create a bridge with the sample configuration.
     ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)),
@@ -300,13 +304,13 @@ t_aggreg_upload_restart_corrupted(Config) ->
     %% Ensure that they span multiple batch queries.
     ok = send_messages_delayed(BridgeName, lists:map(fun mk_message_event/1, Messages1), 1),
     {ok, _} = ?block_until(
-        #{?snk_kind := connector_aggreg_records_written, action := BridgeName},
+        #{?snk_kind := connector_aggreg_records_written, action := AggregId},
         infinity,
         0
     ),
     %% Find out the buffer file.
     {ok, #{filename := Filename}} = ?block_until(
-        #{?snk_kind := connector_aggreg_buffer_allocated, action := BridgeName}
+        #{?snk_kind := connector_aggreg_buffer_allocated, action := AggregId}
     ),
     %% Stop the bridge, corrupt the buffer file, and restart the bridge.
     {ok, _} = emqx_bridge_v2:disable_enable(disable, ?BRIDGE_TYPE, BridgeName),
@@ -320,7 +324,7 @@ t_aggreg_upload_restart_corrupted(Config) ->
     ],
     ok = send_messages_delayed(BridgeName, lists:map(fun mk_message_event/1, Messages2), 0),
     %% Wait until the delivery is completed.
-    {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}),
+    {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := AggregId}),
     %% Check that upload contains part of the first batch and all of the second batch.
     _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket),
     CSV = [_Header | Rows] = fetch_parse_csv(Bucket, Key),
@@ -341,6 +345,7 @@ t_aggreg_pending_upload_restart(Config) ->
     %% a restart.
     Bucket = ?config(s3_bucket, Config),
     BridgeName = ?config(bridge_name, Config),
+    AggregId = aggreg_id(BridgeName),
     %% Create a bridge with the sample configuration.
     ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)),
     %% Send few large messages that will require multipart upload.
@@ -362,7 +367,7 @@ t_aggreg_pending_upload_restart(Config) ->
     %% Restart the bridge.
     {ok, _} = emqx_bridge_v2:disable_enable(enable, ?BRIDGE_TYPE, BridgeName),
     %% Wait until the delivery is completed.
-    {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}),
+    {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := AggregId}),
     %% Check that delivery contains all the messages.
     _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket),
     [_Header | Rows] = fetch_parse_csv(Bucket, Key),
@@ -377,6 +382,7 @@ t_aggreg_next_rotate(Config) ->
     %% and windowing work correctly under high rate, high concurrency conditions.
     Bucket = ?config(s3_bucket, Config),
     BridgeName = ?config(bridge_name, Config),
+    AggregId = aggreg_id(BridgeName),
     NSenders = 4,
     %% Create a bridge with the sample configuration.
     ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)),
@@ -393,7 +399,7 @@ t_aggreg_next_rotate(Config) ->
     %% Wait for the last delivery to complete.
     ok = timer:sleep(round(?CONF_TIME_INTERVAL * 0.5)),
     ?block_until(
-        #{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}, infinity, 0
+        #{?snk_kind := connector_aggreg_delivery_completed, action := AggregId}, infinity, 0
     ),
     %% There should be at least 2 time windows of aggregated records.
     Uploads = [K || #{key := K} <- emqx_bridge_s3_test_helpers:list_objects(Bucket)],
@@ -465,3 +471,6 @@ fetch_parse_csv(Bucket, Key) ->
     #{content := Content} = emqx_bridge_s3_test_helpers:get_object(Bucket, Key),
     {ok, CSV} = erl_csv:decode(Content),
     CSV.
+
+aggreg_id(BridgeName) ->
+    {?BRIDGE_TYPE, BridgeName}.

View File

@@ -213,9 +213,9 @@ on_get_status(_InstanceId, #{pool_name := Pool, ack_timeout := AckTimeout}) ->
     ),
     status_result(Health).

-status_result(true) -> connected;
-status_result(false) -> connecting;
-status_result({error, _}) -> connecting.
+status_result(true) -> ?status_connected;
+status_result(false) -> ?status_disconnected;
+status_result({error, _}) -> ?status_disconnected.

 on_add_channel(
     _InstanceId,
@@ -251,7 +251,7 @@ on_get_channels(InstanceId) ->
 on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) ->
     case maps:is_key(ChannelId, Channels) of
         true ->
-            connected;
+            ?status_connected;
         _ ->
             {error, not_exists}
     end.

View File

@@ -347,7 +347,7 @@ t_get_status(Config) ->
         _Sleep = 500,
         _Attempts = 10,
         ?assertMatch(
-            #{status := connecting},
+            #{status := disconnected},
             emqx_bridge_v2:health_check(syskeeper_forwarder, ?SYSKEEPER_NAME)
         )
     ).

View File

@@ -25,7 +25,7 @@
 ]).

 -record(delivery, {
-    name :: _Name,
+    id :: id(),
     callback_module :: module(),
     container :: emqx_connector_aggreg_csv:container(),
     reader :: emqx_connector_aggreg_buffer:reader(),
@@ -33,6 +33,8 @@
     empty :: boolean()
 }).

+-type id() :: term().
+
 -type state() :: #delivery{}.

 -type init_opts() :: #{
@@ -59,22 +61,22 @@
 %%

-start_link(Name, Buffer, Opts) ->
-    proc_lib:start_link(?MODULE, init, [self(), Name, Buffer, Opts]).
+start_link(Id, Buffer, Opts) ->
+    proc_lib:start_link(?MODULE, init, [self(), Id, Buffer, Opts]).

 %%

--spec init(pid(), _Name, buffer(), init_opts()) -> no_return().
-init(Parent, Name, Buffer, Opts) ->
-    ?tp(connector_aggreg_delivery_started, #{action => Name, buffer => Buffer}),
+-spec init(pid(), id(), buffer(), init_opts()) -> no_return().
+init(Parent, Id, Buffer, Opts) ->
+    ?tp(connector_aggreg_delivery_started, #{action => Id, buffer => Buffer}),
     Reader = open_buffer(Buffer),
-    Delivery = init_delivery(Name, Reader, Buffer, Opts#{action => Name}),
+    Delivery = init_delivery(Id, Reader, Buffer, Opts#{action => Id}),
     _ = erlang:process_flag(trap_exit, true),
     ok = proc_lib:init_ack({ok, self()}),
     loop(Delivery, Parent, []).

 init_delivery(
-    Name,
+    Id,
     Reader,
     Buffer,
     Opts = #{
@@ -84,7 +86,7 @@ init_delivery(
 ) ->
     BufferMap = emqx_connector_aggregator:buffer_to_map(Buffer),
     #delivery{
-        name = Name,
+        id = Id,
         callback_module = Mod,
         container = mk_container(ContainerOpts),
         reader = Reader,
@@ -158,16 +160,16 @@ process_write(Delivery = #delivery{callback_module = Mod, transfer = Transfer0})
             error({transfer_failed, Reason})
     end.

-process_complete(#delivery{name = Name, empty = true}) ->
-    ?tp(connector_aggreg_delivery_completed, #{action => Name, transfer => empty}),
+process_complete(#delivery{id = Id, empty = true}) ->
+    ?tp(connector_aggreg_delivery_completed, #{action => Id, transfer => empty}),
     exit({shutdown, {skipped, empty}});
 process_complete(#delivery{
-    name = Name, callback_module = Mod, container = Container, transfer = Transfer0
+    id = Id, callback_module = Mod, container = Container, transfer = Transfer0
 }) ->
     Trailer = emqx_connector_aggreg_csv:close(Container),
     Transfer = Mod:process_append(Trailer, Transfer0),
     {ok, Completed} = Mod:process_complete(Transfer),
-    ?tp(connector_aggreg_delivery_completed, #{action => Name, transfer => Completed}),
+    ?tp(connector_aggreg_delivery_completed, #{action => Id, transfer => Completed}),
     ok.

 %%
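
A hedged note for trace consumers: the `action` field of the `connector_aggreg_*` trace points now carries the opaque delivery id (a `{Type, Name}` tuple in the S3 bridge) rather than the bare action name, so snabbkaffe matches must be updated along the lines of the test suite above:

    %% Before: action := BridgeName
    %% After (S3 bridge; ?BRIDGE_TYPE is the test suite's macro):
    ?block_until(#{
        ?snk_kind := connector_aggreg_delivery_completed,
        action := {?BRIDGE_TYPE, BridgeName}
    }).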

View File

@ -1,42 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_connector_aggreg_sup).
-export([
start_link/0,
start_child/1,
delete_child/1
]).
-behaviour(supervisor).
-export([init/1]).
-define(SUPREF, ?MODULE).
%%
start_link() ->
supervisor:start_link({local, ?SUPREF}, ?MODULE, root).
start_child(ChildSpec) ->
supervisor:start_child(?SUPREF, ChildSpec).
delete_child(ChildId) ->
case supervisor:terminate_child(?SUPREF, ChildId) of
ok ->
supervisor:delete_child(?SUPREF, ChildId);
Error ->
Error
end.
%%
init(root) ->
SupFlags = #{
strategy => one_for_one,
intensity => 1,
period => 1
},
{ok, {SupFlags, []}}.

View File

@@ -7,7 +7,6 @@
         stdlib
     ]},
     {env, []},
-    {mod, {emqx_connector_aggreg_app, []}},
     {modules, []},
     {links, []}
 ]}.

View File

@@ -810,6 +810,7 @@ listener_id_status_example() ->

 tcp_schema_example() ->
     #{
+        type => tcp,
         acceptors => 16,
         access_rules => ["allow all"],
         bind => <<"0.0.0.0:1884">>,
@@ -820,6 +821,7 @@ tcp_schema_example() ->
         proxy_protocol => false,
         proxy_protocol_timeout => <<"3s">>,
         running => true,
+        zone => default,
         tcp_options => #{
             active_n => 100,
             backlog => 1024,
@@ -829,8 +831,7 @@ tcp_schema_example() ->
             reuseaddr => true,
             send_timeout => <<"15s">>,
             send_timeout_close => true
-        },
-        type => tcp
+        }
     }.

 create_listener(From, Body) ->

View File

@@ -36,9 +36,12 @@ groups() ->
     MaxConnTests = [
         t_max_connection_default
     ],
+    ZoneTests = [
+        t_update_listener_zone
+    ],
     [
         {with_defaults_in_file, AllTests -- MaxConnTests},
-        {without_defaults_in_file, AllTests -- MaxConnTests},
+        {without_defaults_in_file, AllTests -- (MaxConnTests ++ ZoneTests)},
         {max_connections, MaxConnTests}
     ].

@@ -403,6 +406,21 @@ crud_listeners_by_id(ListenerId, NewListenerId, MinListenerId, BadId, Type, Port
     ?assertMatch({error, {"HTTP/1.1", 404, _}}, request(delete, NewPath, [], [])),
     ok.

+t_update_listener_zone({init, Config}) ->
+    %% fake a zone
+    Config;
+t_update_listener_zone({'end', _Config}) ->
+    ok;
+t_update_listener_zone(_Config) ->
+    ListenerId = <<"tcp:default">>,
+    Path = emqx_mgmt_api_test_util:api_path(["listeners", ListenerId]),
+    Conf = request(get, Path, [], []),
+    %% update
+    AddConf1 = Conf#{<<"zone">> => <<"unknownzone">>},
+    AddConf2 = Conf#{<<"zone">> => <<"zone1">>},
+    ?assertMatch({error, {_, 400, _}}, request(put, Path, [], AddConf1)),
+    ?assertMatch(#{<<"zone">> := <<"zone1">>}, request(put, Path, [], AddConf2)).
+
 t_delete_nonexistent_listener(Config) when is_list(Config) ->
     NonExist = emqx_mgmt_api_test_util:api_path(["listeners", "tcp:nonexistent"]),
     ?assertMatch(
@@ -518,5 +536,9 @@ cert_file(Name) ->
 default_listeners_hocon_text() ->
     Sc = #{roots => emqx_schema:listeners()},
     Listeners = hocon_tconf:make_serializable(Sc, #{}, #{}),
-    Config = #{<<"listeners">> => Listeners},
+    Zones = #{<<"zone1">> => #{<<"mqtt">> => #{<<"max_inflight">> => 2}}},
+    Config = #{
+        <<"listeners">> => Listeners,
+        <<"zones">> => Zones
+    },
     hocon_pp:do(Config, #{}).

View File

@@ -2,7 +2,7 @@
 {application, emqx_retainer, [
     {description, "EMQX Retainer"},
     % strict semver, bump manually!
-    {vsn, "5.0.22"},
+    {vsn, "5.0.23"},
     {modules, []},
     {registered, [emqx_retainer_sup]},
     {applications, [kernel, stdlib, emqx, emqx_ctl]},

View File

@@ -47,6 +47,7 @@
     retained_count/0,
     backend_module/0,
     backend_module/1,
+    backend_state/1,
     enabled/0
 ]).

@@ -103,6 +104,7 @@
 -callback page_read(backend_state(), emqx_maybe:t(topic()), non_neg_integer(), non_neg_integer()) ->
     {ok, has_next(), list(message())}.
 -callback match_messages(backend_state(), topic(), cursor()) -> {ok, list(message()), cursor()}.
+-callback delete_cursor(backend_state(), cursor()) -> ok.
 -callback clear_expired(backend_state()) -> ok.
 -callback clean(backend_state()) -> ok.
 -callback size(backend_state()) -> non_neg_integer().
@@ -339,7 +341,7 @@ count(Context) ->
 clear_expired(Context) ->
     Mod = backend_module(Context),
     BackendState = backend_state(Context),
-    Mod:clear_expired(BackendState).
+    ok = Mod:clear_expired(BackendState).

 -spec store_retained(context(), message()) -> ok.
 store_retained(Context, #message{topic = Topic, payload = Payload} = Msg) ->

View File

@@ -46,15 +46,26 @@
 -type limiter() :: emqx_htb_limiter:limiter().
 -type context() :: emqx_retainer:context().
 -type topic() :: emqx_types:topic().
--type cursor() :: emqx_retainer:cursor().

 -define(POOL, ?MODULE).

+%% For tests
+-export([
+    dispatch/3
+]).
+
+%% This module is `emqx_retainer` companion
+-elvis([{elvis_style, invalid_dynamic_call, disable}]).
+
 %%%===================================================================
 %%% API
 %%%===================================================================

 dispatch(Context, Topic) ->
-    cast({?FUNCTION_NAME, Context, self(), Topic}).
+    dispatch(Context, Topic, self()).
+
+dispatch(Context, Topic, Pid) ->
+    cast({dispatch, Context, Pid, Topic}).

 %% reset the client's limiter after updated the limiter's config
 refresh_limiter() ->
@@ -156,7 +167,7 @@ handle_call(Req, _From, State) ->
     | {noreply, NewState :: term(), hibernate}
     | {stop, Reason :: term(), NewState :: term()}.
 handle_cast({dispatch, Context, Pid, Topic}, #{limiter := Limiter} = State) ->
-    {ok, Limiter2} = dispatch(Context, Pid, Topic, undefined, Limiter),
+    {ok, Limiter2} = dispatch(Context, Pid, Topic, Limiter),
     {noreply, State#{limiter := Limiter2}};
 handle_cast({refresh_limiter, Conf}, State) ->
     BucketCfg = emqx_utils_maps:deep_get([flow_control, batch_deliver_limiter], Conf, undefined),
@@ -234,86 +245,120 @@ format_status(_Opt, Status) ->
 cast(Msg) ->
     gen_server:cast(worker(), Msg).

--spec dispatch(context(), pid(), topic(), cursor(), limiter()) -> {ok, limiter()}.
-dispatch(Context, Pid, Topic, Cursor, Limiter) ->
+-spec dispatch(context(), pid(), topic(), limiter()) -> {ok, limiter()}.
+dispatch(Context, Pid, Topic, Limiter) ->
     Mod = emqx_retainer:backend_module(Context),
-    case Cursor =/= undefined orelse emqx_topic:wildcard(Topic) of
-        false ->
-            {ok, Result} = erlang:apply(Mod, read_message, [Context, Topic]),
-            deliver(Result, Context, Pid, Topic, undefined, Limiter);
+    State = emqx_retainer:backend_state(Context),
+    case emqx_topic:wildcard(Topic) of
         true ->
-            {ok, Result, NewCursor} = erlang:apply(Mod, match_messages, [Context, Topic, Cursor]),
-            deliver(Result, Context, Pid, Topic, NewCursor, Limiter)
+            {ok, Messages, Cursor} = Mod:match_messages(State, Topic, undefined),
+            dispatch_with_cursor(Context, Messages, Cursor, Pid, Topic, Limiter);
+        false ->
+            {ok, Messages} = Mod:read_message(State, Topic),
+            dispatch_at_once(Messages, Pid, Topic, Limiter)
     end.

--spec deliver(list(emqx_types:message()), context(), pid(), topic(), cursor(), limiter()) ->
-    {ok, limiter()}.
-deliver([], _Context, _Pid, _Topic, undefined, Limiter) ->
+dispatch_at_once(Messages, Pid, Topic, Limiter0) ->
+    case deliver(Messages, Pid, Topic, Limiter0) of
+        {ok, Limiter1} ->
+            {ok, Limiter1};
+        {drop, Limiter1} ->
+            {ok, Limiter1};
+        no_receiver ->
+            ?tp(debug, retainer_dispatcher_no_receiver, #{topic => Topic}),
+            {ok, Limiter0}
+    end.
+
+dispatch_with_cursor(Context, [], Cursor, _Pid, _Topic, Limiter) ->
+    ok = delete_cursor(Context, Cursor),
     {ok, Limiter};
-deliver([], Context, Pid, Topic, Cursor, Limiter) ->
-    dispatch(Context, Pid, Topic, Cursor, Limiter);
-deliver(Result, Context, Pid, Topic, Cursor, Limiter) ->
+dispatch_with_cursor(Context, Messages0, Cursor0, Pid, Topic, Limiter0) ->
+    case deliver(Messages0, Pid, Topic, Limiter0) of
+        {ok, Limiter1} ->
+            {ok, Messages1, Cursor1} = match_next(Context, Topic, Cursor0),
+            dispatch_with_cursor(Context, Messages1, Cursor1, Pid, Topic, Limiter1);
+        {drop, Limiter1} ->
+            ok = delete_cursor(Context, Cursor0),
+            {ok, Limiter1};
+        no_receiver ->
+            ?tp(debug, retainer_dispatcher_no_receiver, #{topic => Topic}),
+            ok = delete_cursor(Context, Cursor0),
+            {ok, Limiter0}
+    end.
+
+match_next(_Context, _Topic, undefined) ->
+    {ok, [], undefined};
+match_next(Context, Topic, Cursor) ->
+    Mod = emqx_retainer:backend_module(Context),
+    State = emqx_retainer:backend_state(Context),
+    Mod:match_messages(State, Topic, Cursor).
+
+delete_cursor(_Context, undefined) ->
+    ok;
+delete_cursor(Context, Cursor) ->
+    Mod = emqx_retainer:backend_module(Context),
+    State = emqx_retainer:backend_state(Context),
+    Mod:delete_cursor(State, Cursor).
+
+-spec deliver([emqx_types:message()], pid(), topic(), limiter()) ->
+    {ok, limiter()} | {drop, limiter()} | no_receiver.
+deliver(Messages, Pid, Topic, Limiter) ->
     case erlang:is_process_alive(Pid) of
         false ->
-            {ok, Limiter};
+            no_receiver;
         _ ->
-            DeliverNum = emqx_conf:get([retainer, flow_control, batch_deliver_number], undefined),
-            case DeliverNum of
+            BatchSize = emqx_conf:get([retainer, flow_control, batch_deliver_number], undefined),
+            case BatchSize of
                 0 ->
-                    do_deliver(Result, Pid, Topic),
+                    deliver_to_client(Messages, Pid, Topic),
                     {ok, Limiter};
                 _ ->
-                    case do_deliver(Result, DeliverNum, Pid, Topic, Limiter) of
-                        {ok, Limiter2} ->
-                            deliver([], Context, Pid, Topic, Cursor, Limiter2);
-                        {drop, Limiter2} ->
-                            {ok, Limiter2}
-                    end
+                    deliver_in_batches(Messages, BatchSize, Pid, Topic, Limiter)
             end
     end.

-do_deliver([], _DeliverNum, _Pid, _Topic, Limiter) ->
+deliver_in_batches([], _BatchSize, _Pid, _Topic, Limiter) ->
     {ok, Limiter};
-do_deliver(Msgs, DeliverNum, Pid, Topic, Limiter) ->
-    {Num, ToDelivers, Msgs2} = safe_split(DeliverNum, Msgs),
-    case emqx_htb_limiter:consume(Num, Limiter) of
-        {ok, Limiter2} ->
-            do_deliver(ToDelivers, Pid, Topic),
-            do_deliver(Msgs2, DeliverNum, Pid, Topic, Limiter2);
-        {drop, _} = Drop ->
+deliver_in_batches(Msgs, BatchSize, Pid, Topic, Limiter0) ->
+    {BatchActualSize, Batch, RestMsgs} = take(BatchSize, Msgs),
+    case emqx_htb_limiter:consume(BatchActualSize, Limiter0) of
+        {ok, Limiter1} ->
+            ok = deliver_to_client(Batch, Pid, Topic),
+            deliver_in_batches(RestMsgs, BatchSize, Pid, Topic, Limiter1);
+        {drop, _Limiter1} = Drop ->
             ?SLOG(debug, #{
                 msg => "retained_message_dropped",
                 reason => "reached_ratelimit",
-                dropped_count => length(ToDelivers)
+                dropped_count => BatchActualSize
             }),
             Drop
     end.

-do_deliver([Msg | T], Pid, Topic) ->
-    case emqx_banned:look_up({clientid, Msg#message.from}) of
-        [] ->
-            Pid ! {deliver, Topic, Msg},
-            ok;
-        _ ->
-            ?tp(
-                notice,
-                ignore_retained_message_deliver,
-                #{
-                    reason => "client is banned",
-                    clientid => Msg#message.from
-                }
-            )
-    end,
-    do_deliver(T, Pid, Topic);
-do_deliver([], _, _) ->
+deliver_to_client([Msg | T], Pid, Topic) ->
+    _ =
+        case emqx_banned:look_up({clientid, Msg#message.from}) of
+            [] ->
+                Pid ! {deliver, Topic, Msg};
+            _ ->
+                ?tp(
+                    notice,
+                    ignore_retained_message_deliver,
+                    #{
                        reason => "client is banned",
+                        clientid => Msg#message.from
+                    }
+                )
+        end,
+    deliver_to_client(T, Pid, Topic);
+deliver_to_client([], _, _) ->
     ok.

-safe_split(N, List) ->
-    safe_split(N, List, 0, []).
+take(N, List) ->
+    take(N, List, 0, []).

-safe_split(0, List, Count, Acc) ->
+take(0, List, Count, Acc) ->
     {Count, lists:reverse(Acc), List};
-safe_split(_N, [], Count, Acc) ->
+take(_N, [], Count, Acc) ->
     {Count, lists:reverse(Acc), []};
-safe_split(N, [H | T], Count, Acc) ->
-    safe_split(N - 1, T, Count + 1, [H | Acc]).
+take(N, [H | T], Count, Acc) ->
+    take(N - 1, T, Count + 1, [H | Acc]).

View File

@@ -35,6 +35,7 @@
     read_message/2,
     page_read/4,
     match_messages/3,
+    delete_cursor/2,
     clear_expired/1,
     clean/1,
     size/1
@@ -205,7 +206,7 @@ delete_message(_State, Topic) ->
 read_message(_State, Topic) ->
     {ok, read_messages(Topic)}.

-match_messages(_State, Topic, undefined) ->
+match_messages(State, Topic, undefined) ->
     Tokens = topic_to_tokens(Topic),
     Now = erlang:system_time(millisecond),
     QH = msg_table(search_table(Tokens, Now)),
@@ -214,7 +215,7 @@ match_messages(_State, Topic, undefined) ->
             {ok, qlc:eval(QH), undefined};
         BatchNum when is_integer(BatchNum) ->
             Cursor = qlc:cursor(QH),
-            match_messages(undefined, Topic, {Cursor, BatchNum})
+            match_messages(State, Topic, {Cursor, BatchNum})
     end;
 match_messages(_State, _Topic, {Cursor, BatchNum}) ->
     case qlc_next_answers(Cursor, BatchNum) of
@@ -224,6 +225,11 @@ match_messages(_State, _Topic, {Cursor, BatchNum}) ->
             {ok, Rows, {Cursor, BatchNum}}
     end.

+delete_cursor(_State, {Cursor, _}) ->
+    qlc:delete_cursor(Cursor);
+delete_cursor(_State, undefined) ->
+    ok.
+
 page_read(_State, Topic, Page, Limit) ->
     Now = erlang:system_time(millisecond),
     QH =
@@ -562,6 +568,7 @@ reindex(NewIndices, Force, StatusFun) when
     %% Fill index records in batches.
     QH = qlc:q([Topic || #retained_message{topic = Topic} <- ets:table(?TAB_MESSAGE)]),
     ok = reindex_batch(qlc:cursor(QH), 0, StatusFun),
+
     %% Enable read indices and unlock reindexing.
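
A minimal sketch of the cursor lifecycle the new callback enables (State stands for the backend state taken from the retainer context; a finite batch_read_number is assumed so that a cursor is actually created):

    %% A wildcard read may hand back a live QLC cursor...
    {ok, _Batch, Cursor} = emqx_retainer_mnesia:match_messages(State, <<"t/#">>, undefined),
    %% ...which must be released explicitly when iteration stops early,
    %% otherwise the underlying qlc cursor process lingers:
    ok = emqx_retainer_mnesia:delete_cursor(State, Cursor).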

View File

@@ -21,6 +21,7 @@

 -include("emqx_retainer.hrl").

+-include_lib("emqx/include/asserts.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
@@ -96,14 +97,19 @@ end_per_group(_Group, Config) ->
     emqx_retainer_mnesia:populate_index_meta(),
     Config.

-init_per_testcase(t_get_basic_usage_info, Config) ->
+init_per_testcase(_TestCase, Config) ->
     mnesia:clear_table(?TAB_INDEX),
     mnesia:clear_table(?TAB_MESSAGE),
     emqx_retainer_mnesia:populate_index_meta(),
-    Config;
-init_per_testcase(_TestCase, Config) ->
     Config.

+end_per_testcase(t_flow_control, _Config) ->
+    restore_delivery();
+end_per_testcase(t_cursor_cleanup, _Config) ->
+    restore_delivery();
+end_per_testcase(_TestCase, _Config) ->
+    ok.
+
 app_spec() ->
     {emqx_retainer, ?BASE_CONF}.
@@ -405,19 +411,7 @@ t_stop_publish_clear_msg(_) ->
     ok = emqtt:disconnect(C1).

 t_flow_control(_) ->
-    Rate = emqx_ratelimiter_SUITE:to_rate("1/1s"),
-    LimiterCfg = make_limiter_cfg(Rate),
-    JsonCfg = make_limiter_json(<<"1/1s">>),
-    emqx_limiter_server:add_bucket(emqx_retainer, internal, LimiterCfg),
-    emqx_retainer:update_config(#{
-        <<"delivery_rate">> => <<"1/1s">>,
-        <<"flow_control">> =>
-            #{
-                <<"batch_read_number">> => 1,
-                <<"batch_deliver_number">> => 1,
-                <<"batch_deliver_limiter">> => JsonCfg
-            }
-    }),
+    setup_slow_delivery(),
     {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
     {ok, _} = emqtt:connect(C1),
     emqtt:publish(
@@ -442,23 +436,60 @@ t_flow_control(_) ->
     {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]),
     ?assertEqual(3, length(receive_messages(3))),
     End = erlang:system_time(millisecond),
     Diff = End - Begin,
     ?assert(
-        Diff > timer:seconds(2.5) andalso Diff < timer:seconds(3.9),
+        Diff > timer:seconds(2.1) andalso Diff < timer:seconds(3.9),
         lists:flatten(io_lib:format("Diff is :~p~n", [Diff]))
     ),
     ok = emqtt:disconnect(C1),
-    emqx_limiter_server:del_bucket(emqx_retainer, internal),
-    emqx_retainer:update_config(#{
-        <<"flow_control">> =>
-            #{
-                <<"batch_read_number">> => 1,
-                <<"batch_deliver_number">> => 1
-            }
-    }),
+    ok.
+
+t_cursor_cleanup(_) ->
+    setup_slow_delivery(),
+    {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
+    {ok, _} = emqtt:connect(C1),
+    lists:foreach(
+        fun(I) ->
+            emqtt:publish(
+                C1,
+                <<"retained/", (integer_to_binary(I))/binary>>,
+                <<"this is a retained message">>,
+                [{qos, 0}, {retain, true}]
+            )
+        end,
+        lists:seq(1, 5)
+    ),
+    {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]),
+    snabbkaffe:start_trace(),
+    ?assertWaitEvent(
+        emqtt:disconnect(C1),
+        #{?snk_kind := retainer_dispatcher_no_receiver, topic := <<"retained/#">>},
+        2000
+    ),
+    ?assertEqual(0, qlc_process_count()),
+    {Pid, Ref} = spawn_monitor(fun() -> ok end),
+    receive
+        {'DOWN', Ref, _, _, _} -> ok
+    after 1000 -> ct:fail("should receive 'DOWN' message")
+    end,
+    ?assertWaitEvent(
+        emqx_retainer_dispatcher:dispatch(emqx_retainer:context(), <<"retained/1">>, Pid),
+        #{?snk_kind := retainer_dispatcher_no_receiver, topic := <<"retained/1">>},
+        2000
+    ),
+    ?assertEqual(0, qlc_process_count()),
+    snabbkaffe:stop(),
     ok.

 t_clear_expired(_) ->
@@ -849,15 +880,21 @@ with_conf(ConfMod, Case) ->
     end.

 make_limiter_cfg(Rate) ->
-    Client = #{
-        rate => Rate,
-        initial => 0,
-        burst => 0,
-        low_watermark => 1,
-        divisible => false,
-        max_retry_time => timer:seconds(5),
-        failure_strategy => force
-    },
+    make_limiter_cfg(Rate, #{}).
+
+make_limiter_cfg(Rate, ClientOpts) ->
+    Client = maps:merge(
+        #{
+            rate => Rate,
+            initial => 0,
+            burst => 0,
+            low_watermark => 1,
+            divisible => false,
+            max_retry_time => timer:seconds(5),
+            failure_strategy => force
+        },
+        ClientOpts
+    ),
     #{client => Client, rate => Rate, initial => 0, burst => 0}.

 make_limiter_json(Rate) ->
@@ -909,3 +946,40 @@ do_publish(Client, Topic, Payload, Opts, {sleep, Time}) ->
     Res = emqtt:publish(Client, Topic, Payload, Opts),
     ct:sleep(Time),
     Res.
+
+setup_slow_delivery() ->
+    Rate = emqx_ratelimiter_SUITE:to_rate("1/1s"),
+    LimiterCfg = make_limiter_cfg(Rate),
+    JsonCfg = make_limiter_json(<<"1/1s">>),
+    emqx_limiter_server:add_bucket(emqx_retainer, internal, LimiterCfg),
+    emqx_retainer:update_config(#{
+        <<"delivery_rate">> => <<"1/1s">>,
+        <<"flow_control">> =>
+            #{
+                <<"batch_read_number">> => 1,
+                <<"batch_deliver_number">> => 1,
+                <<"batch_deliver_limiter">> => JsonCfg
+            }
+    }).
+
+restore_delivery() ->
+    emqx_limiter_server:del_bucket(emqx_retainer, internal),
+    emqx_retainer:update_config(#{
+        <<"flow_control">> =>
+            #{
+                <<"batch_read_number">> => 1,
+                <<"batch_deliver_number">> => 1
+            }
+    }).
+
+qlc_processes() ->
+    lists:filter(
+        fun(Pid) ->
+            {current_function, {qlc, wait_for_request, 3}} =:=
+                erlang:process_info(Pid, current_function)
+        end,
+        erlang:processes()
+    ).
+
+qlc_process_count() ->
+    length(qlc_processes()).

View File

@@ -744,18 +744,18 @@ t_regex_replace(_) ->
     ?assertEqual(<<"a[cc]b[c]d">>, apply_func(regex_replace, [<<"accbcd">>, <<"c+">>, <<"[&]">>])).

 t_unescape(_) ->
-    ?assertEqual(<<"\n">>, emqx_rule_funcs:unescape(<<"\\n">>)),
-    ?assertEqual(<<"\t">>, emqx_rule_funcs:unescape(<<"\\t">>)),
-    ?assertEqual(<<"\r">>, emqx_rule_funcs:unescape(<<"\\r">>)),
-    ?assertEqual(<<"\b">>, emqx_rule_funcs:unescape(<<"\\b">>)),
-    ?assertEqual(<<"\f">>, emqx_rule_funcs:unescape(<<"\\f">>)),
-    ?assertEqual(<<"\v">>, emqx_rule_funcs:unescape(<<"\\v">>)),
-    ?assertEqual(<<"'">>, emqx_rule_funcs:unescape(<<"\\'">>)),
-    ?assertEqual(<<"\"">>, emqx_rule_funcs:unescape(<<"\\\"">>)),
-    ?assertEqual(<<"?">>, emqx_rule_funcs:unescape(<<"\\?">>)),
-    ?assertEqual(<<"\a">>, emqx_rule_funcs:unescape(<<"\\a">>)),
+    ?assertEqual(<<"\n">> = <<10>>, emqx_rule_funcs:unescape(<<"\\n">>)),
+    ?assertEqual(<<"\t">> = <<9>>, emqx_rule_funcs:unescape(<<"\\t">>)),
+    ?assertEqual(<<"\r">> = <<13>>, emqx_rule_funcs:unescape(<<"\\r">>)),
+    ?assertEqual(<<"\b">> = <<8>>, emqx_rule_funcs:unescape(<<"\\b">>)),
+    ?assertEqual(<<"\f">> = <<12>>, emqx_rule_funcs:unescape(<<"\\f">>)),
+    ?assertEqual(<<"\v">> = <<11>>, emqx_rule_funcs:unescape(<<"\\v">>)),
+    ?assertEqual(<<"'">> = <<39>>, emqx_rule_funcs:unescape(<<"\\'">>)),
+    ?assertEqual(<<"\"">> = <<34>>, emqx_rule_funcs:unescape(<<"\\\"">>)),
+    ?assertEqual(<<"?">> = <<63>>, emqx_rule_funcs:unescape(<<"\\?">>)),
+    ?assertEqual(<<7>>, emqx_rule_funcs:unescape(<<"\\a">>)),
     % Test escaping backslash itself
-    ?assertEqual(<<"\\">>, emqx_rule_funcs:unescape(<<"\\\\">>)),
+    ?assertEqual(<<"\\">> = <<92>>, emqx_rule_funcs:unescape(<<"\\\\">>)),
     % Test a string without any escape sequences
     ?assertEqual(<<"Hello, World!">>, emqx_rule_funcs:unescape(<<"Hello, World!">>)),
     % Test a string with escape sequences

View File

@@ -289,7 +289,8 @@ unescape_string([$\\, $" | Rest], Acc) ->
 unescape_string([$\\, $? | Rest], Acc) ->
     unescape_string(Rest, [$\? | Acc]);
 unescape_string([$\\, $a | Rest], Acc) ->
-    unescape_string(Rest, [$\a | Acc]);
+    %% Terminal bell
+    unescape_string(Rest, [7 | Acc]);
 %% Start of HEX escape code
 unescape_string([$\\, $x | [$0 | _] = HexStringStart], Acc) ->
     unescape_handle_hex_string(HexStringStart, Acc);
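
For context: unlike C, Erlang has no \a character escape, so the old clause effectively accumulated the literal character a rather than the bell character. A quick check of the fixed behavior, matching the updated test above:

    <<7>> = emqx_rule_funcs:unescape(<<"\\a">>).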

View File

@@ -0,0 +1,5 @@
+Fix the listener config update API when handling an unknown zone.
+
+Prior to this fix, when a listener config was updated with an unknown zone, for example `{"zone": "unknown"}`,
+the change would be accepted, causing all clients to crash when connecting.
+After this fix, updating a listener with an unknown zone name gets a "Bad request" response.

View File

@@ -0,0 +1 @@
+Fix a process leak in the `emqx_retainer` application. Previously, a client disconnecting while receiving retained messages could leave leaked processes behind.

View File

@@ -0,0 +1 @@
+Fixed an issue where the syskeeper forwarder would never reconnect when the connection was lost.