chore(retainer): change root name emqx_retainer > retainer
This commit is contained in:
parent
9a03869bd7
commit
6bb919b65c
|
@ -5,7 +5,7 @@
|
||||||
## Where to store the retained messages.
|
## Where to store the retained messages.
|
||||||
##
|
##
|
||||||
## Notice that all nodes in the same cluster have to be configured to
|
## Notice that all nodes in the same cluster have to be configured to
|
||||||
emqx_retainer {
|
retainer {
|
||||||
## enable/disable emqx_retainer
|
## enable/disable emqx_retainer
|
||||||
enable = true
|
enable = true
|
||||||
|
|
||||||
|
|
|
@ -139,7 +139,7 @@ deliver(Result, #{context_id := Id} = Context, Pid, Topic, Cursor) ->
|
||||||
false ->
|
false ->
|
||||||
ok;
|
ok;
|
||||||
_ ->
|
_ ->
|
||||||
#{msg_deliver_quota := MaxDeliverNum} = emqx:get_config([?APP, flow_control]),
|
#{msg_deliver_quota := MaxDeliverNum} = emqx:get_config([retainer, flow_control]),
|
||||||
case MaxDeliverNum of
|
case MaxDeliverNum of
|
||||||
0 ->
|
0 ->
|
||||||
_ = [Pid ! {deliver, Topic, Msg} || Msg <- Result],
|
_ = [Pid ! {deliver, Topic, Msg} || Msg <- Result],
|
||||||
|
@ -160,18 +160,18 @@ get_expiry_time(#message{headers = #{properties := #{'Message-Expiry-Interval' :
|
||||||
timestamp = Ts}) ->
|
timestamp = Ts}) ->
|
||||||
Ts + Interval * 1000;
|
Ts + Interval * 1000;
|
||||||
get_expiry_time(#message{timestamp = Ts}) ->
|
get_expiry_time(#message{timestamp = Ts}) ->
|
||||||
Interval = emqx_conf:get([?APP, msg_expiry_interval], ?DEF_EXPIRY_INTERVAL),
|
Interval = emqx_conf:get([retainer, msg_expiry_interval], ?DEF_EXPIRY_INTERVAL),
|
||||||
case Interval of
|
case Interval of
|
||||||
0 -> 0;
|
0 -> 0;
|
||||||
_ -> Ts + Interval
|
_ -> Ts + Interval
|
||||||
end.
|
end.
|
||||||
|
|
||||||
get_stop_publish_clear_msg() ->
|
get_stop_publish_clear_msg() ->
|
||||||
emqx_conf:get([?APP, stop_publish_clear_msg], false).
|
emqx_conf:get([retainer, stop_publish_clear_msg], false).
|
||||||
|
|
||||||
-spec update_config(hocon:config()) -> {ok, _} | {error, _}.
|
-spec update_config(hocon:config()) -> {ok, _} | {error, _}.
|
||||||
update_config(Conf) ->
|
update_config(Conf) ->
|
||||||
emqx_conf:update([emqx_retainer], Conf, #{override_to => cluster}).
|
emqx_conf:update([retainer], Conf, #{override_to => cluster}).
|
||||||
|
|
||||||
clean() ->
|
clean() ->
|
||||||
call(?FUNCTION_NAME).
|
call(?FUNCTION_NAME).
|
||||||
|
@ -196,10 +196,10 @@ stats_fun() ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
emqx_conf:add_handler([emqx_retainer], ?MODULE),
|
emqx_conf:add_handler([retainer], ?MODULE),
|
||||||
init_shared_context(),
|
init_shared_context(),
|
||||||
State = new_state(),
|
State = new_state(),
|
||||||
#{enable := Enable} = Cfg = emqx:get_config([?APP]),
|
#{enable := Enable} = Cfg = emqx:get_config([retainer]),
|
||||||
{ok,
|
{ok,
|
||||||
case Enable of
|
case Enable of
|
||||||
true ->
|
true ->
|
||||||
|
@ -245,7 +245,7 @@ handle_cast(Msg, State) ->
|
||||||
handle_info(clear_expired, #{context := Context} = State) ->
|
handle_info(clear_expired, #{context := Context} = State) ->
|
||||||
Mod = get_backend_module(),
|
Mod = get_backend_module(),
|
||||||
Mod:clear_expired(Context),
|
Mod:clear_expired(Context),
|
||||||
Interval = emqx_conf:get([?APP, msg_clear_interval], ?DEF_EXPIRY_INTERVAL),
|
Interval = emqx_conf:get([retainer, msg_clear_interval], ?DEF_EXPIRY_INTERVAL),
|
||||||
{noreply, State#{clear_timer := add_timer(Interval, clear_expired)}, hibernate};
|
{noreply, State#{clear_timer := add_timer(Interval, clear_expired)}, hibernate};
|
||||||
|
|
||||||
handle_info(release_deliver_quota, #{context := Context, wait_quotas := Waits} = State) ->
|
handle_info(release_deliver_quota, #{context := Context, wait_quotas := Waits} = State) ->
|
||||||
|
@ -261,7 +261,7 @@ handle_info(release_deliver_quota, #{context := Context, wait_quotas := Waits} =
|
||||||
end,
|
end,
|
||||||
Waits2)
|
Waits2)
|
||||||
end,
|
end,
|
||||||
Interval = emqx:get_config([?APP, flow_control, quota_release_interval]),
|
Interval = emqx:get_config([retainer, flow_control, quota_release_interval]),
|
||||||
{noreply, State#{release_quota_timer := add_timer(Interval, release_deliver_quota),
|
{noreply, State#{release_quota_timer := add_timer(Interval, release_deliver_quota),
|
||||||
wait_quotas := []}};
|
wait_quotas := []}};
|
||||||
|
|
||||||
|
@ -294,7 +294,7 @@ new_context(Id) ->
|
||||||
#{context_id => Id}.
|
#{context_id => Id}.
|
||||||
|
|
||||||
is_too_big(Size) ->
|
is_too_big(Size) ->
|
||||||
Limit = emqx_conf:get([?APP, max_payload_size], ?DEF_MAX_PAYLOAD_SIZE),
|
Limit = emqx_conf:get([retainer, max_payload_size], ?DEF_MAX_PAYLOAD_SIZE),
|
||||||
Limit > 0 andalso (Size > Limit).
|
Limit > 0 andalso (Size > Limit).
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
|
@ -368,7 +368,7 @@ insert_shared_context(Key, Term) ->
|
||||||
|
|
||||||
-spec get_msg_deliver_quota() -> non_neg_integer().
|
-spec get_msg_deliver_quota() -> non_neg_integer().
|
||||||
get_msg_deliver_quota() ->
|
get_msg_deliver_quota() ->
|
||||||
emqx:get_config([?APP, flow_control, msg_deliver_quota]).
|
emqx:get_config([retainer, flow_control, msg_deliver_quota]).
|
||||||
|
|
||||||
-spec update_config(state(), hocons:config(), hocons:config()) -> state().
|
-spec update_config(state(), hocons:config(), hocons:config()) -> state().
|
||||||
update_config(State, Conf, OldConf) ->
|
update_config(State, Conf, OldConf) ->
|
||||||
|
@ -461,7 +461,7 @@ check_timer(Timer, _, _) ->
|
||||||
|
|
||||||
-spec get_backend_module() -> backend().
|
-spec get_backend_module() -> backend().
|
||||||
get_backend_module() ->
|
get_backend_module() ->
|
||||||
#{type := Backend} = emqx:get_config([?APP, config]),
|
#{type := Backend} = emqx:get_config([retainer, config]),
|
||||||
ModName = if Backend =:= built_in_database ->
|
ModName = if Backend =:= built_in_database ->
|
||||||
mnesia;
|
mnesia;
|
||||||
true ->
|
true ->
|
||||||
|
|
|
@ -41,7 +41,7 @@ api_spec() ->
|
||||||
{[lookup_retained_api(), with_topic_api(), config_api()], []}.
|
{[lookup_retained_api(), with_topic_api(), config_api()], []}.
|
||||||
|
|
||||||
conf_schema() ->
|
conf_schema() ->
|
||||||
gen_schema(emqx:get_raw_config([emqx_retainer])).
|
gen_schema(emqx:get_raw_config([retainer])).
|
||||||
|
|
||||||
message_props() ->
|
message_props() ->
|
||||||
properties([
|
properties([
|
||||||
|
@ -126,12 +126,12 @@ with_topic_warp(Type, Params) ->
|
||||||
check_backend(Type, Params, fun with_topic/2).
|
check_backend(Type, Params, fun with_topic/2).
|
||||||
|
|
||||||
config(get, _) ->
|
config(get, _) ->
|
||||||
{200, emqx:get_raw_config([emqx_retainer])};
|
{200, emqx:get_raw_config([retainer])};
|
||||||
|
|
||||||
config(put, #{body := Body}) ->
|
config(put, #{body := Body}) ->
|
||||||
try
|
try
|
||||||
{ok, _} = emqx_retainer:update_config(Body),
|
{ok, _} = emqx_retainer:update_config(Body),
|
||||||
{200, emqx:get_raw_config([emqx_retainer])}
|
{200, emqx:get_raw_config([retainer])}
|
||||||
catch _:Reason:_ ->
|
catch _:Reason:_ ->
|
||||||
{400,
|
{400,
|
||||||
#{code => 'UPDATE_FAILED',
|
#{code => 'UPDATE_FAILED',
|
||||||
|
@ -188,7 +188,7 @@ to_bin_string(Data) ->
|
||||||
list_to_binary(io_lib:format("~p", [Data])).
|
list_to_binary(io_lib:format("~p", [Data])).
|
||||||
|
|
||||||
check_backend(Type, Params, Cont) ->
|
check_backend(Type, Params, Cont) ->
|
||||||
case emqx:get_config([emqx_retainer, config, type]) of
|
case emqx:get_config([retainer, config, type]) of
|
||||||
built_in_database ->
|
built_in_database ->
|
||||||
Cont(Type, Params);
|
Cont(Type, Params);
|
||||||
_ ->
|
_ ->
|
||||||
|
|
|
@ -140,7 +140,7 @@ page_read(_, Topic, Page, Limit) ->
|
||||||
{ok, Rows}.
|
{ok, Rows}.
|
||||||
|
|
||||||
match_messages(_, Topic, Cursor) ->
|
match_messages(_, Topic, Cursor) ->
|
||||||
MaxReadNum = emqx:get_config([?APP, flow_control, max_read_number]),
|
MaxReadNum = emqx:get_config([retainer, flow_control, max_read_number]),
|
||||||
case Cursor of
|
case Cursor of
|
||||||
undefined ->
|
undefined ->
|
||||||
case MaxReadNum of
|
case MaxReadNum of
|
||||||
|
@ -249,7 +249,7 @@ make_cursor(Topic) ->
|
||||||
|
|
||||||
-spec is_table_full() -> boolean().
|
-spec is_table_full() -> boolean().
|
||||||
is_table_full() ->
|
is_table_full() ->
|
||||||
#{max_retained_messages := Limit} = emqx:get_config([?APP, config]),
|
#{max_retained_messages := Limit} = emqx:get_config([retainer, config]),
|
||||||
Limit > 0 andalso (table_size() >= Limit).
|
Limit > 0 andalso (table_size() >= Limit).
|
||||||
|
|
||||||
-spec table_size() -> non_neg_integer().
|
-spec table_size() -> non_neg_integer().
|
||||||
|
|
|
@ -6,9 +6,9 @@
|
||||||
|
|
||||||
-define(TYPE(Type), hoconsc:mk(Type)).
|
-define(TYPE(Type), hoconsc:mk(Type)).
|
||||||
|
|
||||||
roots() -> ["emqx_retainer"].
|
roots() -> ["retainer"].
|
||||||
|
|
||||||
fields("emqx_retainer") ->
|
fields("retainer") ->
|
||||||
[ {enable, sc(boolean(), false)}
|
[ {enable, sc(boolean(), false)}
|
||||||
, {msg_expiry_interval, sc(emqx_schema:duration_ms(), "0s")}
|
, {msg_expiry_interval, sc(emqx_schema:duration_ms(), "0s")}
|
||||||
, {msg_clear_interval, sc(emqx_schema:duration_ms(), "0s")}
|
, {msg_clear_interval, sc(emqx_schema:duration_ms(), "0s")}
|
||||||
|
|
|
@ -28,7 +28,7 @@
|
||||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||||
|
|
||||||
-define(BASE_CONF, <<"""
|
-define(BASE_CONF, <<"""
|
||||||
emqx_retainer {
|
retainer {
|
||||||
enable = true
|
enable = true
|
||||||
msg_clear_interval = 0s
|
msg_clear_interval = 0s
|
||||||
msg_expiry_interval = 0s
|
msg_expiry_interval = 0s
|
||||||
|
|
|
@ -22,7 +22,7 @@
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
-define(BASE_CONF, <<"""
|
-define(BASE_CONF, <<"""
|
||||||
emqx_retainer {
|
retainer {
|
||||||
enable = true
|
enable = true
|
||||||
msg_clear_interval = 0s
|
msg_clear_interval = 0s
|
||||||
msg_expiry_interval = 0s
|
msg_expiry_interval = 0s
|
||||||
|
|
Loading…
Reference in New Issue