diff --git a/apps/emqx_retainer/src/emqx_retainer.app.src b/apps/emqx_retainer/src/emqx_retainer.app.src index feb18ac2e..c3ffb9f90 100644 --- a/apps/emqx_retainer/src/emqx_retainer.app.src +++ b/apps/emqx_retainer/src/emqx_retainer.app.src @@ -1,6 +1,6 @@ {application, emqx_retainer, [{description, "EMQ X Retainer"}, - {vsn, "4.3.0"}, % strict semver, bump manually! + {vsn, "4.3.1"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_retainer_sup]}, {applications, [kernel,stdlib]}, diff --git a/lib-ce/emqx_modules/src/emqx_mod_delayed.erl b/lib-ce/emqx_modules/src/emqx_mod_delayed.erl index 852fc4717..ac5be58b2 100644 --- a/lib-ce/emqx_modules/src/emqx_mod_delayed.erl +++ b/lib-ce/emqx_modules/src/emqx_mod_delayed.erl @@ -22,6 +22,8 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). +-logger_header("[Delayed]"). + %% Mnesia bootstrap -export([mnesia/1]). @@ -90,7 +92,11 @@ description() -> %% Hooks %%-------------------------------------------------------------------- -on_message_publish(Msg = #message{id = Id, topic = <<"$delayed/", Topic/binary>>, timestamp = Ts}) -> +on_message_publish(Msg = #message{ + id = Id, + topic = <<"$delayed/", Topic/binary>>, + timestamp = Ts + }) -> [Delay, Topic1] = binary:split(Topic, <<"/">>), PubAt = case binary_to_integer(Delay) of Interval when Interval < ?MAX_INTERVAL -> @@ -127,42 +133,57 @@ store(DelayedMsg) -> %%-------------------------------------------------------------------- init([]) -> - {ok, ensure_publish_timer(#{timer => undefined, publish_at => 0})}. + {ok, ensure_stats_event( + ensure_publish_timer(#{timer => undefined, publish_at => 0}))}. handle_call({store, DelayedMsg = #delayed_message{key = Key}}, _From, State) -> ok = mnesia:dirty_write(?TAB, DelayedMsg), - emqx_metrics:set('messages.delayed', delayed_count()), + emqx_metrics:inc('messages.delayed'), {reply, ok, ensure_publish_timer(Key, State)}; handle_call(Req, _From, State) -> - ?LOG(error, "[Delayed] Unexpected call: ~p", [Req]), + ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast(Msg, State) -> - ?LOG(error, "[Delayed] Unexpected cast: ~p", [Msg]), + ?LOG(error, "Unexpected cast: ~p", [Msg]), {noreply, State}. %% Do Publish... handle_info({timeout, TRef, do_publish}, State = #{timer := TRef}) -> DeletedKeys = do_publish(mnesia:dirty_first(?TAB), os:system_time(seconds)), lists:foreach(fun(Key) -> mnesia:dirty_delete(?TAB, Key) end, DeletedKeys), - emqx_metrics:set('messages.delayed', delayed_count()), {noreply, ensure_publish_timer(State#{timer := undefined, publish_at := 0})}; +handle_info(stats, State = #{stats_fun := StatsFun}) -> + StatsFun(delayed_count()), + {noreply, State, hibernate}; + handle_info(Info, State) -> - ?LOG(error, "[Delayed] Unexpected info: ~p", [Info]), + ?LOG(error, "Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, #{timer := TRef}) -> emqx_misc:cancel_timer(TRef). -code_change(_OldVsn, State, _Extra) -> - {ok, State}. +code_change({down, Vsn}, State, _Extra) when Vsn =:= "4.3.0" -> + NState = maps:with([timer, publish_at], State), + {ok, NState}; + +code_change(Vsn, State, _Extra) when Vsn =:= "4.3.0" -> + NState = ensure_stats_event(State), + {ok, NState}. %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- +%% Ensure the stats +ensure_stats_event(State) -> + StatsFun = emqx_stats:statsfun('delayed.count', 'delayed.max'), + {ok, StatsTimer} = timer:send_interval(timer:seconds(1), stats), + State#{stats_fun => StatsFun, stats_timer => StatsTimer}. + %% Ensure publish timer ensure_publish_timer(State) -> ensure_publish_timer(mnesia:dirty_first(?TAB), State). diff --git a/lib-ce/emqx_modules/src/emqx_modules.app.src b/lib-ce/emqx_modules/src/emqx_modules.app.src index a969fd30d..576316703 100644 --- a/lib-ce/emqx_modules/src/emqx_modules.app.src +++ b/lib-ce/emqx_modules/src/emqx_modules.app.src @@ -1,6 +1,6 @@ {application, emqx_modules, [{description, "EMQ X Module Management"}, - {vsn, "4.3.0"}, + {vsn, "4.3.1"}, {modules, []}, {applications, [kernel,stdlib]}, {mod, {emqx_modules_app, []}}, diff --git a/lib-ce/emqx_modules/src/emqx_modules.appup.src b/lib-ce/emqx_modules/src/emqx_modules.appup.src new file mode 100644 index 000000000..b44a65c17 --- /dev/null +++ b/lib-ce/emqx_modules/src/emqx_modules.appup.src @@ -0,0 +1,15 @@ +%% -*-: erlang -*- +{VSN, + [ + {"4.3.0", [ + {update, emqx_mod_delayed, {advanced, []}} + ]}, + {<<".*">>, []} + ], + [ + {"4.3.0", [ + {update, emqx_mod_delayed, {advanced, []}} + ]}, + {<<".*">>, []} + ] +}.