diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 11ffe664a..cdc20762e 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -64,6 +64,7 @@ includes() ->[]. includes() -> [ "emqx_data_bridge" , "emqx_telemetry" + , "emqx_retainer" ]. -endif. diff --git a/apps/emqx_retainer/etc/emqx_retainer.conf b/apps/emqx_retainer/etc/emqx_retainer.conf index 4db438a98..08220a207 100644 --- a/apps/emqx_retainer/etc/emqx_retainer.conf +++ b/apps/emqx_retainer/etc/emqx_retainer.conf @@ -5,37 +5,39 @@ ## Where to store the retained messages. ## ## Notice that all nodes in the same cluster have to be configured to -## use the same storage_type. -## -## Value: ram | disc | disc_only -## - ram: memory only -## - disc: both memory and disc -## - disc_only: disc only -## -## Default: ram -retainer.storage_type = ram +emqx_retainer: { + ## use the same storage_type. + ## + ## Value: ram | disc | disc_only + ## - ram: memory only + ## - disc: both memory and disc + ## - disc_only: disc only + ## + ## Default: ram + storage_type: ram -## Maximum number of retained messages. 0 means no limit. -## -## Value: Number >= 0 -retainer.max_retained_messages = 0 + ## Maximum number of retained messages. 0 means no limit. + ## + ## Value: Number >= 0 + max_retained_messages: 0 -## Maximum retained message size. -## -## Value: Bytes -retainer.max_payload_size = 1MB + ## Maximum retained message size. + ## + ## Value: Bytes + max_payload_size: 1MB -## Expiry interval of the retained messages. Never expire if the value is 0. -## -## Value: Duration -## - h: hour -## - m: minute -## - s: second -## -## Examples: -## - 2h: 2 hours -## - 30m: 30 minutes -## - 20s: 20 seconds -## -## Default: 0 -retainer.expiry_interval = 0 + ## Expiry interval of the retained messages. Never expire if the value is 0. + ## + ## Value: Duration + ## - h: hour + ## - m: minute + ## - s: second + ## + ## Examples: + ## - 2h: 2 hours + ## - 30m: 30 minutes + ## - 20s: 20 seconds + ## + ## Default: 0s + expiry_interval: 0s +} diff --git a/apps/emqx_retainer/priv/emqx_retainer.schema b/apps/emqx_retainer/priv/emqx_retainer.schema deleted file mode 100644 index e598864e1..000000000 --- a/apps/emqx_retainer/priv/emqx_retainer.schema +++ /dev/null @@ -1,30 +0,0 @@ -%%-*- mode: erlang -*- -%% Retainer config mapping - -%% Storage Type -%% {$configurable} -{mapping, "retainer.storage_type", "emqx_retainer.storage_type", [ - {default, ram}, - {datatype, {enum, [ram, disc, disc_only]}} -]}. - -%% Maximum number of retained messages. -%% {$configurable} -{mapping, "retainer.max_retained_messages", "emqx_retainer.max_retained_messages", [ - {default, 0}, - {datatype, integer} -]}. - -%% Maximum payload size of retained message. -%% {$configurable} -{mapping, "retainer.max_payload_size", "emqx_retainer.max_payload_size", [ - {default, "1MB"}, - {datatype, bytesize} -]}. - -%% Expiry interval of retained message -%% {$configurable} -{mapping, "retainer.expiry_interval", "emqx_retainer.expiry_interval", [ - {default, 0}, - {datatype, [integer, {duration, ms}]} -]}. diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index 94c561b39..affbc5ca3 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -25,14 +25,14 @@ -logger_header("[Retainer]"). --export([start_link/1]). +-export([start_link/0]). --export([ load/1 +-export([ load/0 , unload/0 ]). -export([ on_session_subscribed/3 - , on_message_publish/2 + , on_message_publish/1 ]). -export([clean/1]). @@ -51,15 +51,25 @@ -record(state, {stats_fun, stats_timer, expiry_timer}). +-define(STATS_INTERVAL, timer:seconds(1)). +-define(DEF_STORAGE_TYPE, ram). +-define(DEF_MAX_RETAINED_MESSAGES, 0). +-define(DEF_MAX_PAYLOAD_SIZE, (1024 * 1024)). +-define(DEF_EXPIRY_INTERVAL, 0). + +%% convenient to generate stats_timer/expiry_timer +-define(MAKE_TIMER(State, Timer, Interval, Msg), + State#state{Timer = erlang:send_after(Interval, self(), Msg)}). + -rlog_shard({?RETAINER_SHARD, ?TAB}). %%-------------------------------------------------------------------- %% Load/Unload %%-------------------------------------------------------------------- -load(Env) -> +load() -> _ = emqx:hook('session.subscribed', {?MODULE, on_session_subscribed, []}), - _ = emqx:hook('message.publish', {?MODULE, on_message_publish, [Env]}), + _ = emqx:hook('message.publish', {?MODULE, on_message_publish, []}), ok. unload() -> @@ -85,15 +95,15 @@ dispatch(Pid, Topic) -> %% RETAIN flag set to 1 and payload containing zero bytes on_message_publish(Msg = #message{flags = #{retain := true}, topic = Topic, - payload = <<>>}, _Env) -> + payload = <<>>}) -> ekka_mnesia:dirty_delete(?TAB, topic2tokens(Topic)), {ok, Msg}; -on_message_publish(Msg = #message{flags = #{retain := true}}, Env) -> +on_message_publish(Msg = #message{flags = #{retain := true}}) -> Msg1 = emqx_message:set_header(retained, true, Msg), - store_retained(Msg1, Env), + store_retained(Msg1), {ok, Msg}; -on_message_publish(Msg, _Env) -> +on_message_publish(Msg) -> {ok, Msg}. %%-------------------------------------------------------------------- @@ -101,9 +111,9 @@ on_message_publish(Msg, _Env) -> %%-------------------------------------------------------------------- %% @doc Start the retainer --spec(start_link(Env :: list()) -> emqx_types:startlink_ret()). -start_link(Env) -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [Env], []). +-spec(start_link() -> emqx_types:startlink_ret()). +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). -spec(clean(emqx_types:topic()) -> non_neg_integer()). clean(Topic) when is_binary(Topic) -> @@ -124,8 +134,10 @@ clean(Topic) when is_binary(Topic) -> %% gen_server callbacks %%-------------------------------------------------------------------- -init([Env]) -> - Copies = case proplists:get_value(storage_type, Env, disc) of +init([]) -> + StorageType = emqx_config:get([?MODULE, storage_type], ?DEF_STORAGE_TYPE), + ExpiryInterval = emqx_config:get([?MODULE, expiry_interval], ?DEF_EXPIRY_INTERVAL), + Copies = case StorageType of ram -> ram_copies; disc -> disc_copies; disc_only -> disc_only_copies @@ -149,17 +161,15 @@ init([Env]) -> ok end, StatsFun = emqx_stats:statsfun('retained.count', 'retained.max'), - {ok, StatsTimer} = timer:send_interval(timer:seconds(1), stats), - State = #state{stats_fun = StatsFun, stats_timer = StatsTimer}, - {ok, start_expire_timer(proplists:get_value(expiry_interval, Env, 0), State)}. + State = ?MAKE_TIMER(#state{stats_fun = StatsFun}, stats_timer, ?STATS_INTERVAL, stats), + {ok, start_expire_timer(ExpiryInterval, State)}. start_expire_timer(0, State) -> State; start_expire_timer(undefined, State) -> State; start_expire_timer(Ms, State) -> - {ok, Timer} = timer:send_interval(Ms, expire), - State#state{expiry_timer = Timer}. + ?MAKE_TIMER(State, expiry_timer, Ms, expire). handle_call(Req, _From, State) -> ?LOG(error, "Unexpected call: ~p", [Req]), @@ -171,19 +181,20 @@ handle_cast(Msg, State) -> handle_info(stats, State = #state{stats_fun = StatsFun}) -> StatsFun(retained_count()), - {noreply, State, hibernate}; + {noreply, ?MAKE_TIMER(State, stats_timer, ?STATS_INTERVAL, stats), hibernate}; handle_info(expire, State) -> ok = expire_messages(), - {noreply, State, hibernate}; + Interval = emqx_config:get([?MODULE, expiry_interval], ?DEF_EXPIRY_INTERVAL), + {noreply, start_expire_timer(Interval, State), hibernate}; handle_info(Info, State) -> ?LOG(error, "Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, #state{stats_timer = TRef1, expiry_timer = TRef2}) -> - _ = timer:cancel(TRef1), - _ = timer:cancel(TRef2), + _ = erlang:cancel_timer(TRef1), + _ = erlang:cancel_timer(TRef2), ok. code_change(_OldVsn, State, _Extra) -> @@ -192,31 +203,33 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- - sort_retained([]) -> []; sort_retained([Msg]) -> [Msg]; sort_retained(Msgs) -> lists:sort(fun(#message{timestamp = Ts1}, #message{timestamp = Ts2}) -> - Ts1 =< Ts2 - end, Msgs). + Ts1 =< Ts2 end, + Msgs). -store_retained(Msg = #message{topic = Topic, payload = Payload}, Env) -> - case {is_table_full(Env), is_too_big(size(Payload), Env)} of +store_retained(Msg = #message{topic = Topic, payload = Payload}) -> + case {is_table_full(), is_too_big(size(Payload))} of {false, false} -> ok = emqx_metrics:inc('messages.retained'), ekka_mnesia:dirty_write(?TAB, #retained{topic = topic2tokens(Topic), msg = Msg, - expiry_time = get_expiry_time(Msg, Env)}); + expiry_time = get_expiry_time(Msg)}); {true, false} -> {atomic, _} = ekka_mnesia:transaction(?RETAINER_SHARD, fun() -> - case mnesia:read(?TAB, Topic) of - [_] -> - mnesia:write(?TAB, #retained{topic = topic2tokens(Topic), - msg = Msg, - expiry_time = get_expiry_time(Msg, Env)}, write); - [] -> - ?LOG(error, "Cannot retain message(topic=~s) for table is full!", [Topic]) + case mnesia:read(?TAB, Topic) of + [_] -> + mnesia:write(?TAB, + #retained{topic = topic2tokens(Topic), + msg = Msg, + expiry_time = get_expiry_time(Msg)}, + write); + [] -> + ?LOG(error, + "Cannot retain message(topic=~s) for table is full!", [Topic]) end end), ok; @@ -227,22 +240,24 @@ store_retained(Msg = #message{topic = Topic, payload = Payload}, Env) -> "for payload is too big!", [Topic, iolist_size(Payload)]) end. -is_table_full(Env) -> - Limit = proplists:get_value(max_retained_messages, Env, 0), +is_table_full() -> + Limit = emqx_config:get([?MODULE, max_retained_messages], ?DEF_MAX_RETAINED_MESSAGES), Limit > 0 andalso (retained_count() > Limit). -is_too_big(Size, Env) -> - Limit = proplists:get_value(max_payload_size, Env, 0), +is_too_big(Size) -> + Limit = emqx_config:get([?MODULE, max_payload_size], ?DEF_MAX_PAYLOAD_SIZE), Limit > 0 andalso (Size > Limit). -get_expiry_time(#message{headers = #{properties := #{'Message-Expiry-Interval' := 0}}}, _Env) -> +get_expiry_time(#message{headers = #{properties := #{'Message-Expiry-Interval' := 0}}}) -> 0; -get_expiry_time(#message{headers = #{properties := #{'Message-Expiry-Interval' := Interval}}, timestamp = Ts}, _Env) -> +get_expiry_time(#message{headers = #{properties := #{'Message-Expiry-Interval' := Interval}}, + timestamp = Ts}) -> Ts + Interval * 1000; -get_expiry_time(#message{timestamp = Ts}, Env) -> - case proplists:get_value(expiry_interval, Env, 0) of +get_expiry_time(#message{timestamp = Ts}) -> + Interval = emqx_config:get([?MODULE, expiry_interval], ?DEF_EXPIRY_INTERVAL), + case Interval of 0 -> 0; - Interval -> Ts + Interval + _ -> Ts + Interval end. %%-------------------------------------------------------------------- diff --git a/apps/emqx_retainer/src/emqx_retainer_app.erl b/apps/emqx_retainer/src/emqx_retainer_app.erl index adca5ae7a..3f42ddbd6 100644 --- a/apps/emqx_retainer/src/emqx_retainer_app.erl +++ b/apps/emqx_retainer/src/emqx_retainer_app.erl @@ -25,9 +25,8 @@ ]). start(_Type, _Args) -> - Env = application:get_all_env(emqx_retainer), - {ok, Sup} = emqx_retainer_sup:start_link(Env), - emqx_retainer:load(Env), + {ok, Sup} = emqx_retainer_sup:start_link(), + emqx_retainer:load(), emqx_retainer_cli:load(), {ok, Sup}. diff --git a/apps/emqx_retainer/src/emqx_retainer_schema.erl b/apps/emqx_retainer/src/emqx_retainer_schema.erl new file mode 100644 index 000000000..ece873dae --- /dev/null +++ b/apps/emqx_retainer/src/emqx_retainer_schema.erl @@ -0,0 +1,31 @@ +-module(emqx_retainer_schema). + +-include_lib("typerefl/include/types.hrl"). + +-type storage_type() :: ram | disc | disc_only. + +-reflect_type([storage_type/0]). + +-export([structs/0, fields/1]). + +structs() -> ["emqx_retainer"]. + +fields("emqx_retainer") -> + [ {storage_type, t(storage_type(), ram)} + , {max_retained_messages, t(integer(), 0, fun is_pos_integer/1)} + , {max_payload_size, t(emqx_schema:bytesize(), "1MB")} + , {expiry_interval, t(emqx_schema:duration_ms(), "0s")} + ]. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- +t(Type, Default) -> + hoconsc:t(Type, #{default => Default}). + +t(Type, Default, Validator) -> + hoconsc:t(Type, #{default => Default, + validator => Validator}). + +is_pos_integer(V) -> + V >= 0. diff --git a/apps/emqx_retainer/src/emqx_retainer_sup.erl b/apps/emqx_retainer/src/emqx_retainer_sup.erl index fef245489..d01c20975 100644 --- a/apps/emqx_retainer/src/emqx_retainer_sup.erl +++ b/apps/emqx_retainer/src/emqx_retainer_sup.erl @@ -18,17 +18,17 @@ -behaviour(supervisor). --export([start_link/1]). +-export([start_link/0]). -export([init/1]). -start_link(Env) -> - supervisor:start_link({local, ?MODULE}, ?MODULE, [Env]). +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). -init([Env]) -> - {ok, {{one_for_one, 10, 3600}, +init([]) -> + {ok, {{one_for_one, 10, 3600}, [#{id => retainer, - start => {emqx_retainer, start_link, [Env]}, + start => {emqx_retainer, start_link, []}, restart => permanent, shutdown => 5000, type => worker, diff --git a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl index 1df042dd9..1a0a00a1c 100644 --- a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl @@ -31,7 +31,7 @@ all() -> emqx_ct:all(?MODULE). %%-------------------------------------------------------------------- init_per_suite(Config) -> - emqx_ct_helpers:start_apps([emqx_retainer]), + emqx_ct_helpers:start_apps([emqx_retainer], fun set_special_configs/1), Config. end_per_suite(_Config) -> @@ -39,16 +39,26 @@ end_per_suite(_Config) -> init_per_testcase(TestCase, Config) -> emqx_retainer:clean(<<"#">>), - case TestCase of - t_message_expiry_2 -> - application:set_env(emqx_retainer, expiry_interval, 2000); - _ -> - application:set_env(emqx_retainer, expiry_interval, 0) - end, - application:stop(emqx_retainer), + Interval = case TestCase of + t_message_expiry_2 -> 2000; + _ -> 0 + end, + init_emqx_retainer_conf(Interval), application:ensure_all_started(emqx_retainer), Config. +set_special_configs(emqx_retainer) -> + init_emqx_retainer_conf(0); +set_special_configs(_) -> + ok. + +init_emqx_retainer_conf(Expiry) -> + emqx_config:put([emqx_retainer], + #{storage_type => ram, + max_retained_messages => 0, + max_payload_size => 1024 * 1024, + expiry_interval => Expiry}). + %%-------------------------------------------------------------------- %% Test Cases %%--------------------------------------------------------------------