From 5977c4499643a952667254c20f38b99b6b109879 Mon Sep 17 00:00:00 2001 From: Feng Date: Sat, 10 Oct 2015 00:04:07 +0800 Subject: [PATCH] fix issue #182 - etained message expiration --- rel/files/emqttd.config.development | 4 ++ rel/files/emqttd.config.production | 4 ++ src/emqttd_app.erl | 1 + src/emqttd_retained.erl | 99 +++++++++++++++++++++++++---- src/emqttd_stats.erl | 7 +- 5 files changed, 100 insertions(+), 15 deletions(-) diff --git a/rel/files/emqttd.config.development b/rel/files/emqttd.config.development index 2cd50c535..f93d30959 100644 --- a/rel/files/emqttd.config.development +++ b/rel/files/emqttd.config.development @@ -132,8 +132,12 @@ %% Retained messages {retained, [ + %% Expired after seconds, never expired if 0 + {expired_after, 0}, + %% Max number of retained messages {max_message_num, 100000}, + %% Max Payload Size of retained message {max_playload_size, 65536} ]}, diff --git a/rel/files/emqttd.config.production b/rel/files/emqttd.config.production index c44e1354c..46caf33d6 100644 --- a/rel/files/emqttd.config.production +++ b/rel/files/emqttd.config.production @@ -124,8 +124,12 @@ %% Retained messages {retained, [ + %% Expired after seconds, never expired if 0 + {expired_after, 0}, + %% Max number of retained messages {max_message_num, 100000}, + %% Max Payload Size of retained message {max_playload_size, 65536} ]}, diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index c5fd94bdf..5f4e51da6 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -75,6 +75,7 @@ start_listeners() -> start_servers(Sup) -> Servers = [{"emqttd ctl", emqttd_ctl}, {"emqttd trace", emqttd_trace}, + {"emqttd retained", emqttd_retained}, {"emqttd pooler", {supervisor, emqttd_pooler_sup}}, {"emqttd client manager", {supervisor, emqttd_cm_sup}}, {"emqttd session manager", {supervisor, emqttd_sm_sup}}, diff --git a/src/emqttd_retained.erl b/src/emqttd_retained.erl index 633618557..29e83d52c 100644 --- a/src/emqttd_retained.erl +++ b/src/emqttd_retained.erl @@ -31,6 +31,8 @@ -include("emqttd.hrl"). +-include_lib("stdlib/include/ms_transform.hrl"). + %% Mnesia callbacks -export([mnesia/1]). @@ -40,8 +42,19 @@ %% API Function Exports -export([retain/1, dispatch/2]). +%% API Function Exports +-export([start_link/0, expire/1]). + +-behaviour(gen_server). + +%% gen_server Function Exports +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + -record(mqtt_retained, {topic, message}). +-record(state, {stats_fun, expired_after, stats_timer, expire_timer}). + %%%============================================================================= %%% Mnesia callbacks %%%============================================================================= @@ -60,6 +73,14 @@ mnesia(copy) -> %%% API %%%============================================================================= +%%------------------------------------------------------------------------------ +%% @doc Start a retained server +%% @end +%%------------------------------------------------------------------------------ +-spec start_link() -> {ok, pid()} | ignore | {error, any()}. +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + %%%----------------------------------------------------------------------------- %% @doc Retain message %% @end @@ -71,9 +92,7 @@ retain(#mqtt_message{retain = false}) -> ignore; retain(#mqtt_message{retain = true, topic = Topic, payload = <<>>}) -> mnesia:async_dirty(fun mnesia:delete/1, [{retained, Topic}]); -retain(Msg = #mqtt_message{topic = Topic, - retain = true, - payload = Payload}) -> +retain(Msg = #mqtt_message{topic = Topic, retain = true, payload = Payload}) -> TabSize = mnesia:table_info(retained, size), case {TabSize < limit(table), size(Payload) < limit(payload)} of {true, true} -> @@ -82,23 +101,23 @@ retain(Msg = #mqtt_message{topic = Topic, mnesia:async_dirty(fun mnesia:write/3, [retained, Retained, write]), emqttd_metrics:set('messages/retained', mnesia:table_info(retained, size)); {false, _}-> - lager:error("Dropped retained message(topic=~s) for table is full!", [Topic]); + lager:error("Cannot retain message(topic=~s) for table is full!", [Topic]); {_, false}-> - lager:error("Dropped retained message(topic=~s, payload_size=~p) for payload is too big!", [Topic, size(Payload)]) + lager:error("Cannot retain message(topic=~s, payload_size=~p)" + " for payload is too big!", [Topic, size(Payload)]) end, ok. -limit(table) -> - proplists:get_value(max_message_num, env()); -limit(payload) -> - proplists:get_value(max_playload_size, env()). +limit(table) -> env(max_message_num); +limit(payload) -> env(max_playload_size). -env() -> - case get({env, retained}) of +env(Key) -> + case get({retained, Key}) of undefined -> Env = emqttd_broker:env(retained), - put({env, retained}, Env), Env; - Env -> - Env + Val = proplists:get_value(Key, Env), + put({retained, Key}, Val), Val; + Val -> + Val end. %%%----------------------------------------------------------------------------- @@ -124,3 +143,55 @@ dispatch(Topic, CPid) when is_binary(Topic) -> end, lists:foreach(fun(Msg) -> CPid ! {dispatch, Msg} end, lists:reverse(Msgs)). +%%%============================================================================= +%%% gen_server callbacks +%%%============================================================================= + +init([]) -> + StatsFun = emqttd_stats:statsfun('retained/count', 'retained/max'), + {ok, #state{stats_fun = StatsFun, + expired_after = env(expired_after), + stats_timer = timer:send_interval(1000, stats), + expire_timer = timer:send_interval(300 * 1000, expire)}}. + +handle_call(_Request, _From, State) -> + {reply, ok, State}. + +handle_cast(Msg, State) -> + lager:critical("Unexpected Msg: ~p", [Msg]), + {noreply, State}. + +handle_info(stats, State = #state{stats_fun = StatsFun}) -> + StatsFun(mnesia:table_info(retained, size)), + {noreply, State, hibernate}; + +handle_info(expire, State = #state{expired_after = ExpiredAfter}) -> + expire(emqttd_util:now_to_secs(os:timestamp()) - ExpiredAfter), + {noreply, State, hibernate}; + +handle_info(Info, State) -> + lager:critical("Unexpected Info: ~p", [Info]), + {noreply, State}. + +terminate(_Reason, _State = #state{stats_timer = TRef1, expire_timer = TRef2}) -> + timer:cancel(TRef1), + timer:cancel(TRef2). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%============================================================================= +%%% Internal functions +%%%============================================================================= + +expire(Time) -> + mnesia:async_dirty( + fun() -> + Match = ets:fun2ms( + fun(#mqtt_retained{topic = Topic, message = #mqtt_message{timestamp = {MegaSecs, Secs, _}}}) + when Time > (MegaSecs * 1000000 + Secs) -> Topic + end), + Topics = mnesia:select(retained, Match, write), + lists:foreach(fun(Topic) -> mnesia:delete({retained, Topic}) end, Topics) + end). + diff --git a/src/emqttd_stats.erl b/src/emqttd_stats.erl index 345dc2d26..b1f52f06e 100644 --- a/src/emqttd_stats.erl +++ b/src/emqttd_stats.erl @@ -72,6 +72,11 @@ 'queues/max' % ... ]). +%% $SYS Topic for retained +-define(SYSTOP_RETAINED, [ + 'retained/count', + 'retained/max' +]). %%%============================================================================= %%% API @@ -139,7 +144,7 @@ setstats(Stat, MaxStat, Val) -> init([]) -> random:seed(now()), ets:new(?STATS_TAB, [set, public, named_table, {write_concurrency, true}]), - Topics = ?SYSTOP_CLIENTS ++ ?SYSTOP_SESSIONS ++ ?SYSTOP_PUBSUB, + Topics = ?SYSTOP_CLIENTS ++ ?SYSTOP_SESSIONS ++ ?SYSTOP_PUBSUB ++ ?SYSTOP_RETAINED, [ets:insert(?STATS_TAB, {Topic, 0}) || Topic <- Topics], % Create $SYS Topics [ok = emqttd_pubsub:create(stats_topic(Topic)) || Topic <- Topics],