improve the design of retainer
This commit is contained in:
parent
2b45dbf09f
commit
f2f5f251d7
|
@ -1,95 +0,0 @@
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
|
|
||||||
%%
|
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
%% you may not use this file except in compliance with the License.
|
|
||||||
%% You may obtain a copy of the License at
|
|
||||||
%%
|
|
||||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
%%
|
|
||||||
%% Unless required by applicable law or agreed to in writing, software
|
|
||||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
%% See the License for the specific language governing permissions and
|
|
||||||
%% limitations under the License.
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
-module(emqttd_backend).
|
|
||||||
|
|
||||||
-include("emqttd.hrl").
|
|
||||||
|
|
||||||
-include_lib("stdlib/include/ms_transform.hrl").
|
|
||||||
|
|
||||||
%% Mnesia Callbacks
|
|
||||||
-export([mnesia/1]).
|
|
||||||
|
|
||||||
-boot_mnesia({mnesia, [boot]}).
|
|
||||||
-copy_mnesia({mnesia, [copy]}).
|
|
||||||
|
|
||||||
%% Retained Message API
|
|
||||||
-export([retain_message/1, read_messages/1, match_messages/1, delete_message/1,
|
|
||||||
expire_messages/1, retained_count/0]).
|
|
||||||
|
|
||||||
-record(retained_message, {topic, msg}).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Mnesia callbacks
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
mnesia(boot) ->
|
|
||||||
ok = emqttd_mnesia:create_table(retained_message, [
|
|
||||||
{type, ordered_set},
|
|
||||||
{disc_copies, [node()]},
|
|
||||||
{record_name, retained_message},
|
|
||||||
{attributes, record_info(fields, retained_message)},
|
|
||||||
{storage_properties, [{ets, [compressed]},
|
|
||||||
{dets, [{auto_save, 1000}]}]}]);
|
|
||||||
|
|
||||||
mnesia(copy) ->
|
|
||||||
ok = emqttd_mnesia:copy_table(retained_message).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Retained Message
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
-spec(retain_message(mqtt_message()) -> ok).
|
|
||||||
retain_message(Msg = #mqtt_message{topic = Topic}) ->
|
|
||||||
mnesia:dirty_write(#retained_message{topic = Topic, msg = Msg}).
|
|
||||||
|
|
||||||
-spec(read_messages(binary()) -> [mqtt_message()]).
|
|
||||||
read_messages(Topic) ->
|
|
||||||
[Msg || #retained_message{msg = Msg} <- mnesia:dirty_read(retained_message, Topic)].
|
|
||||||
|
|
||||||
-spec(match_messages(binary()) -> [mqtt_message()]).
|
|
||||||
match_messages(Filter) ->
|
|
||||||
%% TODO: optimize later...
|
|
||||||
Fun = fun(#retained_message{topic = Name, msg = Msg}, Acc) ->
|
|
||||||
case emqttd_topic:match(Name, Filter) of
|
|
||||||
true -> [Msg|Acc];
|
|
||||||
false -> Acc
|
|
||||||
end
|
|
||||||
end,
|
|
||||||
mnesia:async_dirty(fun mnesia:foldl/3, [Fun, [], retained_message]).
|
|
||||||
|
|
||||||
-spec(delete_message(binary()) -> ok).
|
|
||||||
delete_message(Topic) ->
|
|
||||||
mnesia:dirty_delete(retained_message, Topic).
|
|
||||||
|
|
||||||
-spec(expire_messages(pos_integer()) -> any()).
|
|
||||||
expire_messages(Time) when is_integer(Time) ->
|
|
||||||
mnesia:transaction(
|
|
||||||
fun() ->
|
|
||||||
Match = ets:fun2ms(
|
|
||||||
fun(#retained_message{topic = Topic, msg = #mqtt_message{timestamp = {MegaSecs, Secs, _}}})
|
|
||||||
when Time > (MegaSecs * 1000000 + Secs) -> Topic
|
|
||||||
end),
|
|
||||||
Topics = mnesia:select(retained_message, Match, write),
|
|
||||||
lists:foreach(fun(<<"$SYS/", _/binary>>) -> ok; %% ignore $SYS/# messages
|
|
||||||
(Topic) -> mnesia:delete({retained_message, Topic})
|
|
||||||
end, Topics)
|
|
||||||
end).
|
|
||||||
|
|
||||||
-spec(retained_count() -> non_neg_integer()).
|
|
||||||
retained_count() ->
|
|
||||||
mnesia:table_info(retained_message, size).
|
|
||||||
|
|
|
@ -23,6 +23,14 @@
|
||||||
|
|
||||||
-include("emqttd_internal.hrl").
|
-include("emqttd_internal.hrl").
|
||||||
|
|
||||||
|
-include_lib("stdlib/include/ms_transform.hrl").
|
||||||
|
|
||||||
|
%% Mnesia Callbacks
|
||||||
|
-export([mnesia/1]).
|
||||||
|
|
||||||
|
-boot_mnesia({mnesia, [boot]}).
|
||||||
|
-copy_mnesia({mnesia, [copy]}).
|
||||||
|
|
||||||
%% API Function Exports
|
%% API Function Exports
|
||||||
-export([retain/1, dispatch/2]).
|
-export([retain/1, dispatch/2]).
|
||||||
|
|
||||||
|
@ -33,8 +41,26 @@
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||||
terminate/2, code_change/3]).
|
terminate/2, code_change/3]).
|
||||||
|
|
||||||
|
-record(retained_message, {topic, msg}).
|
||||||
|
|
||||||
-record(state, {stats_fun, expired_after, stats_timer, expire_timer}).
|
-record(state, {stats_fun, expired_after, stats_timer, expire_timer}).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Mnesia callbacks
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
mnesia(boot) ->
|
||||||
|
ok = emqttd_mnesia:create_table(retained_message, [
|
||||||
|
{type, ordered_set},
|
||||||
|
{disc_copies, [node()]},
|
||||||
|
{record_name, retained_message},
|
||||||
|
{attributes, record_info(fields, retained_message)},
|
||||||
|
{storage_properties, [{ets, [compressed]},
|
||||||
|
{dets, [{auto_save, 1000}]}]}]);
|
||||||
|
|
||||||
|
mnesia(copy) ->
|
||||||
|
ok = emqttd_mnesia:copy_table(retained_message).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% API
|
%% API
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -50,14 +76,14 @@ retain(#mqtt_message{retain = false}) -> ignore;
|
||||||
|
|
||||||
%% RETAIN flag set to 1 and payload containing zero bytes
|
%% RETAIN flag set to 1 and payload containing zero bytes
|
||||||
retain(#mqtt_message{retain = true, topic = Topic, payload = <<>>}) ->
|
retain(#mqtt_message{retain = true, topic = Topic, payload = <<>>}) ->
|
||||||
emqttd_backend:delete_message(Topic);
|
delete_message(Topic);
|
||||||
|
|
||||||
retain(Msg = #mqtt_message{topic = Topic, retain = true, payload = Payload}) ->
|
retain(Msg = #mqtt_message{topic = Topic, retain = true, payload = Payload}) ->
|
||||||
TabSize = emqttd_backend:retained_count(),
|
TabSize = retained_count(),
|
||||||
case {TabSize < limit(table), size(Payload) < limit(payload)} of
|
case {TabSize < limit(table), size(Payload) < limit(payload)} of
|
||||||
{true, true} ->
|
{true, true} ->
|
||||||
emqttd_backend:retain_message(Msg),
|
retain_message(Msg),
|
||||||
emqttd_metrics:set('messages/retained', emqttd_backend:retained_count());
|
emqttd_metrics:set('messages/retained', retained_count());
|
||||||
{false, _}->
|
{false, _}->
|
||||||
lager:error("Cannot retain message(topic=~s) for table is full!", [Topic]);
|
lager:error("Cannot retain message(topic=~s) for table is full!", [Topic]);
|
||||||
{_, false}->
|
{_, false}->
|
||||||
|
@ -82,8 +108,8 @@ env(Key) ->
|
||||||
-spec(dispatch(Topic :: binary(), CPid :: pid()) -> any()).
|
-spec(dispatch(Topic :: binary(), CPid :: pid()) -> any()).
|
||||||
dispatch(Topic, CPid) when is_binary(Topic) ->
|
dispatch(Topic, CPid) when is_binary(Topic) ->
|
||||||
Msgs = case emqttd_topic:wildcard(Topic) of
|
Msgs = case emqttd_topic:wildcard(Topic) of
|
||||||
false -> emqttd_backend:read_messages(Topic);
|
false -> read_messages(Topic);
|
||||||
true -> emqttd_backend:match_messages(Topic)
|
true -> match_messages(Topic)
|
||||||
end,
|
end,
|
||||||
lists:foreach(fun(Msg) -> CPid ! {dispatch, Topic, Msg} end, lists:reverse(Msgs)).
|
lists:foreach(fun(Msg) -> CPid ! {dispatch, Topic, Msg} end, lists:reverse(Msgs)).
|
||||||
|
|
||||||
|
@ -113,7 +139,7 @@ handle_cast(Msg, State) ->
|
||||||
?UNEXPECTED_MSG(Msg, State).
|
?UNEXPECTED_MSG(Msg, State).
|
||||||
|
|
||||||
handle_info(stats, State = #state{stats_fun = StatsFun}) ->
|
handle_info(stats, State = #state{stats_fun = StatsFun}) ->
|
||||||
StatsFun(emqttd_backend:retained_count()),
|
StatsFun(retained_count()),
|
||||||
{noreply, State, hibernate};
|
{noreply, State, hibernate};
|
||||||
|
|
||||||
handle_info(expire, State = #state{expired_after = Never})
|
handle_info(expire, State = #state{expired_after = Never})
|
||||||
|
@ -121,7 +147,7 @@ handle_info(expire, State = #state{expired_after = Never})
|
||||||
{noreply, State, hibernate};
|
{noreply, State, hibernate};
|
||||||
|
|
||||||
handle_info(expire, State = #state{expired_after = ExpiredAfter}) ->
|
handle_info(expire, State = #state{expired_after = ExpiredAfter}) ->
|
||||||
emqttd_backend:expire_messages(emqttd_time:now_to_secs() - ExpiredAfter),
|
expire_messages(emqttd_time:now_to_secs() - ExpiredAfter),
|
||||||
{noreply, State, hibernate};
|
{noreply, State, hibernate};
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
|
@ -134,3 +160,47 @@ terminate(_Reason, _State = #state{stats_timer = TRef1, expire_timer = TRef2}) -
|
||||||
code_change(_OldVsn, State, _Extra) ->
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
{ok, State}.
|
{ok, State}.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Internal Functions
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-spec(retain_message(mqtt_message()) -> ok).
|
||||||
|
retain_message(Msg = #mqtt_message{topic = Topic}) ->
|
||||||
|
mnesia:dirty_write(#retained_message{topic = Topic, msg = Msg}).
|
||||||
|
|
||||||
|
-spec(read_messages(binary()) -> [mqtt_message()]).
|
||||||
|
read_messages(Topic) ->
|
||||||
|
[Msg || #retained_message{msg = Msg} <- mnesia:dirty_read(retained_message, Topic)].
|
||||||
|
|
||||||
|
-spec(match_messages(binary()) -> [mqtt_message()]).
|
||||||
|
match_messages(Filter) ->
|
||||||
|
%% TODO: optimize later...
|
||||||
|
Fun = fun(#retained_message{topic = Name, msg = Msg}, Acc) ->
|
||||||
|
case emqttd_topic:match(Name, Filter) of
|
||||||
|
true -> [Msg|Acc];
|
||||||
|
false -> Acc
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
mnesia:async_dirty(fun mnesia:foldl/3, [Fun, [], retained_message]).
|
||||||
|
|
||||||
|
-spec(delete_message(binary()) -> ok).
|
||||||
|
delete_message(Topic) ->
|
||||||
|
mnesia:dirty_delete(retained_message, Topic).
|
||||||
|
|
||||||
|
-spec(expire_messages(pos_integer()) -> any()).
|
||||||
|
expire_messages(Time) when is_integer(Time) ->
|
||||||
|
mnesia:transaction(
|
||||||
|
fun() ->
|
||||||
|
Match = ets:fun2ms(
|
||||||
|
fun(#retained_message{topic = Topic, msg = #mqtt_message{timestamp = {MegaSecs, Secs, _}}})
|
||||||
|
when Time > (MegaSecs * 1000000 + Secs) -> Topic
|
||||||
|
end),
|
||||||
|
Topics = mnesia:select(retained_message, Match, write),
|
||||||
|
lists:foreach(fun(<<"$SYS/", _/binary>>) -> ok; %% ignore $SYS/# messages
|
||||||
|
(Topic) -> mnesia:delete({retained_message, Topic})
|
||||||
|
end, Topics)
|
||||||
|
end).
|
||||||
|
|
||||||
|
-spec(retained_count() -> non_neg_integer()).
|
||||||
|
retained_count() -> mnesia:table_info(retained_message, size).
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue