diff --git a/apps/emqx_modules/src/emqx_delayed.erl b/apps/emqx_modules/src/emqx_delayed.erl index e63453b54..a57cd9e5e 100644 --- a/apps/emqx_modules/src/emqx_delayed.erl +++ b/apps/emqx_modules/src/emqx_delayed.erl @@ -43,8 +43,7 @@ ]). %% gen_server callbacks --export([ get_status/0 - , enable/0 +-export([ enable/0 , disable/0 ]). @@ -90,7 +89,11 @@ on_message_publish(Msg = #message{ end, PubMsg = Msg#message{topic = Topic1}, Headers = PubMsg#message.headers, - ok = store(#delayed_message{key = {PubAt, Id}, msg = PubMsg}), + case store(#delayed_message{key = {PubAt, Id}, msg = PubMsg}) of + ok -> ok; + {error, Error} -> + ?LOG(error, "Store delayed message fail: ~p", [Error]) + end, {stop, PubMsg#message{headers = Headers#{allow_publish => false}}}; on_message_publish(Msg) -> @@ -109,9 +112,6 @@ start_link() -> store(DelayedMsg) -> gen_server:call(?SERVER, {store, DelayedMsg}, infinity). -get_status() -> - gen_server:call(?SERVER, get_status). - enable() -> gen_server:call(?SERVER, enable). @@ -122,17 +122,31 @@ disable() -> %% gen_server callback %%-------------------------------------------------------------------- -init([_Opts]) -> +init([Opts]) -> + MaxDelayedMessages = maps:get(max_delayed_messages, Opts, 0), {ok, ensure_stats_event( ensure_publish_timer(#{timer => undefined, publish_at => 0, - enabled => false}))}. + max_delayed_messages => MaxDelayedMessages}))}. -handle_call({store, DelayedMsg = #delayed_message{key = Key}}, _From, State) -> +handle_call({store, DelayedMsg = #delayed_message{key = Key}}, + _From, State = #{max_delayed_messages := 0}) -> ok = ekka_mnesia:dirty_write(?TAB, DelayedMsg), emqx_metrics:inc('messages.delayed'), {reply, ok, ensure_publish_timer(Key, State)}; +handle_call({store, DelayedMsg = #delayed_message{key = Key}}, + _From, State = #{max_delayed_messages := Val}) -> + Size = mnesia:table_info(?TAB, size), + case Size > Val of + true -> + {reply, {error, max_delayed_messages_full}, State}; + false -> + ok = ekka_mnesia:dirty_write(?TAB, DelayedMsg), + emqx_metrics:inc('messages.delayed'), + {reply, ok, ensure_publish_timer(Key, State)} + end; + handle_call(enable, _From, State) -> emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}), {reply, ok, State}; @@ -141,9 +155,6 @@ handle_call(disable, _From, State) -> emqx_hooks:del('message.publish', {?MODULE, on_message_publish}), {reply, ok, State}; -handle_call(get_status, _From, State = #{enabled := Enabled}) -> - {reply, Enabled, State}; - handle_call(Req, _From, State) -> ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. diff --git a/apps/emqx_modules/src/emqx_telemetry.erl b/apps/emqx_modules/src/emqx_telemetry.erl index 50fc6a243..c9a78e736 100644 --- a/apps/emqx_modules/src/emqx_telemetry.erl +++ b/apps/emqx_modules/src/emqx_telemetry.erl @@ -50,9 +50,9 @@ , disable/0 ]). --export([ get_status/0 - , get_uuid/0 +-export([ get_uuid/0 , get_telemetry/0 + , get_status/0 ]). -export([official_version/1]). @@ -73,7 +73,6 @@ -record(state, { uuid :: undefined | binary(), - enabled :: undefined | boolean(), url :: string(), report_interval :: undefined | non_neg_integer(), timer = undefined :: undefined | reference() @@ -121,7 +120,7 @@ disable() -> gen_server:call(?MODULE, disable). get_status() -> - gen_server:call(?MODULE, get_status). + emqx_config:get([telemetry, enable], true). get_uuid() -> gen_server:call(?MODULE, get_uuid). @@ -151,14 +150,13 @@ init(_Opts) -> end, {ok, #state{url = ?TELEMETRY_URL, report_interval = timer:seconds(?REPORT_INTERVAR), - enabled = false, uuid = UUID1}}. handle_call(enable, _From, State) -> case ?MODULE:official_version(emqx_app:get_release()) of true -> report_telemetry(State), - {reply, ok, ensure_report_timer(State#state{enabled = true})}; + {reply, ok, ensure_report_timer(State)}; false -> {reply, {error, not_official_version}, State} end; @@ -167,14 +165,11 @@ handle_call(disable, _From, State = #state{timer = Timer}) -> case ?MODULE:official_version(emqx_app:get_release()) of true -> emqx_misc:cancel_timer(Timer), - {reply, ok, State#state{enabled = false, timer = undefined}}; + {reply, ok, State#state{timer = undefined}}; false -> {reply, {error, not_official_version}, State} end; -handle_call(get_status, _From, State = #state{enabled = Enabled}) -> - {reply, Enabled, State}; - handle_call(get_uuid, _From, State = #state{uuid = UUID}) -> {reply, {ok, UUID}, State}; @@ -193,11 +188,11 @@ handle_continue(Continue, State) -> ?LOG(error, "Unexpected continue: ~p", [Continue]), {noreply, State}. -handle_info({timeout, TRef, time_to_report_telemetry_data}, State = #state{timer = TRef, - enabled = false}) -> - {noreply, State}; handle_info({timeout, TRef, time_to_report_telemetry_data}, State = #state{timer = TRef}) -> - report_telemetry(State), + case get_status() of + true -> report_telemetry(State); + false -> ok + end, {noreply, ensure_report_timer(State)}; handle_info(Info, State) -> diff --git a/apps/emqx_modules/test/emqx_delayed_SUITE.erl b/apps/emqx_modules/test/emqx_delayed_SUITE.erl index c79903f42..a9af83b1d 100644 --- a/apps/emqx_modules/test/emqx_delayed_SUITE.erl +++ b/apps/emqx_modules/test/emqx_delayed_SUITE.erl @@ -53,7 +53,6 @@ t_load_case(_) -> ?assertEqual(false, lists:keyfind(MFA, 2, Hooks)), ok = emqx_delayed:enable(), Hooks1 = emqx_hooks:lookup('message.publish'), - ct:pal("----~p~n", [Hooks1]), ?assertNotEqual(false, lists:keyfind(MFA, 2, Hooks1)), ok. diff --git a/apps/emqx_modules/test/emqx_telemetry_SUITE.erl b/apps/emqx_modules/test/emqx_telemetry_SUITE.erl index 7352c4de5..62c524bdf 100644 --- a/apps/emqx_modules/test/emqx_telemetry_SUITE.erl +++ b/apps/emqx_modules/test/emqx_telemetry_SUITE.erl @@ -72,9 +72,7 @@ t_enable(_) -> ok = meck:new(emqx_telemetry, [non_strict, passthrough, no_history, no_link]), ok = meck:expect(emqx_telemetry, official_version, fun(_) -> true end), ok = emqx_telemetry:enable(), - ?assertEqual(true, emqx_telemetry:get_status()), ok = emqx_telemetry:disable(), - ?assertEqual(false, emqx_telemetry:get_status()), meck:unload([emqx_telemetry]). t_send_after_enable(_) ->