fix issue #182 - etained message expiration

This commit is contained in:
Feng 2015-10-10 00:04:07 +08:00
parent 0a5d1f9e3e
commit 5977c44996
5 changed files with 100 additions and 15 deletions

View File

@ -132,8 +132,12 @@
%% Retained messages
{retained, [
%% Expired after seconds, never expired if 0
{expired_after, 0},
%% Max number of retained messages
{max_message_num, 100000},
%% Max Payload Size of retained message
{max_playload_size, 65536}
]},

View File

@ -124,8 +124,12 @@
%% Retained messages
{retained, [
%% Expired after seconds, never expired if 0
{expired_after, 0},
%% Max number of retained messages
{max_message_num, 100000},
%% Max Payload Size of retained message
{max_playload_size, 65536}
]},

View File

@ -75,6 +75,7 @@ start_listeners() ->
start_servers(Sup) ->
Servers = [{"emqttd ctl", emqttd_ctl},
{"emqttd trace", emqttd_trace},
{"emqttd retained", emqttd_retained},
{"emqttd pooler", {supervisor, emqttd_pooler_sup}},
{"emqttd client manager", {supervisor, emqttd_cm_sup}},
{"emqttd session manager", {supervisor, emqttd_sm_sup}},

View File

@ -31,6 +31,8 @@
-include("emqttd.hrl").
-include_lib("stdlib/include/ms_transform.hrl").
%% Mnesia callbacks
-export([mnesia/1]).
@ -40,8 +42,19 @@
%% API Function Exports
-export([retain/1, dispatch/2]).
%% API Function Exports
-export([start_link/0, expire/1]).
-behaviour(gen_server).
%% gen_server Function Exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-record(mqtt_retained, {topic, message}).
-record(state, {stats_fun, expired_after, stats_timer, expire_timer}).
%%%=============================================================================
%%% Mnesia callbacks
%%%=============================================================================
@ -60,6 +73,14 @@ mnesia(copy) ->
%%% API
%%%=============================================================================
%%------------------------------------------------------------------------------
%% @doc Start a retained server
%% @end
%%------------------------------------------------------------------------------
-spec start_link() -> {ok, pid()} | ignore | {error, any()}.
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
%%%-----------------------------------------------------------------------------
%% @doc Retain message
%% @end
@ -71,9 +92,7 @@ retain(#mqtt_message{retain = false}) -> ignore;
retain(#mqtt_message{retain = true, topic = Topic, payload = <<>>}) ->
mnesia:async_dirty(fun mnesia:delete/1, [{retained, Topic}]);
retain(Msg = #mqtt_message{topic = Topic,
retain = true,
payload = Payload}) ->
retain(Msg = #mqtt_message{topic = Topic, retain = true, payload = Payload}) ->
TabSize = mnesia:table_info(retained, size),
case {TabSize < limit(table), size(Payload) < limit(payload)} of
{true, true} ->
@ -82,23 +101,23 @@ retain(Msg = #mqtt_message{topic = Topic,
mnesia:async_dirty(fun mnesia:write/3, [retained, Retained, write]),
emqttd_metrics:set('messages/retained', mnesia:table_info(retained, size));
{false, _}->
lager:error("Dropped retained message(topic=~s) for table is full!", [Topic]);
lager:error("Cannot retain message(topic=~s) for table is full!", [Topic]);
{_, false}->
lager:error("Dropped retained message(topic=~s, payload_size=~p) for payload is too big!", [Topic, size(Payload)])
lager:error("Cannot retain message(topic=~s, payload_size=~p)"
" for payload is too big!", [Topic, size(Payload)])
end, ok.
limit(table) ->
proplists:get_value(max_message_num, env());
limit(payload) ->
proplists:get_value(max_playload_size, env()).
limit(table) -> env(max_message_num);
limit(payload) -> env(max_playload_size).
env() ->
case get({env, retained}) of
env(Key) ->
case get({retained, Key}) of
undefined ->
Env = emqttd_broker:env(retained),
put({env, retained}, Env), Env;
Env ->
Env
Val = proplists:get_value(Key, Env),
put({retained, Key}, Val), Val;
Val ->
Val
end.
%%%-----------------------------------------------------------------------------
@ -124,3 +143,55 @@ dispatch(Topic, CPid) when is_binary(Topic) ->
end,
lists:foreach(fun(Msg) -> CPid ! {dispatch, Msg} end, lists:reverse(Msgs)).
%%%=============================================================================
%%% gen_server callbacks
%%%=============================================================================
init([]) ->
StatsFun = emqttd_stats:statsfun('retained/count', 'retained/max'),
{ok, #state{stats_fun = StatsFun,
expired_after = env(expired_after),
stats_timer = timer:send_interval(1000, stats),
expire_timer = timer:send_interval(300 * 1000, expire)}}.
handle_call(_Request, _From, State) ->
{reply, ok, State}.
handle_cast(Msg, State) ->
lager:critical("Unexpected Msg: ~p", [Msg]),
{noreply, State}.
handle_info(stats, State = #state{stats_fun = StatsFun}) ->
StatsFun(mnesia:table_info(retained, size)),
{noreply, State, hibernate};
handle_info(expire, State = #state{expired_after = ExpiredAfter}) ->
expire(emqttd_util:now_to_secs(os:timestamp()) - ExpiredAfter),
{noreply, State, hibernate};
handle_info(Info, State) ->
lager:critical("Unexpected Info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State = #state{stats_timer = TRef1, expire_timer = TRef2}) ->
timer:cancel(TRef1),
timer:cancel(TRef2).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%%=============================================================================
%%% Internal functions
%%%=============================================================================
expire(Time) ->
mnesia:async_dirty(
fun() ->
Match = ets:fun2ms(
fun(#mqtt_retained{topic = Topic, message = #mqtt_message{timestamp = {MegaSecs, Secs, _}}})
when Time > (MegaSecs * 1000000 + Secs) -> Topic
end),
Topics = mnesia:select(retained, Match, write),
lists:foreach(fun(Topic) -> mnesia:delete({retained, Topic}) end, Topics)
end).

View File

@ -72,6 +72,11 @@
'queues/max' % ...
]).
%% $SYS Topic for retained
-define(SYSTOP_RETAINED, [
'retained/count',
'retained/max'
]).
%%%=============================================================================
%%% API
@ -139,7 +144,7 @@ setstats(Stat, MaxStat, Val) ->
init([]) ->
random:seed(now()),
ets:new(?STATS_TAB, [set, public, named_table, {write_concurrency, true}]),
Topics = ?SYSTOP_CLIENTS ++ ?SYSTOP_SESSIONS ++ ?SYSTOP_PUBSUB,
Topics = ?SYSTOP_CLIENTS ++ ?SYSTOP_SESSIONS ++ ?SYSTOP_PUBSUB ++ ?SYSTOP_RETAINED,
[ets:insert(?STATS_TAB, {Topic, 0}) || Topic <- Topics],
% Create $SYS Topics
[ok = emqttd_pubsub:create(stats_topic(Topic)) || Topic <- Topics],