Merge pull request #7660 from lafirest/chore/fmt_retainer

Chore/fmt retainer
This commit is contained in:
lafirest 2022-04-19 10:52:53 +08:00 committed by GitHub
commit 58c1b1af87
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 898 additions and 605 deletions

View File

@ -24,8 +24,10 @@
-type payload() :: binary(). -type payload() :: binary().
-type message() :: #message{}. -type message() :: #message{}.
-type context() :: #{context_id := pos_integer(), -type context() :: #{
atom() => term()}. context_id := pos_integer(),
atom() => term()
}.
-define(DELIVER_SEMAPHORE, deliver_remained_quota). -define(DELIVER_SEMAPHORE, deliver_remained_quota).
-type semaphore() :: ?DELIVER_SEMAPHORE. -type semaphore() :: ?DELIVER_SEMAPHORE.

View File

@ -1,27 +1,35 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{deps, [ {emqx, {path, "../emqx"}} {deps, [{emqx, {path, "../emqx"}}]}.
]}.
{edoc_opts, [{preprocess, true}]}. {edoc_opts, [{preprocess, true}]}.
{erl_opts, [warn_unused_vars, {erl_opts, [
warn_unused_vars,
warn_shadow_vars, warn_shadow_vars,
warn_unused_import, warn_unused_import,
warn_obsolete_guard, warn_obsolete_guard,
debug_info, debug_info,
{parse_transform}]}. {parse_transform}
]}.
{xref_checks, [undefined_function_calls, undefined_functions, {xref_checks, [
locals_not_used, deprecated_function_calls, undefined_function_calls,
warnings_as_errors, deprecated_functions]}. undefined_functions,
locals_not_used,
deprecated_function_calls,
warnings_as_errors,
deprecated_functions
]}.
{cover_enabled, true}. {cover_enabled, true}.
{cover_opts, [verbose]}. {cover_opts, [verbose]}.
{cover_export_enabled, true}. {cover_export_enabled, true}.
{profiles, {profiles, [
[{test, {test, [
[{deps, {deps, [
[ {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.5.0"}}}
{emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.5.0"}}}]} ]}
]} ]}
]}. ]}.
{project_plugins, [erlfmt]}.

View File

@ -1,7 +1,8 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_retainer, {application, emqx_retainer, [
[{description, "EMQX Retainer"}, {description, "EMQX Retainer"},
{vsn, "5.0.0"}, % strict semver, bump manually! % strict semver, bump manually!
{vsn, "5.0.0"},
{modules, []}, {modules, []},
{registered, [emqx_retainer_sup]}, {registered, [emqx_retainer_sup]},
{applications, [kernel, stdlib, emqx]}, {applications, [kernel, stdlib, emqx]},
@ -9,7 +10,8 @@
{env, []}, {env, []},
{licenses, ["Apache-2.0"]}, {licenses, ["Apache-2.0"]},
{maintainers, ["EMQX Team <contact@emqx.io>"]}, {maintainers, ["EMQX Team <contact@emqx.io>"]},
{links, [{"Homepage", "https://emqx.io/"}, {links, [
{"Homepage", "https://emqx.io/"},
{"Github", "https://github.com/emqx/emqx-retainer"} {"Github", "https://github.com/emqx/emqx-retainer"}
]} ]}
]}. ]}.

View File

@ -23,37 +23,42 @@
-export([start_link/0]). -export([start_link/0]).
-export([ on_session_subscribed/4 -export([
, on_message_publish/2 on_session_subscribed/4,
on_message_publish/2
]). ]).
-export([ delete_message/2 -export([
, store_retained/2 delete_message/2,
, get_backend_module/0 store_retained/2,
get_backend_module/0
]). ]).
-export([ get_expiry_time/1 -export([
, update_config/1 get_expiry_time/1,
, clean/0 update_config/1,
, delete/1 clean/0,
, page_read/3 delete/1,
, post_config_update/5 page_read/3,
, stats_fun/0 post_config_update/5,
stats_fun/0
]). ]).
%% gen_server callbacks %% gen_server callbacks
-export([ init/1 -export([
, handle_call/3 init/1,
, handle_cast/2 handle_call/3,
, handle_info/2 handle_cast/2,
, terminate/2 handle_info/2,
, code_change/3 terminate/2,
code_change/3
]). ]).
-type state() :: #{ enable := boolean() -type state() :: #{
, context_id := non_neg_integer() enable := boolean(),
, context := undefined | context() context_id := non_neg_integer(),
, clear_timer := undefined | reference() context := undefined | context(),
clear_timer := undefined | reference()
}. }.
-define(DEF_MAX_PAYLOAD_SIZE, (1024 * 1024)). -define(DEF_MAX_PAYLOAD_SIZE, (1024 * 1024)).
@ -86,10 +91,14 @@ on_session_subscribed(_, Topic, #{rh := Rh} = Opts, Context) ->
end. end.
%% RETAIN flag set to 1 and payload containing zero bytes %% RETAIN flag set to 1 and payload containing zero bytes
on_message_publish(Msg = #message{flags = #{retain := true}, on_message_publish(
Msg = #message{
flags = #{retain := true},
topic = Topic, topic = Topic,
payload = <<>>}, payload = <<>>
Context) -> },
Context
) ->
delete_message(Context, Topic), delete_message(Context, Topic),
case get_stop_publish_clear_msg() of case get_stop_publish_clear_msg() of
true -> true ->
@ -97,7 +106,6 @@ on_message_publish(Msg = #message{flags = #{retain := true},
_ -> _ ->
{ok, Msg} {ok, Msg}
end; end;
on_message_publish(Msg = #message{flags = #{retain := true}}, Context) -> on_message_publish(Msg = #message{flags = #{retain := true}}, Context) ->
Msg1 = emqx_message:set_header(retained, true, Msg), Msg1 = emqx_message:set_header(retained, true, Msg),
store_retained(Context, Msg1), store_retained(Context, Msg1),
@ -110,14 +118,16 @@ on_message_publish(Msg, _) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc Start the retainer %% @doc Start the retainer
-spec(start_link() -> emqx_types:startlink_ret()). -spec start_link() -> emqx_types:startlink_ret().
start_link() -> start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
get_expiry_time(#message{headers = #{properties := #{'Message-Expiry-Interval' := 0}}}) -> get_expiry_time(#message{headers = #{properties := #{'Message-Expiry-Interval' := 0}}}) ->
0; 0;
get_expiry_time(#message{headers = #{properties := #{'Message-Expiry-Interval' := Interval}}, get_expiry_time(#message{
timestamp = Ts}) -> headers = #{properties := #{'Message-Expiry-Interval' := Interval}},
timestamp = Ts
}) ->
Ts + Interval * 1000; Ts + Interval * 1000;
get_expiry_time(#message{timestamp = Ts}) -> get_expiry_time(#message{timestamp = Ts}) ->
Interval = emqx_conf:get([retainer, msg_expiry_interval], ?DEF_EXPIRY_INTERVAL), Interval = emqx_conf:get([retainer, msg_expiry_interval], ?DEF_EXPIRY_INTERVAL),
@ -171,20 +181,16 @@ init([]) ->
handle_call({update_config, NewConf, OldConf}, _, State) -> handle_call({update_config, NewConf, OldConf}, _, State) ->
State2 = update_config(State, NewConf, OldConf), State2 = update_config(State, NewConf, OldConf),
{reply, ok, State2}; {reply, ok, State2};
handle_call(clean, _, #{context := Context} = State) -> handle_call(clean, _, #{context := Context} = State) ->
clean(Context), clean(Context),
{reply, ok, State}; {reply, ok, State};
handle_call({delete, Topic}, _, #{context := Context} = State) -> handle_call({delete, Topic}, _, #{context := Context} = State) ->
delete_message(Context, Topic), delete_message(Context, Topic),
{reply, ok, State}; {reply, ok, State};
handle_call({page_read, Topic, Page, Limit}, _, #{context := Context} = State) -> handle_call({page_read, Topic, Page, Limit}, _, #{context := Context} = State) ->
Mod = get_backend_module(), Mod = get_backend_module(),
Result = Mod:page_read(Context, Topic, Page, Limit), Result = Mod:page_read(Context, Topic, Page, Limit),
{reply, Result, State}; {reply, Result, State};
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
?SLOG(error, #{msg => "unexpected_call", call => Req}), ?SLOG(error, #{msg => "unexpected_call", call => Req}),
{reply, ignored, State}. {reply, ignored, State}.
@ -194,7 +200,6 @@ handle_cast(stats_fun, #{context := Context} = State) ->
Size = Mod:size(Context), Size = Mod:size(Context),
emqx_stats:setstat('retained.count', 'retained.max', Size), emqx_stats:setstat('retained.count', 'retained.max', Size),
{noreply, State}; {noreply, State};
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
?SLOG(error, #{msg => "unexpected_cast", cast => Msg}), ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
{noreply, State}. {noreply, State}.
@ -204,7 +209,6 @@ handle_info(clear_expired, #{context := Context} = State) ->
Mod:clear_expired(Context), Mod:clear_expired(Context),
Interval = emqx_conf:get([retainer, msg_clear_interval], ?DEF_EXPIRY_INTERVAL), Interval = emqx_conf:get([retainer, msg_clear_interval], ?DEF_EXPIRY_INTERVAL),
{noreply, State#{clear_timer := add_timer(Interval, clear_expired)}, hibernate}; {noreply, State#{clear_timer := add_timer(Interval, clear_expired)}, hibernate};
handle_info(Info, State) -> handle_info(Info, State) ->
?SLOG(error, #{msg => "unexpected_info", info => Info}), ?SLOG(error, #{msg => "unexpected_info", info => Info}),
{noreply, State}. {noreply, State}.
@ -222,7 +226,8 @@ code_change(_OldVsn, State, _Extra) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec new_state() -> state(). -spec new_state() -> state().
new_state() -> new_state() ->
#{enable => false, #{
enable => false,
context_id => 0, context_id => 0,
context => undefined, context => undefined,
clear_timer => undefined clear_timer => undefined
@ -249,11 +254,13 @@ store_retained(Context, #message{topic = Topic, payload = Payload} = Msg) ->
Size = iolist_size(Payload), Size = iolist_size(Payload),
case payload_size_limit() of case payload_size_limit() of
Limit when Limit > 0 andalso Limit < Size -> Limit when Limit > 0 andalso Limit < Size ->
?SLOG(error, #{msg => "retain_failed_for_payload_size_exceeded_limit", ?SLOG(error, #{
msg => "retain_failed_for_payload_size_exceeded_limit",
topic => Topic, topic => Topic,
config => emqx_hocon:format_path(?MAX_PAYLOAD_SIZE_CONFIG_PATH), config => emqx_hocon:format_path(?MAX_PAYLOAD_SIZE_CONFIG_PATH),
size => Size, size => Size,
limit => Limit}); limit => Limit
});
_ -> _ ->
Mod = get_backend_module(), Mod = get_backend_module(),
Mod:store_retained(Context, Msg) Mod:store_retained(Context, Msg)
@ -266,23 +273,30 @@ clean(Context) ->
-spec update_config(state(), hocons:config(), hocons:config()) -> state(). -spec update_config(state(), hocons:config(), hocons:config()) -> state().
update_config(State, Conf, OldConf) -> update_config(State, Conf, OldConf) ->
update_config(maps:get(enable, Conf), update_config(
maps:get(enable, Conf),
maps:get(enable, OldConf), maps:get(enable, OldConf),
State, State,
Conf, Conf,
OldConf). OldConf
).
-spec update_config(boolean(), boolean(), state(), hocons:config(), hocons:config()) -> state(). -spec update_config(boolean(), boolean(), state(), hocons:config(), hocons:config()) -> state().
update_config(false, _, State, _, _) -> update_config(false, _, State, _, _) ->
disable_retainer(State); disable_retainer(State);
update_config(true, false, State, NewConf, _) -> update_config(true, false, State, NewConf, _) ->
enable_retainer(State, NewConf); enable_retainer(State, NewConf);
update_config(
update_config(true, true, true,
#{clear_timer := ClearTimer} = State, NewConf, OldConf) -> true,
#{backend := BackendCfg, #{clear_timer := ClearTimer} = State,
msg_clear_interval := ClearInterval} = NewConf, NewConf,
OldConf
) ->
#{
backend := BackendCfg,
msg_clear_interval := ClearInterval
} = NewConf,
#{backend := OldBackendCfg} = OldConf, #{backend := OldBackendCfg} = OldConf,
@ -290,32 +304,47 @@ update_config(true, true,
OldStrorageType = maps:get(type, OldBackendCfg), OldStrorageType = maps:get(type, OldBackendCfg),
case OldStrorageType of case OldStrorageType of
StorageType -> StorageType ->
State#{clear_timer := check_timer(ClearTimer, State#{
clear_timer := check_timer(
ClearTimer,
ClearInterval, ClearInterval,
clear_expired)}; clear_expired
)
};
_ -> _ ->
State2 = disable_retainer(State), State2 = disable_retainer(State),
enable_retainer(State2, NewConf) enable_retainer(State2, NewConf)
end. end.
-spec enable_retainer(state(), hocon:config()) -> state(). -spec enable_retainer(state(), hocon:config()) -> state().
enable_retainer(#{context_id := ContextId} = State, enable_retainer(
#{msg_clear_interval := ClearInterval, #{context_id := ContextId} = State,
backend := BackendCfg}) -> #{
msg_clear_interval := ClearInterval,
backend := BackendCfg
}
) ->
NewContextId = ContextId + 1, NewContextId = ContextId + 1,
Context = create_resource(new_context(NewContextId), BackendCfg), Context = create_resource(new_context(NewContextId), BackendCfg),
load(Context), load(Context),
State#{enable := true, State#{
enable := true,
context_id := NewContextId, context_id := NewContextId,
context := Context, context := Context,
clear_timer := add_timer(ClearInterval, clear_expired)}. clear_timer := add_timer(ClearInterval, clear_expired)
}.
-spec disable_retainer(state()) -> state(). -spec disable_retainer(state()) -> state().
disable_retainer(#{clear_timer := ClearTimer, disable_retainer(
context := Context} = State) -> #{
clear_timer := ClearTimer,
context := Context
} = State
) ->
unload(), unload(),
ok = close_resource(Context), ok = close_resource(Context),
State#{enable := false, State#{
enable := false,
clear_timer := stop_timer(ClearTimer) clear_timer := stop_timer(ClearTimer)
}. }.
@ -344,7 +373,8 @@ check_timer(Timer, _, _) ->
-spec get_backend_module() -> backend(). -spec get_backend_module() -> backend().
get_backend_module() -> get_backend_module() ->
ModName = case emqx:get_config([retainer, backend]) of ModName =
case emqx:get_config([retainer, backend]) of
#{type := built_in_database} -> mnesia; #{type := built_in_database} -> mnesia;
#{type := Backend} -> Backend #{type := Backend} -> Backend
end, end,
@ -353,15 +383,17 @@ get_backend_module() ->
create_resource(Context, #{type := built_in_database} = Cfg) -> create_resource(Context, #{type := built_in_database} = Cfg) ->
emqx_retainer_mnesia:create_resource(Cfg), emqx_retainer_mnesia:create_resource(Cfg),
Context; Context;
create_resource(Context, #{type := DB} = Config) -> create_resource(Context, #{type := DB} = Config) ->
ResourceID = erlang:iolist_to_binary([io_lib:format("~ts_~ts", [?APP, DB])]), ResourceID = erlang:iolist_to_binary([io_lib:format("~ts_~ts", [?APP, DB])]),
case emqx_resource:create( case
emqx_resource:create(
ResourceID, ResourceID,
<<"emqx_retainer">>, <<"emqx_retainer">>,
list_to_existing_atom(io_lib:format("~ts_~ts", [emqx_connector, DB])), list_to_existing_atom(io_lib:format("~ts_~ts", [emqx_connector, DB])),
Config, Config,
#{}) of #{}
)
of
{ok, already_created} -> {ok, already_created} ->
Context#{resource_id => ResourceID}; Context#{resource_id => ResourceID};
{ok, _} -> {ok, _} ->

View File

@ -24,14 +24,17 @@
%% API %% API
-export([api_spec/0, paths/0, schema/1, namespace/0, fields/1]). -export([api_spec/0, paths/0, schema/1, namespace/0, fields/1]).
-export([ lookup_retained_warp/2 -export([
, with_topic_warp/2 lookup_retained_warp/2,
, config/2]). with_topic_warp/2,
config/2
]).
-import(hoconsc, [mk/2, ref/1, ref/2, array/1]). -import(hoconsc, [mk/2, ref/1, ref/2, array/1]).
-import(emqx_dashboard_swagger, [error_codes/2]). -import(emqx_dashboard_swagger, [error_codes/2]).
-define(MAX_PAYLOAD_SIZE, 1048576). %% 1MB = 1024 x 1024 %% 1MB = 1024 x 1024
-define(MAX_PAYLOAD_SIZE, 1048576).
-define(PREFIX, "/mqtt/retainer"). -define(PREFIX, "/mqtt/retainer").
-define(TAGS, [<<"retainer">>]). -define(TAGS, [<<"retainer">>]).
@ -44,49 +47,62 @@ paths() ->
[?PREFIX, ?PREFIX ++ "/messages", ?PREFIX ++ "/message/:topic"]. [?PREFIX, ?PREFIX ++ "/messages", ?PREFIX ++ "/message/:topic"].
schema(?PREFIX) -> schema(?PREFIX) ->
#{'operationId' => config, #{
get => #{tags => ?TAGS, 'operationId' => config,
get => #{
tags => ?TAGS,
description => <<"Get retainer config">>, description => <<"Get retainer config">>,
responses => #{200 => mk(conf_schema(), #{desc => "The config content"}), responses => #{
200 => mk(conf_schema(), #{desc => "The config content"}),
404 => error_codes(['NOT_FOUND'], <<"Config not found">>) 404 => error_codes(['NOT_FOUND'], <<"Config not found">>)
} }
}, },
put => #{tags => ?TAGS, put => #{
tags => ?TAGS,
description => <<"Update retainer config">>, description => <<"Update retainer config">>,
'requestBody' => mk(conf_schema(), #{desc => "The config content"}), 'requestBody' => mk(conf_schema(), #{desc => "The config content"}),
responses => #{200 => mk(conf_schema(), #{desc => "Update configs successfully"}), responses => #{
200 => mk(conf_schema(), #{desc => "Update configs successfully"}),
400 => error_codes(['UPDATE_FAILED'], <<"Update config failed">>) 400 => error_codes(['UPDATE_FAILED'], <<"Update config failed">>)
} }
} }
}; };
schema(?PREFIX ++ "/messages") -> schema(?PREFIX ++ "/messages") ->
#{'operationId' => lookup_retained_warp, #{
get => #{tags => ?TAGS, 'operationId' => lookup_retained_warp,
get => #{
tags => ?TAGS,
description => <<"List retained messages">>, description => <<"List retained messages">>,
parameters => page_params(), parameters => page_params(),
responses => #{200 => mk(array(ref(message_summary)), #{desc => "The result list"}), responses => #{
200 => mk(array(ref(message_summary)), #{desc => "The result list"}),
400 => error_codes(['BAD_REQUEST'], <<"Unsupported backend">>) 400 => error_codes(['BAD_REQUEST'], <<"Unsupported backend">>)
} }
} }
}; };
schema(?PREFIX ++ "/message/:topic") -> schema(?PREFIX ++ "/message/:topic") ->
#{'operationId' => with_topic_warp, #{
get => #{tags => ?TAGS, 'operationId' => with_topic_warp,
get => #{
tags => ?TAGS,
description => <<"Lookup a message by a topic without wildcards">>, description => <<"Lookup a message by a topic without wildcards">>,
parameters => parameters(), parameters => parameters(),
responses => #{200 => mk(ref(message), #{desc => "Details of the message"}), responses => #{
200 => mk(ref(message), #{desc => "Details of the message"}),
404 => error_codes(['NOT_FOUND'], <<"Viewed message doesn't exist">>), 404 => error_codes(['NOT_FOUND'], <<"Viewed message doesn't exist">>),
400 => error_codes(['BAD_REQUEST'], <<"Unsupported backend">>) 400 => error_codes(['BAD_REQUEST'], <<"Unsupported backend">>)
} }
}, },
delete => #{tags => ?TAGS, delete => #{
tags => ?TAGS,
description => <<"Delete matching messages">>, description => <<"Delete matching messages">>,
parameters => parameters(), parameters => parameters(),
responses => #{204 => <<>>, responses => #{
400 => error_codes(['BAD_REQUEST'], 204 => <<>>,
<<"Unsupported backend">>) 400 => error_codes(
['BAD_REQUEST'],
<<"Unsupported backend">>
)
} }
} }
}. }.
@ -98,23 +114,29 @@ conf_schema() ->
ref(emqx_retainer_schema, "retainer"). ref(emqx_retainer_schema, "retainer").
parameters() -> parameters() ->
[{topic, mk(binary(), #{in => path, [
{topic,
mk(binary(), #{
in => path,
required => true, required => true,
desc => "The target topic" desc => "The target topic"
})}]. })}
].
fields(message_summary) -> fields(message_summary) ->
[ {msgid, mk(binary(), #{desc => <<"Message ID">>})} [
, {topic, mk(binary(), #{desc => "The topic"})} {msgid, mk(binary(), #{desc => <<"Message ID">>})},
, {qos, mk(emqx_schema:qos(), #{desc => "The QoS"})} {topic, mk(binary(), #{desc => "The topic"})},
, {publish_at, mk(string(), #{desc => "Publish datetime, in RFC 3339 format"})} {qos, mk(emqx_schema:qos(), #{desc => "The QoS"})},
, {from_clientid, mk(binary(), #{desc => "Publisher ClientId"})} {publish_at, mk(string(), #{desc => "Publish datetime, in RFC 3339 format"})},
, {from_username, mk(binary(), #{desc => "Publisher Username"})} {from_clientid, mk(binary(), #{desc => "Publisher ClientId"})},
{from_username, mk(binary(), #{desc => "Publisher Username"})}
]; ];
fields(message) -> fields(message) ->
[{payload, mk(binary(), #{desc => "The payload content"})} | [
fields(message_summary)]. {payload, mk(binary(), #{desc => "The payload content"})}
| fields(message_summary)
].
lookup_retained_warp(Type, Params) -> lookup_retained_warp(Type, Params) ->
check_backend(Type, Params, fun lookup_retained/2). check_backend(Type, Params, fun lookup_retained/2).
@ -124,15 +146,16 @@ with_topic_warp(Type, Params) ->
config(get, _) -> config(get, _) ->
{200, emqx:get_raw_config([retainer])}; {200, emqx:get_raw_config([retainer])};
config(put, #{body := Body}) -> config(put, #{body := Body}) ->
try try
{ok, _} = emqx_retainer:update_config(Body), {ok, _} = emqx_retainer:update_config(Body),
{200, emqx:get_raw_config([retainer])} {200, emqx:get_raw_config([retainer])}
catch _:Reason:_ -> catch
{400, _:Reason:_ ->
#{code => <<"UPDATE_FAILED">>, {400, #{
message => iolist_to_binary(io_lib:format("~p~n", [Reason]))}} code => <<"UPDATE_FAILED">>,
message => iolist_to_binary(io_lib:format("~p~n", [Reason]))
}}
end. end.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -151,23 +174,33 @@ with_topic(get, #{bindings := Bindings}) ->
[H | _] -> [H | _] ->
{200, format_detail_message(H)}; {200, format_detail_message(H)};
_ -> _ ->
{404, #{code => <<"NOT_FOUND">>, {404, #{
code => <<"NOT_FOUND">>,
message => <<"Viewed message doesn't exist">> message => <<"Viewed message doesn't exist">>
}} }}
end; end;
with_topic(delete, #{bindings := Bindings}) -> with_topic(delete, #{bindings := Bindings}) ->
Topic = maps:get(topic, Bindings), Topic = maps:get(topic, Bindings),
emqx_retainer_mnesia:delete_message(undefined, Topic), emqx_retainer_mnesia:delete_message(undefined, Topic),
{204}. {204}.
format_message(#message{ id = ID, qos = Qos, topic = Topic, from = From format_message(#message{
, timestamp = Timestamp, headers = Headers}) -> id = ID,
#{msgid => emqx_guid:to_hexstr(ID), qos = Qos,
topic = Topic,
from = From,
timestamp = Timestamp,
headers = Headers
}) ->
#{
msgid => emqx_guid:to_hexstr(ID),
qos => Qos, qos => Qos,
topic => Topic, topic => Topic,
publish_at => list_to_binary(calendar:system_time_to_rfc3339( publish_at => list_to_binary(
Timestamp, [{unit, millisecond}])), calendar:system_time_to_rfc3339(
Timestamp, [{unit, millisecond}]
)
),
from_clientid => to_bin_string(From), from_clientid => to_bin_string(From),
from_username => maps:get(username, Headers, <<>>) from_username => maps:get(username, Headers, <<>>)
}. }.

View File

@ -18,13 +18,13 @@
-behaviour(application). -behaviour(application).
-export([ start/2 -export([
, stop/1 start/2,
stop/1
]). ]).
start(_Type, _Args) -> start(_Type, _Args) ->
emqx_retainer_sup:start_link(). emqx_retainer_sup:start_link().
stop(_State) -> stop(_State) ->
ok. ok.

View File

@ -22,15 +22,23 @@
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
%% API %% API
-export([ start_link/2 -export([
, dispatch/2 start_link/2,
, refresh_limiter/0 dispatch/2,
, worker/0 refresh_limiter/0,
worker/0
]). ]).
%% gen_server callbacks %% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([
terminate/2, code_change/3, format_status/2]). init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3,
format_status/2
]).
-type limiter() :: emqx_htb_limiter:limiter(). -type limiter() :: emqx_htb_limiter:limiter().
@ -46,10 +54,12 @@ dispatch(Context, Topic) ->
%% an limiter update handler maybe added later, now this is a workaround %% an limiter update handler maybe added later, now this is a workaround
refresh_limiter() -> refresh_limiter() ->
Workers = gproc_pool:active_workers(?POOL), Workers = gproc_pool:active_workers(?POOL),
lists:foreach(fun({_, Pid}) -> lists:foreach(
fun({_, Pid}) ->
gen_server:cast(Pid, ?FUNCTION_NAME) gen_server:cast(Pid, ?FUNCTION_NAME)
end, end,
Workers). Workers
).
worker() -> worker() ->
gproc_pool:pick_worker(?POOL, self()). gproc_pool:pick_worker(?POOL, self()).
@ -59,13 +69,18 @@ worker() ->
%% Starts the server %% Starts the server
%% @end %% @end
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec start_link(atom(), pos_integer()) -> {ok, Pid :: pid()} | -spec start_link(atom(), pos_integer()) ->
{error, Error :: {already_started, pid()}} | {ok, Pid :: pid()}
{error, Error :: term()} | | {error, Error :: {already_started, pid()}}
ignore. | {error, Error :: term()}
| ignore.
start_link(Pool, Id) -> start_link(Pool, Id) ->
gen_server:start_link({local, emqx_misc:proc_name(?MODULE, Id)}, gen_server:start_link(
?MODULE, [Pool, Id], [{hibernate_after, 1000}]). {local, emqx_misc:proc_name(?MODULE, Id)},
?MODULE,
[Pool, Id],
[{hibernate_after, 1000}]
).
%%%=================================================================== %%%===================================================================
%%% gen_server callbacks %%% gen_server callbacks
@ -77,11 +92,12 @@ start_link(Pool, Id) ->
%% Initializes the server %% Initializes the server
%% @end %% @end
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec init(Args :: term()) -> {ok, State :: term()} | -spec init(Args :: term()) ->
{ok, State :: term(), Timeout :: timeout()} | {ok, State :: term()}
{ok, State :: term(), hibernate} | | {ok, State :: term(), Timeout :: timeout()}
{stop, Reason :: term()} | | {ok, State :: term(), hibernate}
ignore. | {stop, Reason :: term()}
| ignore.
init([Pool, Id]) -> init([Pool, Id]) ->
erlang:process_flag(trap_exit, true), erlang:process_flag(trap_exit, true),
true = gproc_pool:connect_worker(Pool, {Pool, Id}), true = gproc_pool:connect_worker(Pool, {Pool, Id}),
@ -96,14 +112,14 @@ init([Pool, Id]) ->
%% @end %% @end
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec handle_call(Request :: term(), From :: {pid(), term()}, State :: term()) -> -spec handle_call(Request :: term(), From :: {pid(), term()}, State :: term()) ->
{reply, Reply :: term(), NewState :: term()} | {reply, Reply :: term(), NewState :: term()}
{reply, Reply :: term(), NewState :: term(), Timeout :: timeout()} | | {reply, Reply :: term(), NewState :: term(), Timeout :: timeout()}
{reply, Reply :: term(), NewState :: term(), hibernate} | | {reply, Reply :: term(), NewState :: term(), hibernate}
{noreply, NewState :: term()} | | {noreply, NewState :: term()}
{noreply, NewState :: term(), Timeout :: timeout()} | | {noreply, NewState :: term(), Timeout :: timeout()}
{noreply, NewState :: term(), hibernate} | | {noreply, NewState :: term(), hibernate}
{stop, Reason :: term(), Reply :: term(), NewState :: term()} | | {stop, Reason :: term(), Reply :: term(), NewState :: term()}
{stop, Reason :: term(), NewState :: term()}. | {stop, Reason :: term(), NewState :: term()}.
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
?SLOG(error, #{msg => "unexpected_call", call => Req}), ?SLOG(error, #{msg => "unexpected_call", call => Req}),
{reply, ignored, State}. {reply, ignored, State}.
@ -115,19 +131,17 @@ handle_call(Req, _From, State) ->
%% @end %% @end
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec handle_cast(Request :: term(), State :: term()) -> -spec handle_cast(Request :: term(), State :: term()) ->
{noreply, NewState :: term()} | {noreply, NewState :: term()}
{noreply, NewState :: term(), Timeout :: timeout()} | | {noreply, NewState :: term(), Timeout :: timeout()}
{noreply, NewState :: term(), hibernate} | | {noreply, NewState :: term(), hibernate}
{stop, Reason :: term(), NewState :: term()}. | {stop, Reason :: term(), NewState :: term()}.
handle_cast({dispatch, Context, Pid, Topic}, #{limiter := Limiter} = State) -> handle_cast({dispatch, Context, Pid, Topic}, #{limiter := Limiter} = State) ->
{ok, Limiter2} = dispatch(Context, Pid, Topic, undefined, Limiter), {ok, Limiter2} = dispatch(Context, Pid, Topic, undefined, Limiter),
{noreply, State#{limiter := Limiter2}}; {noreply, State#{limiter := Limiter2}};
handle_cast(refresh_limiter, State) -> handle_cast(refresh_limiter, State) ->
BucketName = emqx:get_config([retainer, flow_control, batch_deliver_limiter]), BucketName = emqx:get_config([retainer, flow_control, batch_deliver_limiter]),
Limiter = emqx_limiter_server:connect(batch, BucketName), Limiter = emqx_limiter_server:connect(batch, BucketName),
{noreply, State#{limiter := Limiter}}; {noreply, State#{limiter := Limiter}};
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
?SLOG(error, #{msg => "unexpected_cast", cast => Msg}), ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
{noreply, State}. {noreply, State}.
@ -139,10 +153,10 @@ handle_cast(Msg, State) ->
%% @end %% @end
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec handle_info(Info :: timeout() | term(), State :: term()) -> -spec handle_info(Info :: timeout() | term(), State :: term()) ->
{noreply, NewState :: term()} | {noreply, NewState :: term()}
{noreply, NewState :: term(), Timeout :: timeout()} | | {noreply, NewState :: term(), Timeout :: timeout()}
{noreply, NewState :: term(), hibernate} | | {noreply, NewState :: term(), hibernate}
{stop, Reason :: normal | term(), NewState :: term()}. | {stop, Reason :: normal | term(), NewState :: term()}.
handle_info(Info, State) -> handle_info(Info, State) ->
?SLOG(error, #{msg => "unexpected_info", info => Info}), ?SLOG(error, #{msg => "unexpected_info", info => Info}),
{noreply, State}. {noreply, State}.
@ -156,8 +170,10 @@ handle_info(Info, State) ->
%% with Reason. The return value is ignored. %% with Reason. The return value is ignored.
%% @end %% @end
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec terminate(Reason :: normal | shutdown | {shutdown, term()} | term(), -spec terminate(
State :: term()) -> any(). Reason :: normal | shutdown | {shutdown, term()} | term(),
State :: term()
) -> any().
terminate(_Reason, #{pool := Pool, id := Id}) -> terminate(_Reason, #{pool := Pool, id := Id}) ->
gproc_pool:disconnect_worker(Pool, {Pool, Id}). gproc_pool:disconnect_worker(Pool, {Pool, Id}).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -166,10 +182,13 @@ terminate(_Reason, #{pool := Pool, id := Id}) ->
%% Convert process state when code is changed %% Convert process state when code is changed
%% @end %% @end
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec code_change(OldVsn :: term() | {down, term()}, -spec code_change(
OldVsn :: term() | {down, term()},
State :: term(), State :: term(),
Extra :: term()) -> {ok, NewState :: term()} | Extra :: term()
{error, Reason :: term()}. ) ->
{ok, NewState :: term()}
| {error, Reason :: term()}.
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.
@ -181,8 +200,10 @@ code_change(_OldVsn, State, _Extra) ->
%% or when it appears in termination error logs. %% or when it appears in termination error logs.
%% @end %% @end
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec format_status(Opt :: normal | terminate, -spec format_status(
Status :: list()) -> Status :: term(). Opt :: normal | terminate,
Status :: list()
) -> Status :: term().
format_status(_Opt, Status) -> format_status(_Opt, Status) ->
Status. Status.
@ -209,10 +230,8 @@ dispatch(Context, Pid, Topic, Cursor, Limiter) ->
{ok, limiter()}. {ok, limiter()}.
deliver([], _Context, _Pid, _Topic, undefined, Limiter) -> deliver([], _Context, _Pid, _Topic, undefined, Limiter) ->
{ok, Limiter}; {ok, Limiter};
deliver([], Context, Pid, Topic, Cursor, Limiter) -> deliver([], Context, Pid, Topic, Cursor, Limiter) ->
dispatch(Context, Pid, Topic, Cursor, Limiter); dispatch(Context, Pid, Topic, Cursor, Limiter);
deliver(Result, Context, Pid, Topic, Cursor, Limiter) -> deliver(Result, Context, Pid, Topic, Cursor, Limiter) ->
case erlang:is_process_alive(Pid) of case erlang:is_process_alive(Pid) of
false -> false ->
@ -235,7 +254,6 @@ deliver(Result, Context, Pid, Topic, Cursor, Limiter) ->
do_deliver([], _DeliverNum, _Pid, _Topic, Limiter) -> do_deliver([], _DeliverNum, _Pid, _Topic, Limiter) ->
{ok, Limiter}; {ok, Limiter};
do_deliver(Msgs, DeliverNum, Pid, Topic, Limiter) -> do_deliver(Msgs, DeliverNum, Pid, Topic, Limiter) ->
{Num, ToDelivers, Msgs2} = safe_split(DeliverNum, Msgs), {Num, ToDelivers, Msgs2} = safe_split(DeliverNum, Msgs),
case emqx_htb_limiter:consume(Num, Limiter) of case emqx_htb_limiter:consume(Num, Limiter) of
@ -243,7 +261,8 @@ do_deliver(Msgs, DeliverNum, Pid, Topic, Limiter) ->
do_deliver(ToDelivers, Pid, Topic), do_deliver(ToDelivers, Pid, Topic),
do_deliver(Msgs2, DeliverNum, Pid, Topic, Limiter2); do_deliver(Msgs2, DeliverNum, Pid, Topic, Limiter2);
{drop, _} = Drop -> {drop, _} = Drop ->
?SLOG(error, #{msg => "retained_message_dropped", ?SLOG(error, #{
msg => "retained_message_dropped",
reason => "reached_ratelimit", reason => "reached_ratelimit",
dropped_count => length(ToDelivers) dropped_count => length(ToDelivers)
}), }),
@ -253,7 +272,6 @@ do_deliver(Msgs, DeliverNum, Pid, Topic, Limiter) ->
do_deliver([Msg | T], Pid, Topic) -> do_deliver([Msg | T], Pid, Topic) ->
Pid ! {deliver, Topic, Msg}, Pid ! {deliver, Topic, Msg},
do_deliver(T, Pid, Topic); do_deliver(T, Pid, Topic);
do_deliver([], _, _) -> do_deliver([], _, _) ->
ok. ok.
@ -262,9 +280,7 @@ safe_split(N, List) ->
safe_split(0, List, Count, Acc) -> safe_split(0, List, Count, Acc) ->
{Count, lists:reverse(Acc), List}; {Count, lists:reverse(Acc), List};
safe_split(_N, [], Count, Acc) -> safe_split(_N, [], Count, Acc) ->
{Count, lists:reverse(Acc), []}; {Count, lists:reverse(Acc), []};
safe_split(N, [H | T], Count, Acc) -> safe_split(N, [H | T], Count, Acc) ->
safe_split(N - 1, T, Count + 1, [H | Acc]). safe_split(N - 1, T, Count + 1, [H | Acc]).

View File

@ -23,15 +23,15 @@
-include_lib("stdlib/include/ms_transform.hrl"). -include_lib("stdlib/include/ms_transform.hrl").
-include_lib("stdlib/include/qlc.hrl"). -include_lib("stdlib/include/qlc.hrl").
-export([
-export([ delete_message/2 delete_message/2,
, store_retained/2 store_retained/2,
, read_message/2 read_message/2,
, page_read/4 page_read/4,
, match_messages/3 match_messages/3,
, clear_expired/1 clear_expired/1,
, clean/1 clean/1,
, size/1 size/1
]). ]).
-export([create_resource/1]). -export([create_resource/1]).
@ -45,15 +45,20 @@
%% emqx_retainer_storage callbacks %% emqx_retainer_storage callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
create_resource(#{storage_type := StorageType}) -> create_resource(#{storage_type := StorageType}) ->
Copies = case StorageType of Copies =
case StorageType of
ram -> ram_copies; ram -> ram_copies;
disc -> disc_copies disc -> disc_copies
end, end,
StoreProps = [{ets, [compressed, StoreProps = [
{ets, [
compressed,
{read_concurrency, true}, {read_concurrency, true},
{write_concurrency, true}]}, {write_concurrency, true}
{dets, [{auto_save, 1000}]}], ]},
{dets, [{auto_save, 1000}]}
],
ok = mria:create_table(?TAB, [ ok = mria:create_table(?TAB, [
{type, ordered_set}, {type, ordered_set},
@ -65,7 +70,8 @@ create_resource(#{storage_type := StorageType}) ->
]), ]),
ok = mria_rlog:wait_for_shards([?RETAINER_SHARD], infinity), ok = mria_rlog:wait_for_shards([?RETAINER_SHARD], infinity),
case mnesia:table_info(?TAB, storage_type) of case mnesia:table_info(?TAB, storage_type) of
Copies -> ok; Copies ->
ok;
_Other -> _Other ->
{atomic, ok} = mnesia:change_table_copy_type(?TAB, node(), Copies), {atomic, ok} = mnesia:change_table_copy_type(?TAB, node(), Copies),
ok ok
@ -75,30 +81,40 @@ store_retained(_, Msg =#message{topic = Topic}) ->
ExpiryTime = emqx_retainer:get_expiry_time(Msg), ExpiryTime = emqx_retainer:get_expiry_time(Msg),
case is_table_full() of case is_table_full() of
false -> false ->
mria:dirty_write(?TAB, mria:dirty_write(
#retained{topic = topic2tokens(Topic), ?TAB,
#retained{
topic = topic2tokens(Topic),
msg = Msg, msg = Msg,
expiry_time = ExpiryTime}); expiry_time = ExpiryTime
}
);
_ -> _ ->
Tokens = topic2tokens(Topic), Tokens = topic2tokens(Topic),
Fun = fun() -> Fun = fun() ->
case mnesia:read(?TAB, Tokens) of case mnesia:read(?TAB, Tokens) of
[_] -> [_] ->
mnesia:write(?TAB, mnesia:write(
#retained{topic = Tokens, ?TAB,
#retained{
topic = Tokens,
msg = Msg, msg = Msg,
expiry_time = ExpiryTime}, expiry_time = ExpiryTime
write); },
write
);
[] -> [] ->
mnesia:abort(table_is_full) mnesia:abort(table_is_full)
end end
end, end,
case mria:transaction(?RETAINER_SHARD, Fun) of case mria:transaction(?RETAINER_SHARD, Fun) of
{atomic, ok} -> ok; {atomic, ok} ->
ok;
{aborted, Reason} -> {aborted, Reason} ->
?SLOG(error, #{ msg => "failed_to_retain_message" ?SLOG(error, #{
, topic => Topic msg => "failed_to_retain_message",
, reason => Reason topic => Topic,
reason => Reason
}) })
end end
end. end.
@ -116,7 +132,8 @@ clear_expired(_) ->
delete_message(_, Topic) -> delete_message(_, Topic) ->
case emqx_topic:wildcard(Topic) of case emqx_topic:wildcard(Topic) of
true -> match_delete_messages(Topic); true ->
match_delete_messages(Topic);
false -> false ->
Tokens = topic2tokens(Topic), Tokens = topic2tokens(Topic),
Fun = fun() -> Fun = fun() ->
@ -169,8 +186,7 @@ size(_) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
sort_retained([]) -> []; sort_retained([]) -> [];
sort_retained([Msg]) -> [Msg]; sort_retained([Msg]) -> [Msg];
sort_retained(Msgs) -> sort_retained(Msgs) -> lists:sort(fun compare_message/2, Msgs).
lists:sort(fun compare_message/2, Msgs).
compare_message(M1, M2) -> compare_message(M1, M2) ->
M1#message.timestamp =< M2#message.timestamp. M1#message.timestamp =< M2#message.timestamp.
@ -194,12 +210,13 @@ batch_read_messages(Cursor, MaxReadNum) ->
{ok, Answers, Cursor} {ok, Answers, Cursor}
end. end.
-spec(read_messages(emqx_types:topic()) -spec read_messages(emqx_types:topic()) ->
-> [emqx_types:message()]). [emqx_types:message()].
read_messages(Topic) -> read_messages(Topic) ->
Tokens = topic2tokens(Topic), Tokens = topic2tokens(Topic),
case mnesia:dirty_read(?TAB, Tokens) of case mnesia:dirty_read(?TAB, Tokens) of
[] -> []; [] ->
[];
[#retained{msg = Msg, expiry_time = Et}] -> [#retained{msg = Msg, expiry_time = Et}] ->
case Et =:= 0 orelse Et >= erlang:system_time(millisecond) of case Et =:= 0 orelse Et >= erlang:system_time(millisecond) of
true -> [Msg]; true -> [Msg];
@ -207,13 +224,13 @@ read_messages(Topic) ->
end end
end. end.
-spec(match_messages(emqx_types:topic()) -spec match_messages(emqx_types:topic()) ->
-> [emqx_types:message()]). [emqx_types:message()].
match_messages(Filter) -> match_messages(Filter) ->
Ms = make_match_spec(Filter), Ms = make_match_spec(Filter),
mnesia:dirty_select(?TAB, Ms). mnesia:dirty_select(?TAB, Ms).
-spec(match_delete_messages(emqx_types:topic()) -> ok). -spec match_delete_messages(emqx_types:topic()) -> ok.
match_delete_messages(Filter) -> match_delete_messages(Filter) ->
Cond = condition(emqx_topic:words(Filter)), Cond = condition(emqx_topic:words(Filter)),
MsHd = #retained{topic = Cond, msg = '_', expiry_time = '_'}, MsHd = #retained{topic = Cond, msg = '_', expiry_time = '_'},
@ -223,7 +240,13 @@ match_delete_messages(Filter) ->
%% @private %% @private
condition(Ws) -> condition(Ws) ->
Ws1 = [case W =:= '+' of true -> '_'; _ -> W end || W <- Ws], Ws1 = [
case W =:= '+' of
true -> '_';
_ -> W
end
|| W <- Ws
],
case lists:last(Ws1) =:= '#' of case lists:last(Ws1) =:= '#' of
false -> Ws1; false -> Ws1;
_ -> (Ws1 -- ['#']) ++ '_' _ -> (Ws1 -- ['#']) ++ '_'
@ -240,8 +263,10 @@ make_match_spec(Topic) ->
condition(emqx_topic:words(Topic)) condition(emqx_topic:words(Topic))
end, end,
MsHd = #retained{topic = Cond, msg = '$2', expiry_time = '$3'}, MsHd = #retained{topic = Cond, msg = '$2', expiry_time = '$3'},
[{MsHd, [{'=:=', '$3', 0}], ['$2']}, [
{MsHd, [{'>', '$3', NowMs}], ['$2']}]. {MsHd, [{'=:=', '$3', 0}], ['$2']},
{MsHd, [{'>', '$3', NowMs}], ['$2']}
].
-spec make_cursor(undefined | topic()) -> qlc:query_cursor(). -spec make_cursor(undefined | topic()) -> qlc:query_cursor().
make_cursor(Topic) -> make_cursor(Topic) ->

View File

@ -11,47 +11,74 @@ namespace() -> "retainer".
roots() -> ["retainer"]. roots() -> ["retainer"].
fields("retainer") -> fields("retainer") ->
[ {enable, sc(boolean(), "Enable retainer feature.", false)} [
, {msg_expiry_interval, sc(emqx_schema:duration_ms(), {enable, sc(boolean(), "Enable retainer feature.", false)},
{msg_expiry_interval,
sc(
emqx_schema:duration_ms(),
"Message retention time. 0 means message will never be expired.", "Message retention time. 0 means message will never be expired.",
"0s")} "0s"
, {msg_clear_interval, sc(emqx_schema:duration_ms(), )},
{msg_clear_interval,
sc(
emqx_schema:duration_ms(),
"Periodic interval for cleaning up expired messages. " "Periodic interval for cleaning up expired messages. "
"Never clear if the value is 0.", "Never clear if the value is 0.",
"0s")} "0s"
, {flow_control, ?TYPE(hoconsc:ref(?MODULE, flow_control))} )},
, {max_payload_size, sc(emqx_schema:bytesize(), {flow_control, ?TYPE(hoconsc:ref(?MODULE, flow_control))},
{max_payload_size,
sc(
emqx_schema:bytesize(),
"Maximum retained message size.", "Maximum retained message size.",
"1MB")} "1MB"
, {stop_publish_clear_msg, sc(boolean(), )},
{stop_publish_clear_msg,
sc(
boolean(),
"When the retained flag of the `PUBLISH` message is set and Payload is empty, " "When the retained flag of the `PUBLISH` message is set and Payload is empty, "
"whether to continue to publish the message.<br/>" "whether to continue to publish the message.<br/>"
"See: " "See: "
"http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718038", "http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718038",
false)} false
, {backend, backend_config()} )},
{backend, backend_config()}
]; ];
fields(mnesia_config) -> fields(mnesia_config) ->
[ {type, hoconsc:mk(hoconsc:union([built_in_database]), #{desc => "Backend type."})} [
, {storage_type, sc(hoconsc:union([ram, disc]), {type, hoconsc:mk(hoconsc:union([built_in_database]), #{desc => "Backend type."})},
{storage_type,
sc(
hoconsc:union([ram, disc]),
"Specifies whether the messages are stored in RAM or persisted on disc.", "Specifies whether the messages are stored in RAM or persisted on disc.",
ram)} ram
, {max_retained_messages, sc(integer(), )},
{max_retained_messages,
sc(
integer(),
"Maximum number of retained messages. 0 means no limit.", "Maximum number of retained messages. 0 means no limit.",
0, 0,
fun is_pos_integer/1)} fun is_pos_integer/1
)}
]; ];
fields(flow_control) -> fields(flow_control) ->
[ {batch_read_number, sc(integer(), [
{batch_read_number,
sc(
integer(),
"Size of the batch when reading messages from storage. 0 means no limit.", "Size of the batch when reading messages from storage. 0 means no limit.",
0, 0,
fun is_pos_integer/1)} fun is_pos_integer/1
, {batch_deliver_number, sc(range(0, 1000), )},
{batch_deliver_number,
sc(
range(0, 1000),
"The number of retained messages can be delivered per batch.", "The number of retained messages can be delivered per batch.",
0)} 0
, {batch_deliver_limiter, sc(emqx_limiter_schema:bucket_name(), )},
{batch_deliver_limiter,
sc(
emqx_limiter_schema:bucket_name(),
"The rate limiter name for retained messages' delivery.<br/>" "The rate limiter name for retained messages' delivery.<br/>"
"Limiter helps to avoid delivering too many messages to the client at once, " "Limiter helps to avoid delivering too many messages to the client at once, "
"which may cause the client " "which may cause the client "
@ -60,7 +87,8 @@ fields(flow_control) ->
"The names of the available rate limiters are taken from the existing rate " "The names of the available rate limiters are taken from the existing rate "
"limiters under `limiter.batch`.<br/>" "limiters under `limiter.batch`.<br/>"
"If this field is empty, limiter is not used.", "If this field is empty, limiter is not used.",
undefined)} undefined
)}
]. ].
desc("retainer") -> desc("retainer") ->
@ -79,9 +107,11 @@ sc(Type, Desc, Default) ->
hoconsc:mk(Type, #{default => Default, desc => Desc}). hoconsc:mk(Type, #{default => Default, desc => Desc}).
sc(Type, Desc, Default, Validator) -> sc(Type, Desc, Default, Validator) ->
hoconsc:mk(Type, #{default => Default, hoconsc:mk(Type, #{
default => Default,
desc => Desc, desc => Desc,
validator => Validator}). validator => Validator
}).
is_pos_integer(V) -> is_pos_integer(V) ->
V >= 0. V >= 0.

View File

@ -26,13 +26,21 @@ start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []). supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) -> init([]) ->
PoolSpec = emqx_pool_sup:spec([emqx_retainer_dispatcher, hash, emqx_vm:schedulers(), PoolSpec = emqx_pool_sup:spec([
{emqx_retainer_dispatcher, start_link, []}]), emqx_retainer_dispatcher,
{ok, {{one_for_one, 10, 3600}, hash,
[#{id => retainer, emqx_vm:schedulers(),
{emqx_retainer_dispatcher, start_link, []}
]),
{ok,
{{one_for_one, 10, 3600}, [
#{
id => retainer,
start => {emqx_retainer, start_link, []}, start => {emqx_retainer, start_link, []},
restart => permanent, restart => permanent,
shutdown => 5000, shutdown => 5000,
type => worker, type => worker,
modules => [emqx_retainer]}, modules => [emqx_retainer]
PoolSpec]}}. },
PoolSpec
]}}.

View File

@ -27,23 +27,27 @@
all() -> emqx_common_test_helpers:all(?MODULE). all() -> emqx_common_test_helpers:all(?MODULE).
-define(BASE_CONF, <<""" -define(BASE_CONF, <<
retainer { ""
enable = true "\n"
msg_clear_interval = 0s "retainer {\n"
msg_expiry_interval = 0s " enable = true\n"
max_payload_size = 1MB " msg_clear_interval = 0s\n"
flow_control { " msg_expiry_interval = 0s\n"
batch_read_number = 0 " max_payload_size = 1MB\n"
batch_deliver_number = 0 " flow_control {\n"
batch_deliver_limiter = retainer " batch_read_number = 0\n"
} " batch_deliver_number = 0\n"
backend { " batch_deliver_limiter = retainer\n"
type = built_in_database " }\n"
storage_type = ram " backend {\n"
max_retained_messages = 0 " type = built_in_database\n"
} " storage_type = ram\n"
}""">>). " max_retained_messages = 0\n"
" }\n"
"}"
""
>>).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Setups %% Setups
@ -82,9 +86,11 @@ t_store_and_clean(_) ->
{ok, _} = emqtt:connect(C1), {ok, _} = emqtt:connect(C1),
emqtt:publish( emqtt:publish(
C1, <<"retained">>, C1,
<<"retained">>,
<<"this is a retained message">>, <<"this is a retained message">>,
[{qos, 0}, {retain, true}]), [{qos, 0}, {retain, true}]
),
timer:sleep(100), timer:sleep(100),
{ok, List} = emqx_retainer:page_read(<<"retained">>, 1, 10), {ok, List} = emqx_retainer:page_read(<<"retained">>, 1, 10),
@ -121,9 +127,11 @@ t_retain_handling(_) ->
{ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained/#">>), {ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained/#">>),
emqtt:publish( emqtt:publish(
C1, <<"retained">>, C1,
<<"retained">>,
<<"this is a retained message">>, <<"this is a retained message">>,
[{qos, 0}, {retain, true}]), [{qos, 0}, {retain, true}]
),
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]),
?assertEqual(1, length(receive_messages(1))), ?assertEqual(1, length(receive_messages(1))),
@ -154,17 +162,23 @@ t_wildcard_subscription(_) ->
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C1), {ok, _} = emqtt:connect(C1),
emqtt:publish( emqtt:publish(
C1, <<"retained/0">>, C1,
<<"retained/0">>,
<<"this is a retained message 0">>, <<"this is a retained message 0">>,
[{qos, 0}, {retain, true}]), [{qos, 0}, {retain, true}]
),
emqtt:publish( emqtt:publish(
C1, <<"retained/1">>, C1,
<<"retained/1">>,
<<"this is a retained message 1">>, <<"this is a retained message 1">>,
[{qos, 0}, {retain, true}]), [{qos, 0}, {retain, true}]
),
emqtt:publish( emqtt:publish(
C1, <<"retained/a/b/c">>, C1,
<<"retained/a/b/c">>,
<<"this is a retained message 2">>, <<"this is a retained message 2">>,
[{qos, 0}, {retain, true}]), [{qos, 0}, {retain, true}]
),
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0),
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+/b/#">>, 0), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+/b/#">>, 0),
@ -180,25 +194,38 @@ t_message_expiry(_) ->
{ok, _} = emqtt:connect(C1), {ok, _} = emqtt:connect(C1),
emqtt:publish( emqtt:publish(
C1, <<"retained/0">>, #{'Message-Expiry-Interval' => 0}, C1,
<<"retained/0">>,
#{'Message-Expiry-Interval' => 0},
<<"don't expire">>, <<"don't expire">>,
[{qos, 0}, {retain, true}]), [{qos, 0}, {retain, true}]
),
emqtt:publish( emqtt:publish(
C1, <<"retained/1">>, #{'Message-Expiry-Interval' => 2}, C1,
<<"retained/1">>,
#{'Message-Expiry-Interval' => 2},
<<"expire">>, <<"expire">>,
[{qos, 0}, {retain, true}]), [{qos, 0}, {retain, true}]
),
emqtt:publish( emqtt:publish(
C1, <<"retained/2">>, #{'Message-Expiry-Interval' => 5}, C1,
<<"retained/2">>,
#{'Message-Expiry-Interval' => 5},
<<"don't expire">>, <<"don't expire">>,
[{qos, 0}, {retain, true}]), [{qos, 0}, {retain, true}]
),
emqtt:publish( emqtt:publish(
C1, <<"retained/3">>, C1,
<<"retained/3">>,
<<"don't expire">>, <<"don't expire">>,
[{qos, 0}, {retain, true}]), [{qos, 0}, {retain, true}]
),
emqtt:publish( emqtt:publish(
C1, <<"$SYS/retained/4">>, C1,
<<"$SYS/retained/4">>,
<<"don't expire">>, <<"don't expire">>,
[{qos, 0}, {retain, true}]), [{qos, 0}, {retain, true}]
),
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0),
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"$SYS/retained/+">>, 0), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"$SYS/retained/+">>, 0),
@ -240,17 +267,23 @@ t_clean(_) ->
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C1), {ok, _} = emqtt:connect(C1),
emqtt:publish( emqtt:publish(
C1, <<"retained/0">>, C1,
<<"retained/0">>,
<<"this is a retained message 0">>, <<"this is a retained message 0">>,
[{qos, 0}, {retain, true}]), [{qos, 0}, {retain, true}]
),
emqtt:publish( emqtt:publish(
C1, <<"retained/1">>, C1,
<<"retained/1">>,
<<"this is a retained message 1">>, <<"this is a retained message 1">>,
[{qos, 0}, {retain, true}]), [{qos, 0}, {retain, true}]
),
emqtt:publish( emqtt:publish(
C1, <<"retained/test/0">>, C1,
<<"retained/test/0">>,
<<"this is a retained message 2">>, <<"this is a retained message 2">>,
[{qos, 0}, {retain, true}]), [{qos, 0}, {retain, true}]
),
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]),
?assertEqual(3, length(receive_messages(3))), ?assertEqual(3, length(receive_messages(3))),
@ -266,7 +299,8 @@ t_stop_publish_clear_msg(_) ->
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C1), {ok, _} = emqtt:connect(C1),
emqtt:publish( emqtt:publish(
C1, <<"retained/0">>, C1,
<<"retained/0">>,
<<"this is a retained message 0">>, <<"this is a retained message 0">>,
[{qos, 0}, {retain, true}] [{qos, 0}, {retain, true}]
), ),
@ -282,9 +316,13 @@ t_stop_publish_clear_msg(_) ->
t_flow_control(_) -> t_flow_control(_) ->
#{per_client := PerClient} = RetainerCfg = emqx_config:get([limiter, batch, bucket, retainer]), #{per_client := PerClient} = RetainerCfg = emqx_config:get([limiter, batch, bucket, retainer]),
RetainerCfg2 = RetainerCfg#{per_client := RetainerCfg2 = RetainerCfg#{
PerClient#{rate := emqx_ratelimiter_SUITE:to_rate("1/1s"), per_client :=
capacity := 1}}, PerClient#{
rate := emqx_ratelimiter_SUITE:to_rate("1/1s"),
capacity := 1
}
},
emqx_config:put([limiter, batch, bucket, retainer], RetainerCfg2), emqx_config:put([limiter, batch, bucket, retainer], RetainerCfg2),
emqx_limiter_manager:restart_server(shared), emqx_limiter_manager:restart_server(shared),
timer:sleep(500), timer:sleep(500),
@ -292,24 +330,31 @@ t_flow_control(_) ->
emqx_retainer_dispatcher:refresh_limiter(), emqx_retainer_dispatcher:refresh_limiter(),
timer:sleep(500), timer:sleep(500),
emqx_retainer:update_config(#{<<"flow_control">> => emqx_retainer:update_config(#{
#{<<"batch_read_number">> => 1, <<"flow_control">> =>
#{
<<"batch_read_number">> => 1,
<<"batch_deliver_number">> => 1, <<"batch_deliver_number">> => 1,
<<"batch_deliver_limiter">> => retainer}}), <<"batch_deliver_limiter">> => retainer
}
}),
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C1), {ok, _} = emqtt:connect(C1),
emqtt:publish( emqtt:publish(
C1, <<"retained/0">>, C1,
<<"retained/0">>,
<<"this is a retained message 0">>, <<"this is a retained message 0">>,
[{qos, 0}, {retain, true}] [{qos, 0}, {retain, true}]
), ),
emqtt:publish( emqtt:publish(
C1, <<"retained/1">>, C1,
<<"retained/1">>,
<<"this is a retained message 1">>, <<"this is a retained message 1">>,
[{qos, 0}, {retain, true}] [{qos, 0}, {retain, true}]
), ),
emqtt:publish( emqtt:publish(
C1, <<"retained/3">>, C1,
<<"retained/3">>,
<<"this is a retained message 3">>, <<"this is a retained message 3">>,
[{qos, 0}, {retain, true}] [{qos, 0}, {retain, true}]
), ),
@ -319,8 +364,10 @@ t_flow_control(_) ->
End = erlang:system_time(millisecond), End = erlang:system_time(millisecond),
Diff = End - Begin, Diff = End - Begin,
?assert(Diff > timer:seconds(2.5) andalso Diff < timer:seconds(3.9), ?assert(
lists:flatten(io_lib:format("Diff is :~p~n", [Diff]))), Diff > timer:seconds(2.5) andalso Diff < timer:seconds(3.9),
lists:flatten(io_lib:format("Diff is :~p~n", [Diff]))
),
ok = emqtt:disconnect(C1), ok = emqtt:disconnect(C1),
@ -335,22 +382,28 @@ t_flow_control(_) ->
t_clear_expired(_) -> t_clear_expired(_) ->
ConfMod = fun(Conf) -> ConfMod = fun(Conf) ->
Conf#{<<"msg_clear_interval">> := <<"1s">>, Conf#{
<<"msg_expiry_interval">> := <<"3s">>} <<"msg_clear_interval">> := <<"1s">>,
<<"msg_expiry_interval">> := <<"3s">>
}
end, end,
Case = fun() -> Case = fun() ->
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C1), {ok, _} = emqtt:connect(C1),
lists:foreach(fun(I) -> lists:foreach(
emqtt:publish(C1, fun(I) ->
emqtt:publish(
C1,
<<"retained/", (I + 60):8/unsigned-integer>>, <<"retained/", (I + 60):8/unsigned-integer>>,
#{'Message-Expiry-Interval' => 3}, #{'Message-Expiry-Interval' => 3},
<<"retained">>, <<"retained">>,
[{qos, 0}, {retain, true}]) [{qos, 0}, {retain, true}]
)
end, end,
lists:seq(1, 5)), lists:seq(1, 5)
),
timer:sleep(1000), timer:sleep(1000),
{ok, List} = emqx_retainer:page_read(<<"retained/+">>, 1, 10), {ok, List} = emqx_retainer:page_read(<<"retained/+">>, 1, 10),
@ -373,11 +426,21 @@ t_max_payload_size(_) ->
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C1), {ok, _} = emqtt:connect(C1),
emqtt:publish(C1, emqtt:publish(
<<"retained/1">>, #{}, <<"1234">>, [{qos, 0}, {retain, true}]), C1,
<<"retained/1">>,
#{},
<<"1234">>,
[{qos, 0}, {retain, true}]
),
emqtt:publish(C1, emqtt:publish(
<<"retained/2">>, #{}, <<"1234567">>, [{qos, 0}, {retain, true}]), C1,
<<"retained/2">>,
#{},
<<"1234567">>,
[{qos, 0}, {retain, true}]
),
timer:sleep(500), timer:sleep(500),
{ok, List} = emqx_retainer:page_read(<<"retained/+">>, 1, 10), {ok, List} = emqx_retainer:page_read(<<"retained/+">>, 1, 10),
@ -394,7 +457,8 @@ t_page_read(_) ->
timer:sleep(500), timer:sleep(500),
Fun = fun(I) -> Fun = fun(I) ->
emqtt:publish(C1, emqtt:publish(
C1,
<<"retained/", (I + 60)>>, <<"retained/", (I + 60)>>,
<<"this is a retained message">>, <<"this is a retained message">>,
[{qos, 0}, {retain, true}] [{qos, 0}, {retain, true}]
@ -451,7 +515,8 @@ with_conf(ConfMod, Case) ->
try try
Case(), Case(),
emqx_retainer:update_config(Conf) emqx_retainer:update_config(Conf)
catch Type:Error:Strace -> catch
Type:Error:Strace ->
emqx_retainer:update_config(Conf), emqx_retainer:update_config(Conf),
erlang:raise(Type, Error, Strace) erlang:raise(Type, Error, Strace)
end. end.

View File

@ -56,16 +56,28 @@ t_config(_Config) ->
Path = api_path(["mqtt", "retainer"]), Path = api_path(["mqtt", "retainer"]),
{ok, ConfJson} = request_api(get, Path), {ok, ConfJson} = request_api(get, Path),
ReturnConf = decode_json(ConfJson), ReturnConf = decode_json(ConfJson),
?assertMatch(#{backend := _, enable := _, flow_control := _, ?assertMatch(
max_payload_size := _, msg_clear_interval := _, #{
msg_expiry_interval := _}, backend := _,
ReturnConf), enable := _,
flow_control := _,
max_payload_size := _,
msg_clear_interval := _,
msg_expiry_interval := _
},
ReturnConf
),
UpdateConf = fun(Enable) -> UpdateConf = fun(Enable) ->
RawConf = emqx_json:decode(ConfJson, [return_maps]), RawConf = emqx_json:decode(ConfJson, [return_maps]),
UpdateJson = RawConf#{<<"enable">> := Enable}, UpdateJson = RawConf#{<<"enable">> := Enable},
{ok, UpdateResJson} = request_api(put, {ok, UpdateResJson} = request_api(
Path, [], auth_header_(), UpdateJson), put,
Path,
[],
auth_header_(),
UpdateJson
),
UpdateRawConf = emqx_json:decode(UpdateResJson, [return_maps]), UpdateRawConf = emqx_json:decode(UpdateResJson, [return_maps]),
?assertEqual(Enable, maps:get(<<"enable">>, UpdateRawConf)) ?assertEqual(Enable, maps:get(<<"enable">>, UpdateRawConf))
end, end,
@ -80,9 +92,12 @@ t_messages(_) ->
timer:sleep(500), timer:sleep(500),
Each = fun(I) -> Each = fun(I) ->
emqtt:publish(C1, <<"retained/", (I + 60)>>, emqtt:publish(
C1,
<<"retained/", (I + 60)>>,
<<"retained">>, <<"retained">>,
[{qos, 0}, {retain, true}]) [{qos, 0}, {retain, true}]
)
end, end,
lists:foreach(Each, lists:seq(1, 5)), lists:foreach(Each, lists:seq(1, 5)),
@ -91,19 +106,28 @@ t_messages(_) ->
{ok, MsgsJson} = request_api(get, api_path(["mqtt", "retainer", "messages"])), {ok, MsgsJson} = request_api(get, api_path(["mqtt", "retainer", "messages"])),
Msgs = decode_json(MsgsJson), Msgs = decode_json(MsgsJson),
MsgLen = erlang:length(Msgs), MsgLen = erlang:length(Msgs),
?assert(MsgLen >= 5, ?assert(
io_lib:format("message length is:~p~n", [MsgLen])), %% maybe has $SYS messages MsgLen >= 5,
%% maybe has $SYS messages
io_lib:format("message length is:~p~n", [MsgLen])
),
[First | _] = Msgs, [First | _] = Msgs,
?assertMatch(#{msgid := _, topic := _, qos := _, ?assertMatch(
publish_at := _, from_clientid := _, from_username := _ #{
msgid := _,
topic := _,
qos := _,
publish_at := _,
from_clientid := _,
from_username := _
}, },
First), First
),
ok = emqtt:disconnect(C1). ok = emqtt:disconnect(C1).
t_lookup_and_delete(_) -> t_lookup_and_delete(_) ->
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C1), {ok, _} = emqtt:connect(C1),
emqx_retainer:clean(), emqx_retainer:clean(),
@ -116,10 +140,18 @@ t_lookup_and_delete(_) ->
{ok, LookupJson} = request_api(get, API), {ok, LookupJson} = request_api(get, API),
LookupResult = decode_json(LookupJson), LookupResult = decode_json(LookupJson),
?assertMatch(#{msgid := _, topic := _, qos := _, payload := _, ?assertMatch(
publish_at := _, from_clientid := _, from_username := _ #{
msgid := _,
topic := _,
qos := _,
payload := _,
publish_at := _,
from_clientid := _,
from_username := _
}, },
LookupResult), LookupResult
),
{ok, []} = request_api(delete, API), {ok, []} = request_api(delete, API),

