Merge pull request #329 from emqtt/dev-feng
fix issue #182 - retained message expiration
This commit is contained in:
commit
a2bbceff3c
|
@ -132,8 +132,12 @@
|
||||||
|
|
||||||
%% Retained messages
|
%% Retained messages
|
||||||
{retained, [
|
{retained, [
|
||||||
|
%% Expired after seconds, never expired if 0
|
||||||
|
{expired_after, 0},
|
||||||
|
|
||||||
%% Max number of retained messages
|
%% Max number of retained messages
|
||||||
{max_message_num, 100000},
|
{max_message_num, 100000},
|
||||||
|
|
||||||
%% Max Payload Size of retained message
|
%% Max Payload Size of retained message
|
||||||
{max_playload_size, 65536}
|
{max_playload_size, 65536}
|
||||||
]},
|
]},
|
||||||
|
|
|
@ -124,8 +124,12 @@
|
||||||
|
|
||||||
%% Retained messages
|
%% Retained messages
|
||||||
{retained, [
|
{retained, [
|
||||||
|
%% Expired after seconds, never expired if 0
|
||||||
|
{expired_after, 0},
|
||||||
|
|
||||||
%% Max number of retained messages
|
%% Max number of retained messages
|
||||||
{max_message_num, 100000},
|
{max_message_num, 100000},
|
||||||
|
|
||||||
%% Max Payload Size of retained message
|
%% Max Payload Size of retained message
|
||||||
{max_playload_size, 65536}
|
{max_playload_size, 65536}
|
||||||
]},
|
]},
|
||||||
|
|
|
@ -75,6 +75,7 @@ start_listeners() ->
|
||||||
start_servers(Sup) ->
|
start_servers(Sup) ->
|
||||||
Servers = [{"emqttd ctl", emqttd_ctl},
|
Servers = [{"emqttd ctl", emqttd_ctl},
|
||||||
{"emqttd trace", emqttd_trace},
|
{"emqttd trace", emqttd_trace},
|
||||||
|
{"emqttd retained", emqttd_retained},
|
||||||
{"emqttd pooler", {supervisor, emqttd_pooler_sup}},
|
{"emqttd pooler", {supervisor, emqttd_pooler_sup}},
|
||||||
{"emqttd client manager", {supervisor, emqttd_cm_sup}},
|
{"emqttd client manager", {supervisor, emqttd_cm_sup}},
|
||||||
{"emqttd session manager", {supervisor, emqttd_sm_sup}},
|
{"emqttd session manager", {supervisor, emqttd_sm_sup}},
|
||||||
|
|
|
@ -31,6 +31,8 @@
|
||||||
|
|
||||||
-include("emqttd.hrl").
|
-include("emqttd.hrl").
|
||||||
|
|
||||||
|
-include_lib("stdlib/include/ms_transform.hrl").
|
||||||
|
|
||||||
%% Mnesia callbacks
|
%% Mnesia callbacks
|
||||||
-export([mnesia/1]).
|
-export([mnesia/1]).
|
||||||
|
|
||||||
|
@ -40,8 +42,19 @@
|
||||||
%% API Function Exports
|
%% API Function Exports
|
||||||
-export([retain/1, dispatch/2]).
|
-export([retain/1, dispatch/2]).
|
||||||
|
|
||||||
|
%% API Function Exports
|
||||||
|
-export([start_link/0, expire/1]).
|
||||||
|
|
||||||
|
-behaviour(gen_server).
|
||||||
|
|
||||||
|
%% gen_server Function Exports
|
||||||
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||||
|
terminate/2, code_change/3]).
|
||||||
|
|
||||||
-record(mqtt_retained, {topic, message}).
|
-record(mqtt_retained, {topic, message}).
|
||||||
|
|
||||||
|
-record(state, {stats_fun, expired_after, stats_timer, expire_timer}).
|
||||||
|
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
%%% Mnesia callbacks
|
%%% Mnesia callbacks
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
|
@ -60,6 +73,14 @@ mnesia(copy) ->
|
||||||
%%% API
|
%%% API
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% @doc Start a retained server
|
||||||
|
%% @end
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
-spec start_link() -> {ok, pid()} | ignore | {error, any()}.
|
||||||
|
start_link() ->
|
||||||
|
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
||||||
|
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
%% @doc Retain message
|
%% @doc Retain message
|
||||||
%% @end
|
%% @end
|
||||||
|
@ -71,9 +92,7 @@ retain(#mqtt_message{retain = false}) -> ignore;
|
||||||
retain(#mqtt_message{retain = true, topic = Topic, payload = <<>>}) ->
|
retain(#mqtt_message{retain = true, topic = Topic, payload = <<>>}) ->
|
||||||
mnesia:async_dirty(fun mnesia:delete/1, [{retained, Topic}]);
|
mnesia:async_dirty(fun mnesia:delete/1, [{retained, Topic}]);
|
||||||
|
|
||||||
retain(Msg = #mqtt_message{topic = Topic,
|
retain(Msg = #mqtt_message{topic = Topic, retain = true, payload = Payload}) ->
|
||||||
retain = true,
|
|
||||||
payload = Payload}) ->
|
|
||||||
TabSize = mnesia:table_info(retained, size),
|
TabSize = mnesia:table_info(retained, size),
|
||||||
case {TabSize < limit(table), size(Payload) < limit(payload)} of
|
case {TabSize < limit(table), size(Payload) < limit(payload)} of
|
||||||
{true, true} ->
|
{true, true} ->
|
||||||
|
@ -82,23 +101,23 @@ retain(Msg = #mqtt_message{topic = Topic,
|
||||||
mnesia:async_dirty(fun mnesia:write/3, [retained, Retained, write]),
|
mnesia:async_dirty(fun mnesia:write/3, [retained, Retained, write]),
|
||||||
emqttd_metrics:set('messages/retained', mnesia:table_info(retained, size));
|
emqttd_metrics:set('messages/retained', mnesia:table_info(retained, size));
|
||||||
{false, _}->
|
{false, _}->
|
||||||
lager:error("Dropped retained message(topic=~s) for table is full!", [Topic]);
|
lager:error("Cannot retain message(topic=~s) for table is full!", [Topic]);
|
||||||
{_, false}->
|
{_, false}->
|
||||||
lager:error("Dropped retained message(topic=~s, payload_size=~p) for payload is too big!", [Topic, size(Payload)])
|
lager:error("Cannot retain message(topic=~s, payload_size=~p)"
|
||||||
|
" for payload is too big!", [Topic, size(Payload)])
|
||||||
end, ok.
|
end, ok.
|
||||||
|
|
||||||
limit(table) ->
|
limit(table) -> env(max_message_num);
|
||||||
proplists:get_value(max_message_num, env());
|
limit(payload) -> env(max_playload_size).
|
||||||
limit(payload) ->
|
|
||||||
proplists:get_value(max_playload_size, env()).
|
|
||||||
|
|
||||||
env() ->
|
env(Key) ->
|
||||||
case get({env, retained}) of
|
case get({retained, Key}) of
|
||||||
undefined ->
|
undefined ->
|
||||||
Env = emqttd_broker:env(retained),
|
Env = emqttd_broker:env(retained),
|
||||||
put({env, retained}, Env), Env;
|
Val = proplists:get_value(Key, Env),
|
||||||
Env ->
|
put({retained, Key}, Val), Val;
|
||||||
Env
|
Val ->
|
||||||
|
Val
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
|
@ -124,3 +143,55 @@ dispatch(Topic, CPid) when is_binary(Topic) ->
|
||||||
end,
|
end,
|
||||||
lists:foreach(fun(Msg) -> CPid ! {dispatch, Msg} end, lists:reverse(Msgs)).
|
lists:foreach(fun(Msg) -> CPid ! {dispatch, Msg} end, lists:reverse(Msgs)).
|
||||||
|
|
||||||
|
%%%=============================================================================
|
||||||
|
%%% gen_server callbacks
|
||||||
|
%%%=============================================================================
|
||||||
|
|
||||||
|
init([]) ->
|
||||||
|
StatsFun = emqttd_stats:statsfun('retained/count', 'retained/max'),
|
||||||
|
{ok, #state{stats_fun = StatsFun,
|
||||||
|
expired_after = env(expired_after),
|
||||||
|
stats_timer = timer:send_interval(1000, stats),
|
||||||
|
expire_timer = timer:send_interval(300 * 1000, expire)}}.
|
||||||
|
|
||||||
|
handle_call(_Request, _From, State) ->
|
||||||
|
{reply, ok, State}.
|
||||||
|
|
||||||
|
handle_cast(Msg, State) ->
|
||||||
|
lager:critical("Unexpected Msg: ~p", [Msg]),
|
||||||
|
{noreply, State}.
|
||||||
|
|
||||||
|
handle_info(stats, State = #state{stats_fun = StatsFun}) ->
|
||||||
|
StatsFun(mnesia:table_info(retained, size)),
|
||||||
|
{noreply, State, hibernate};
|
||||||
|
|
||||||
|
handle_info(expire, State = #state{expired_after = ExpiredAfter}) ->
|
||||||
|
expire(emqttd_util:now_to_secs(os:timestamp()) - ExpiredAfter),
|
||||||
|
{noreply, State, hibernate};
|
||||||
|
|
||||||
|
handle_info(Info, State) ->
|
||||||
|
lager:critical("Unexpected Info: ~p", [Info]),
|
||||||
|
{noreply, State}.
|
||||||
|
|
||||||
|
terminate(_Reason, _State = #state{stats_timer = TRef1, expire_timer = TRef2}) ->
|
||||||
|
timer:cancel(TRef1),
|
||||||
|
timer:cancel(TRef2).
|
||||||
|
|
||||||
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
|
{ok, State}.
|
||||||
|
|
||||||
|
%%%=============================================================================
|
||||||
|
%%% Internal functions
|
||||||
|
%%%=============================================================================
|
||||||
|
|
||||||
|
expire(Time) ->
|
||||||
|
mnesia:async_dirty(
|
||||||
|
fun() ->
|
||||||
|
Match = ets:fun2ms(
|
||||||
|
fun(#mqtt_retained{topic = Topic, message = #mqtt_message{timestamp = {MegaSecs, Secs, _}}})
|
||||||
|
when Time > (MegaSecs * 1000000 + Secs) -> Topic
|
||||||
|
end),
|
||||||
|
Topics = mnesia:select(retained, Match, write),
|
||||||
|
lists:foreach(fun(Topic) -> mnesia:delete({retained, Topic}) end, Topics)
|
||||||
|
end).
|
||||||
|
|
||||||
|
|
|
@ -50,7 +50,7 @@
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-spec start_link() -> {ok, pid()} | ignore | {error, any()}.
|
-spec start_link() -> {ok, pid()} | ignore | {error, any()}.
|
||||||
start_link() ->
|
start_link() ->
|
||||||
gen_server:start_link(?MODULE, [], []).
|
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'),
|
StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'),
|
||||||
|
|
|
@ -72,6 +72,11 @@
|
||||||
'queues/max' % ...
|
'queues/max' % ...
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
%% $SYS Topic for retained
|
||||||
|
-define(SYSTOP_RETAINED, [
|
||||||
|
'retained/count',
|
||||||
|
'retained/max'
|
||||||
|
]).
|
||||||
|
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
%%% API
|
%%% API
|
||||||
|
@ -139,7 +144,7 @@ setstats(Stat, MaxStat, Val) ->
|
||||||
init([]) ->
|
init([]) ->
|
||||||
random:seed(now()),
|
random:seed(now()),
|
||||||
ets:new(?STATS_TAB, [set, public, named_table, {write_concurrency, true}]),
|
ets:new(?STATS_TAB, [set, public, named_table, {write_concurrency, true}]),
|
||||||
Topics = ?SYSTOP_CLIENTS ++ ?SYSTOP_SESSIONS ++ ?SYSTOP_PUBSUB,
|
Topics = ?SYSTOP_CLIENTS ++ ?SYSTOP_SESSIONS ++ ?SYSTOP_PUBSUB ++ ?SYSTOP_RETAINED,
|
||||||
[ets:insert(?STATS_TAB, {Topic, 0}) || Topic <- Topics],
|
[ets:insert(?STATS_TAB, {Topic, 0}) || Topic <- Topics],
|
||||||
% Create $SYS Topics
|
% Create $SYS Topics
|
||||||
[ok = emqttd_pubsub:create(stats_topic(Topic)) || Topic <- Topics],
|
[ok = emqttd_pubsub:create(stats_topic(Topic)) || Topic <- Topics],
|
||||||
|
|
Loading…
Reference in New Issue