From f2f5f251d77e0d4a6b75c98d493d453940f5a855 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 18 Aug 2016 08:09:43 +0800 Subject: [PATCH] improve the design of retainer --- src/emqttd_backend.erl | 95 ----------------------------------------- src/emqttd_retainer.erl | 86 +++++++++++++++++++++++++++++++++---- 2 files changed, 78 insertions(+), 103 deletions(-) delete mode 100644 src/emqttd_backend.erl diff --git a/src/emqttd_backend.erl b/src/emqttd_backend.erl deleted file mode 100644 index 5515b2ac8..000000000 --- a/src/emqttd_backend.erl +++ /dev/null @@ -1,95 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2012-2016 Feng Lee . -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqttd_backend). - --include("emqttd.hrl"). - --include_lib("stdlib/include/ms_transform.hrl"). - -%% Mnesia Callbacks --export([mnesia/1]). - --boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). - -%% Retained Message API --export([retain_message/1, read_messages/1, match_messages/1, delete_message/1, - expire_messages/1, retained_count/0]). - --record(retained_message, {topic, msg}). - -%%-------------------------------------------------------------------- -%% Mnesia callbacks -%%-------------------------------------------------------------------- - -mnesia(boot) -> - ok = emqttd_mnesia:create_table(retained_message, [ - {type, ordered_set}, - {disc_copies, [node()]}, - {record_name, retained_message}, - {attributes, record_info(fields, retained_message)}, - {storage_properties, [{ets, [compressed]}, - {dets, [{auto_save, 1000}]}]}]); - -mnesia(copy) -> - ok = emqttd_mnesia:copy_table(retained_message). - -%%-------------------------------------------------------------------- -%% Retained Message -%%-------------------------------------------------------------------- - --spec(retain_message(mqtt_message()) -> ok). -retain_message(Msg = #mqtt_message{topic = Topic}) -> - mnesia:dirty_write(#retained_message{topic = Topic, msg = Msg}). - --spec(read_messages(binary()) -> [mqtt_message()]). -read_messages(Topic) -> - [Msg || #retained_message{msg = Msg} <- mnesia:dirty_read(retained_message, Topic)]. - --spec(match_messages(binary()) -> [mqtt_message()]). -match_messages(Filter) -> - %% TODO: optimize later... - Fun = fun(#retained_message{topic = Name, msg = Msg}, Acc) -> - case emqttd_topic:match(Name, Filter) of - true -> [Msg|Acc]; - false -> Acc - end - end, - mnesia:async_dirty(fun mnesia:foldl/3, [Fun, [], retained_message]). - --spec(delete_message(binary()) -> ok). -delete_message(Topic) -> - mnesia:dirty_delete(retained_message, Topic). - --spec(expire_messages(pos_integer()) -> any()). -expire_messages(Time) when is_integer(Time) -> - mnesia:transaction( - fun() -> - Match = ets:fun2ms( - fun(#retained_message{topic = Topic, msg = #mqtt_message{timestamp = {MegaSecs, Secs, _}}}) - when Time > (MegaSecs * 1000000 + Secs) -> Topic - end), - Topics = mnesia:select(retained_message, Match, write), - lists:foreach(fun(<<"$SYS/", _/binary>>) -> ok; %% ignore $SYS/# messages - (Topic) -> mnesia:delete({retained_message, Topic}) - end, Topics) - end). - --spec(retained_count() -> non_neg_integer()). -retained_count() -> - mnesia:table_info(retained_message, size). - diff --git a/src/emqttd_retainer.erl b/src/emqttd_retainer.erl index 0b239bec5..005489e58 100644 --- a/src/emqttd_retainer.erl +++ b/src/emqttd_retainer.erl @@ -23,6 +23,14 @@ -include("emqttd_internal.hrl"). +-include_lib("stdlib/include/ms_transform.hrl"). + +%% Mnesia Callbacks +-export([mnesia/1]). + +-boot_mnesia({mnesia, [boot]}). +-copy_mnesia({mnesia, [copy]}). + %% API Function Exports -export([retain/1, dispatch/2]). @@ -33,8 +41,26 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +-record(retained_message, {topic, msg}). + -record(state, {stats_fun, expired_after, stats_timer, expire_timer}). +%%-------------------------------------------------------------------- +%% Mnesia callbacks +%%-------------------------------------------------------------------- + +mnesia(boot) -> + ok = emqttd_mnesia:create_table(retained_message, [ + {type, ordered_set}, + {disc_copies, [node()]}, + {record_name, retained_message}, + {attributes, record_info(fields, retained_message)}, + {storage_properties, [{ets, [compressed]}, + {dets, [{auto_save, 1000}]}]}]); + +mnesia(copy) -> + ok = emqttd_mnesia:copy_table(retained_message). + %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- @@ -50,14 +76,14 @@ retain(#mqtt_message{retain = false}) -> ignore; %% RETAIN flag set to 1 and payload containing zero bytes retain(#mqtt_message{retain = true, topic = Topic, payload = <<>>}) -> - emqttd_backend:delete_message(Topic); + delete_message(Topic); retain(Msg = #mqtt_message{topic = Topic, retain = true, payload = Payload}) -> - TabSize = emqttd_backend:retained_count(), + TabSize = retained_count(), case {TabSize < limit(table), size(Payload) < limit(payload)} of {true, true} -> - emqttd_backend:retain_message(Msg), - emqttd_metrics:set('messages/retained', emqttd_backend:retained_count()); + retain_message(Msg), + emqttd_metrics:set('messages/retained', retained_count()); {false, _}-> lager:error("Cannot retain message(topic=~s) for table is full!", [Topic]); {_, false}-> @@ -82,8 +108,8 @@ env(Key) -> -spec(dispatch(Topic :: binary(), CPid :: pid()) -> any()). dispatch(Topic, CPid) when is_binary(Topic) -> Msgs = case emqttd_topic:wildcard(Topic) of - false -> emqttd_backend:read_messages(Topic); - true -> emqttd_backend:match_messages(Topic) + false -> read_messages(Topic); + true -> match_messages(Topic) end, lists:foreach(fun(Msg) -> CPid ! {dispatch, Topic, Msg} end, lists:reverse(Msgs)). @@ -113,7 +139,7 @@ handle_cast(Msg, State) -> ?UNEXPECTED_MSG(Msg, State). handle_info(stats, State = #state{stats_fun = StatsFun}) -> - StatsFun(emqttd_backend:retained_count()), + StatsFun(retained_count()), {noreply, State, hibernate}; handle_info(expire, State = #state{expired_after = Never}) @@ -121,7 +147,7 @@ handle_info(expire, State = #state{expired_after = Never}) {noreply, State, hibernate}; handle_info(expire, State = #state{expired_after = ExpiredAfter}) -> - emqttd_backend:expire_messages(emqttd_time:now_to_secs() - ExpiredAfter), + expire_messages(emqttd_time:now_to_secs() - ExpiredAfter), {noreply, State, hibernate}; handle_info(Info, State) -> @@ -134,3 +160,47 @@ terminate(_Reason, _State = #state{stats_timer = TRef1, expire_timer = TRef2}) - code_change(_OldVsn, State, _Extra) -> {ok, State}. +%%-------------------------------------------------------------------- +%% Internal Functions +%%-------------------------------------------------------------------- + +-spec(retain_message(mqtt_message()) -> ok). +retain_message(Msg = #mqtt_message{topic = Topic}) -> + mnesia:dirty_write(#retained_message{topic = Topic, msg = Msg}). + +-spec(read_messages(binary()) -> [mqtt_message()]). +read_messages(Topic) -> + [Msg || #retained_message{msg = Msg} <- mnesia:dirty_read(retained_message, Topic)]. + +-spec(match_messages(binary()) -> [mqtt_message()]). +match_messages(Filter) -> + %% TODO: optimize later... + Fun = fun(#retained_message{topic = Name, msg = Msg}, Acc) -> + case emqttd_topic:match(Name, Filter) of + true -> [Msg|Acc]; + false -> Acc + end + end, + mnesia:async_dirty(fun mnesia:foldl/3, [Fun, [], retained_message]). + +-spec(delete_message(binary()) -> ok). +delete_message(Topic) -> + mnesia:dirty_delete(retained_message, Topic). + +-spec(expire_messages(pos_integer()) -> any()). +expire_messages(Time) when is_integer(Time) -> + mnesia:transaction( + fun() -> + Match = ets:fun2ms( + fun(#retained_message{topic = Topic, msg = #mqtt_message{timestamp = {MegaSecs, Secs, _}}}) + when Time > (MegaSecs * 1000000 + Secs) -> Topic + end), + Topics = mnesia:select(retained_message, Match, write), + lists:foreach(fun(<<"$SYS/", _/binary>>) -> ok; %% ignore $SYS/# messages + (Topic) -> mnesia:delete({retained_message, Topic}) + end, Topics) + end). + +-spec(retained_count() -> non_neg_integer()). +retained_count() -> mnesia:table_info(retained_message, size). +