View File

@ -37,4 +37,3 @@ end_per_testcase(_TestCase, Config) ->
% t_load(_) -> % t_load(_) ->
% error('TODO'). % error('TODO').

View File

@ -74,17 +74,26 @@ t_publish_retain_message(_) ->
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]), {ok, Client1} = emqtt:start_link([{proto_ver, v5}]),
{ok, _} = emqtt:connect(Client1), {ok, _} = emqtt:connect(Client1),
{ok, _} = emqtt:publish( {ok, _} = emqtt:publish(
Client1, Topic, #{}, Client1,
Topic,
#{},
<<"retained message">>, <<"retained message">>,
[{qos, 2}, {retain, true}]), [{qos, 2}, {retain, true}]
),
{ok, _} = emqtt:publish( {ok, _} = emqtt:publish(
Client1, Topic, #{}, Client1,
Topic,
#{},
<<"new retained message">>, <<"new retained message">>,
[{qos, 2}, {retain, true}]), [{qos, 2}, {retain, true}]
),
{ok, _} = emqtt:publish( {ok, _} = emqtt:publish(
Client1, Topic, #{}, Client1,
Topic,
#{},
<<"not retained message">>, <<"not retained message">>,
[{qos, 2}, {retain, false}]), [{qos, 2}, {retain, false}]
),
{ok, _, [2]} = emqtt:subscribe(Client1, Topic, 2), {ok, _, [2]} = emqtt:subscribe(Client1, Topic, 2),
[Msg] = receive_messages(3), [Msg] = receive_messages(3),
@ -95,7 +104,8 @@ t_publish_retain_message(_) ->
{ok, _} = emqtt:publish(Client1, Topic, #{}, <<"">>, [{qos, 2}, {retain, true}]), {ok, _} = emqtt:publish(Client1, Topic, #{}, <<"">>, [{qos, 2}, {retain, true}]),
{ok, _, [2]} = emqtt:subscribe(Client1, Topic, 2), {ok, _, [2]} = emqtt:subscribe(Client1, Topic, 2),
?assertEqual(0, length(receive_messages(1))), %% [MQTT-3.3.1-6] [MQTT-3.3.1-7] %% [MQTT-3.3.1-6] [MQTT-3.3.1-7]
?assertEqual(0, length(receive_messages(1))),
ok = emqtt:disconnect(Client1). ok = emqtt:disconnect(Client1).
@ -103,39 +113,56 @@ t_publish_message_expiry_interval(_) ->
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]), {ok, Client1} = emqtt:start_link([{proto_ver, v5}]),
{ok, _} = emqtt:connect(Client1), {ok, _} = emqtt:connect(Client1),
{ok, _} = emqtt:publish( {ok, _} = emqtt:publish(
Client1, <<"topic/A">>, #{'Message-Expiry-Interval' => 1}, Client1,
<<"topic/A">>,
#{'Message-Expiry-Interval' => 1},
<<"retained message">>, <<"retained message">>,
[{qos, 1}, {retain, true}]), [{qos, 1}, {retain, true}]
),
{ok, _} = emqtt:publish( {ok, _} = emqtt:publish(
Client1, <<"topic/B">>, #{'Message-Expiry-Interval' => 1}, Client1,
<<"topic/B">>,
#{'Message-Expiry-Interval' => 1},
<<"retained message">>, <<"retained message">>,
[{qos, 2}, {retain, true}]), [{qos, 2}, {retain, true}]
),
{ok, _} = emqtt:publish( {ok, _} = emqtt:publish(
Client1, <<"topic/C">>, #{'Message-Expiry-Interval' => 10}, Client1,
<<"topic/C">>,
#{'Message-Expiry-Interval' => 10},
<<"retained message">>, <<"retained message">>,
[{qos, 1}, {retain, true}]), [{qos, 1}, {retain, true}]
),
{ok, _} = emqtt:publish( {ok, _} = emqtt:publish(
Client1, <<"topic/D">>, #{'Message-Expiry-Interval' => 10}, Client1,
<<"topic/D">>,
#{'Message-Expiry-Interval' => 10},
<<"retained message">>, <<"retained message">>,
[{qos, 2}, {retain, true}]), [{qos, 2}, {retain, true}]
),
timer:sleep(1500), timer:sleep(1500),
{ok, _, [2]} = emqtt:subscribe(Client1, <<"topic/+">>, 2), {ok, _, [2]} = emqtt:subscribe(Client1, <<"topic/+">>, 2),
Msgs = receive_messages(6), Msgs = receive_messages(6),
?assertEqual(2, length(Msgs)), %% [MQTT-3.3.2-5] %% [MQTT-3.3.2-5]
?assertEqual(2, length(Msgs)),
L = lists:map( L = lists:map(
fun(Msg) -> fun(Msg) ->
MessageExpiryInterval = maps:get('Message-Expiry-Interval', MessageExpiryInterval = maps:get(
maps:get(properties, Msg)), 'Message-Expiry-Interval',
maps:get(properties, Msg)
),
MessageExpiryInterval < 10 MessageExpiryInterval < 10
end, Msgs), end,
?assertEqual(2, length(L)), %% [MQTT-3.3.2-6] Msgs
),
%% [MQTT-3.3.2-6]
?assertEqual(2, length(L)),
ok = emqtt:disconnect(Client1), ok = emqtt:disconnect(Client1),
clean_retained(<<"topic/C">>), clean_retained(<<"topic/C">>),
clean_retained(<<"topic/D">>). clean_retained(<<"topic/D">>).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Subsctibe %% Subsctibe
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -144,17 +171,23 @@ t_subscribe_retain_handing(_) ->
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]), {ok, Client1} = emqtt:start_link([{proto_ver, v5}]),
{ok, _} = emqtt:connect(Client1), {ok, _} = emqtt:connect(Client1),
ok = emqtt:publish( ok = emqtt:publish(
Client1, <<"topic/A">>, #{}, Client1,
<<"topic/A">>,
#{},
<<"retained message">>, <<"retained message">>,
[{qos, 0}, {retain, true}] [{qos, 0}, {retain, true}]
), ),
{ok, _} = emqtt:publish( {ok, _} = emqtt:publish(
Client1, <<"topic/B">>, #{}, Client1,
<<"topic/B">>,
#{},
<<"retained message">>, <<"retained message">>,
[{qos, 1}, {retain, true}] [{qos, 1}, {retain, true}]
), ),
{ok, _} = emqtt:publish( {ok, _} = emqtt:publish(
Client1, <<"topic/C">>, #{}, Client1,
<<"topic/C">>,
#{},
<<"retained message">>, <<"retained message">>,
[{qos, 2}, {retain, true}] [{qos, 2}, {retain, true}]
), ),
@ -162,19 +195,24 @@ t_subscribe_retain_handing(_) ->
timer:sleep(200), timer:sleep(200),
{ok, _, [2]} = emqtt:subscribe(Client1, #{}, [{<<"topic/+">>, [{rh, 1}, {qos, 2}]}]), {ok, _, [2]} = emqtt:subscribe(Client1, #{}, [{<<"topic/+">>, [{rh, 1}, {qos, 2}]}]),
?assertEqual(3, length(receive_messages(3))), %% [MQTT-3.3.1-10] %% [MQTT-3.3.1-10]
?assertEqual(3, length(receive_messages(3))),
{ok, _, [2]} = emqtt:subscribe(Client1, #{}, [{<<"topic/+">>, [{rh, 2}, {qos, 2}]}]), {ok, _, [2]} = emqtt:subscribe(Client1, #{}, [{<<"topic/+">>, [{rh, 2}, {qos, 2}]}]),
?assertEqual(0, length(receive_messages(3))), %% [MQTT-3.3.1-11] %% [MQTT-3.3.1-11]
?assertEqual(0, length(receive_messages(3))),
{ok, _, [2]} = emqtt:subscribe(Client1, #{}, [{<<"topic/+">>, [{rh, 0}, {qos, 2}]}]), {ok, _, [2]} = emqtt:subscribe(Client1, #{}, [{<<"topic/+">>, [{rh, 0}, {qos, 2}]}]),
?assertEqual(3, length(receive_messages(3))), %% [MQTT-3.3.1-9] %% [MQTT-3.3.1-9]
?assertEqual(3, length(receive_messages(3))),
{ok, _, [2]} = emqtt:subscribe(Client1, #{}, [{<<"topic/+">>, [{rh, 1}, {qos, 2}]}]), {ok, _, [2]} = emqtt:subscribe(Client1, #{}, [{<<"topic/+">>, [{rh, 1}, {qos, 2}]}]),
?assertEqual(0, length(receive_messages(3))), %% [MQTT-3.3.1-10] %% [MQTT-3.3.1-10]
?assertEqual(0, length(receive_messages(3))),
{ok, _, [2]} = emqtt:subscribe(Client1, #{}, [{<<"topic/+">>, [{rh, 0}, {qos, 2}]}]), {ok, _, [2]} = emqtt:subscribe(Client1, #{}, [{<<"topic/+">>, [{rh, 0}, {qos, 2}]}]),
?assertEqual(3, length(receive_messages(3))), %% [MQTT-3.8.4-4] %% [MQTT-3.8.4-4]
?assertEqual(3, length(receive_messages(3))),
ok = emqtt:disconnect(Client1), ok = emqtt:disconnect(Client1),
clean_retained(<<"topic/A">>), clean_retained(<<"topic/A">>),

View File

@ -19,3 +19,5 @@ acb3544d4b112121b5d9414237d2af7860ccc2a3
3f6d78dda03fd0d8e968a352e134f11a7f16bfe8 3f6d78dda03fd0d8e968a352e134f11a7f16bfe8
# reformat apps/emqx_exhook # reformat apps/emqx_exhook
1a4afabe9fed06bb0038f95c898be69c72b94bf3 1a4afabe9fed06bb0038f95c898be69c72b94bf3
# reformat apps/emqx_retainer
f1acfece6b79ed69b491da03783a7adaa7627b96

View File

@ -12,6 +12,7 @@ APPS+=( 'apps/emqx' 'apps/emqx_modules' 'apps/emqx_gateway')
APPS+=( 'apps/emqx_authn' 'apps/emqx_authz' ) APPS+=( 'apps/emqx_authn' 'apps/emqx_authz' )
APPS+=( 'lib-ee/emqx_enterprise_conf' 'lib-ee/emqx_license' ) APPS+=( 'lib-ee/emqx_enterprise_conf' 'lib-ee/emqx_license' )
APPS+=( 'apps/emqx_exhook') APPS+=( 'apps/emqx_exhook')
APPS+=( 'apps/emqx_retainer')
for app in "${APPS[@]}"; do for app in "${APPS[@]}"; do
echo "$app ..." echo "$app ..."