From e32d85f40a3b3cc09e81cdcc0826e509a6655f3c Mon Sep 17 00:00:00 2001 From: Feng Date: Sat, 12 Mar 2016 15:39:19 +0800 Subject: [PATCH] backend and retained_message --- src/emqttd_backend.erl | 64 +++++++++++++++++++++++++- src/emqttd_retainer.erl | 85 +++++++---------------------------- test/emqttd_SUITE.erl | 67 ++++++++++++++++++++------- test/emqttd_backend_SUITE.erl | 29 +++++++----- 4 files changed, 150 insertions(+), 95 deletions(-) diff --git a/src/emqttd_backend.erl b/src/emqttd_backend.erl index 16079bbf3..df75866cc 100644 --- a/src/emqttd_backend.erl +++ b/src/emqttd_backend.erl @@ -18,13 +18,19 @@ -include("emqttd.hrl"). +-include_lib("stdlib/include/ms_transform.hrl"). + %% Mnesia Callbacks -export([mnesia/1]). -boot_mnesia({mnesia, [boot]}). -copy_mnesia({mnesia, [copy]}). -%% API. +%% Retained Message API +-export([retain_message/1, read_messages/1, match_messages/1, delete_message/1, + expire_messages/1, retained_count/0]). + +%% Static Subscription API -export([add_subscription/1, lookup_subscriptions/1, del_subscriptions/1, del_subscription/2]). @@ -33,6 +39,14 @@ %%-------------------------------------------------------------------- mnesia(boot) -> + ok = emqttd_mnesia:create_table(retained_message, [ + {type, ordered_set}, + {disc_copies, [node()]}, + {record_name, mqtt_message}, + {index, [#mqtt_message.topic]}, + {attributes, record_info(fields, mqtt_message)}, + {storage_properties, [{ets, [compressed]}, + {dets, [{auto_save, 1000}]}]}]), ok = emqttd_mnesia:create_table(backend_subscription, [ {type, bag}, {disc_copies, [node()]}, @@ -42,14 +56,59 @@ mnesia(boot) -> {dets, [{auto_save, 5000}]}]}]); mnesia(copy) -> + ok = emqttd_mnesia:copy_table(retained_message), ok = emqttd_mnesia:copy_table(backend_subscription). +%%-------------------------------------------------------------------- +%% Retained Message +%%-------------------------------------------------------------------- + +-spec(retain_message(mqtt_message()) -> ok). +retain_message(Msg) when is_record(Msg, mqtt_message) -> + mnesia:dirty_write(retained_message, Msg). + +-spec(read_messages(binary()) -> [mqtt_message()]). +read_messages(Topic) -> + mnesia:dirty_index_read(retained_message, Topic, #mqtt_message.topic). + +-spec(match_messages(binary()) -> [mqtt_message()]). +match_messages(Filter) -> + %% TODO: optimize later... + Fun = fun(Msg = #mqtt_message{topic = Name}, Acc) -> + case emqttd_topic:match(Name, Filter) of + true -> [Msg|Acc]; + false -> Acc + end + end, + mnesia:async_dirty(fun mnesia:foldl/3, [Fun, [], retained_message]). + +-spec(delete_message(binary()) -> ok). +delete_message(Topic) -> + %%TODO: no transaction??? + [mnesia:dirty_delete_object(retained_message, Msg) || Msg <- read_messages(Topic)]. + +-spec(expire_messages(pos_integer()) -> any()). +expire_messages(Time) when is_integer(Time) -> + mnesia:transaction( + fun() -> + Match = ets:fun2ms( + fun(#mqtt_message{msgid = MsgId, timestamp = {MegaSecs, Secs, _}}) + when Time > (MegaSecs * 1000000 + Secs) -> MsgId + end), + MsgIds = mnesia:select(retained_message, Match, write), + lists:foreach(fun(MsgId) -> mnesia:delete({retained_message, MsgId}) end, MsgIds) + end). + +-spec(retained_count() -> non_neg_integer()). +retained_count() -> + mnesia:table_info(retained_message, size). + %%-------------------------------------------------------------------- %% Static Subscriptions %%-------------------------------------------------------------------- %% @doc Add a static subscription manually. --spec add_subscription(mqtt_subscription()) -> ok | {error, already_existed}. +-spec(add_subscription(mqtt_subscription()) -> ok | {error, already_existed}). add_subscription(Subscription = #mqtt_subscription{subid = SubId, topic = Topic}) -> Pattern = match_pattern(SubId, Topic), return(mnesia:transaction(fun() -> @@ -89,3 +148,4 @@ match_pattern(SubId, Topic) -> return({atomic, ok}) -> ok; return({aborted, Reason}) -> {error, Reason}. + diff --git a/src/emqttd_retainer.erl b/src/emqttd_retainer.erl index f90223a22..57f586453 100644 --- a/src/emqttd_retainer.erl +++ b/src/emqttd_retainer.erl @@ -23,66 +23,41 @@ -include("emqttd_internal.hrl"). --include_lib("stdlib/include/ms_transform.hrl"). - -%% Mnesia callbacks --export([mnesia/1]). - --boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). - %% API Function Exports -export([retain/1, dispatch/2]). %% API Function Exports --export([start_link/0, expire/1]). +-export([start_link/0]). %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(mqtt_retained, {topic, message}). - -record(state, {stats_fun, expired_after, stats_timer, expire_timer}). -%%-------------------------------------------------------------------- -%% Mnesia callbacks -%%-------------------------------------------------------------------- - -mnesia(boot) -> - ok = emqttd_mnesia:create_table(retained, [ - {type, ordered_set}, - {disc_copies, [node()]}, - {record_name, mqtt_retained}, - {attributes, record_info(fields, mqtt_retained)}]); -mnesia(copy) -> - ok = emqttd_mnesia:copy_table(retained). - %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- -%% @doc Start a retained server --spec start_link() -> {ok, pid()} | ignore | {error, any()}. +%% @doc Start the retainer +-spec(start_link() -> {ok, pid()} | ignore | {error, any()}). start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). -%% @doc Retain message --spec retain(mqtt_message()) -> ok | ignore. +%% @doc Retain a message +-spec(retain(mqtt_message()) -> ok | ignore). retain(#mqtt_message{retain = false}) -> ignore; %% RETAIN flag set to 1 and payload containing zero bytes retain(#mqtt_message{retain = true, topic = Topic, payload = <<>>}) -> - mnesia:async_dirty(fun mnesia:delete/1, [{retained, Topic}]); + emqttd_backend:delete_message(Topic); retain(Msg = #mqtt_message{topic = Topic, retain = true, payload = Payload}) -> - TabSize = mnesia:table_info(retained, size), + TabSize = emqttd_backend:retained_count(), case {TabSize < limit(table), size(Payload) < limit(payload)} of {true, true} -> - Retained = #mqtt_retained{topic = Topic, message = Msg}, - lager:debug("RETAIN ~s", [emqttd_message:format(Msg)]), - mnesia:async_dirty(fun mnesia:write/3, [retained, Retained, write]), - emqttd_metrics:set('messages/retained', mnesia:table_info(retained, size)); + emqttd_backend:retain_message(Msg), + emqttd_metrics:set('messages/retained', emqttd_backend:retained_count()); {false, _}-> lager:error("Cannot retain message(topic=~s) for table is full!", [Topic]); {_, false}-> @@ -103,24 +78,13 @@ env(Key) -> Val end. -%% @doc Deliver retained messages to subscribed client --spec dispatch(Topic, CPid) -> any() when - Topic :: binary(), - CPid :: pid(). +%% @doc Deliver retained messages to the subscriber +-spec(dispatch(Topic :: binary(), CPid :: pid()) -> any()). dispatch(Topic, CPid) when is_binary(Topic) -> - Msgs = - case emqttd_topic:wildcard(Topic) of - false -> - [Msg || #mqtt_retained{message = Msg} <- mnesia:dirty_read(retained, Topic)]; - true -> - Fun = fun(#mqtt_retained{topic = Name, message = Msg}, Acc) -> - case emqttd_topic:match(Name, Topic) of - true -> [Msg|Acc]; - false -> Acc - end - end, - mnesia:async_dirty(fun mnesia:foldl/3, [Fun, [], retained]) - end, + Msgs = case emqttd_topic:wildcard(Topic) of + false -> emqttd_backend:read_messages(Topic); + true -> emqttd_backend:match_messages(Topic) + end, lists:foreach(fun(Msg) -> CPid ! {dispatch, Topic, Msg} end, lists:reverse(Msgs)). %%-------------------------------------------------------------------- @@ -145,7 +109,7 @@ handle_cast(Msg, State) -> ?UNEXPECTED_MSG(Msg, State). handle_info(stats, State = #state{stats_fun = StatsFun}) -> - StatsFun(mnesia:table_info(retained, size)), + StatsFun(emqttd_backend:retained_count()), {noreply, State, hibernate}; handle_info(expire, State = #state{expired_after = Never}) @@ -153,7 +117,7 @@ handle_info(expire, State = #state{expired_after = Never}) {noreply, State, hibernate}; handle_info(expire, State = #state{expired_after = ExpiredAfter}) -> - expire(emqttd_time:now_to_secs() - ExpiredAfter), + emqttd_backend:expire_messages(emqttd_time:now_to_secs() - ExpiredAfter), {noreply, State, hibernate}; handle_info(Info, State) -> @@ -166,18 +130,3 @@ terminate(_Reason, _State = #state{stats_timer = TRef1, expire_timer = TRef2}) - code_change(_OldVsn, State, _Extra) -> {ok, State}. -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- - -expire(Time) -> - mnesia:async_dirty( - fun() -> - Match = ets:fun2ms( - fun(#mqtt_retained{topic = Topic, message = #mqtt_message{timestamp = {MegaSecs, Secs, _}}}) - when Time > (MegaSecs * 1000000 + Secs) -> Topic - end), - Topics = mnesia:select(retained, Match, write), - lists:foreach(fun(Topic) -> mnesia:delete({retained, Topic}) end, Topics) - end). - diff --git a/test/emqttd_SUITE.erl b/test/emqttd_SUITE.erl index bd6913a69..3650a24db 100644 --- a/test/emqttd_SUITE.erl +++ b/test/emqttd_SUITE.erl @@ -29,6 +29,7 @@ all() -> {group, metrics}, {group, stats}, {group, hook}, + {group, backend}, {group, cli}]. groups() -> @@ -44,8 +45,6 @@ groups() -> router_unused]}, {session, [sequence], [start_session]}, - {retainer, [sequence], - [retain_message]}, {broker, [sequence], [hook_unhook]}, {metrics, [sequence], @@ -55,6 +54,12 @@ groups() -> {hook, [sequence], [add_delete_hook, run_hooks]}, + {retainer, [sequence], + [retain_messages, + dispatch_retained_messages, + expire_retained_messages]}, + {backend, [sequence], + [backend_subscription]}, {cli, [sequence], [ctl_register_cmd, cli_status, @@ -207,19 +212,6 @@ start_session(_) -> emqttd_session:unsubscribe(SessPid, [<<"topic/session">>]), emqttd_mock_client:stop(ClientPid). -%%-------------------------------------------------------------------- -%% Retainer Group -%%-------------------------------------------------------------------- - -retain_message(_) -> - Msg = #mqtt_message{retain = true, topic = <<"a/b/c">>, - payload = <<"payload">>}, - emqttd_retainer:retain(Msg), - emqttd_retainer:dispatch(<<"a/b/+">>, self()), - true = receive {dispatch, <<"a/b/+">>, Msg} -> true after 10 -> false end, - emqttd_retainer:retain(#mqtt_message{retain = true, topic = <<"a/b/c">>, payload = <<>>}), - [] = mnesia:dirty_read({retained, <<"a/b/c">>}). - %%-------------------------------------------------------------------- %% Broker Group %%-------------------------------------------------------------------- @@ -280,6 +272,51 @@ hook_fun3(arg1, arg2, _Acc, init) -> ok. hook_fun4(arg1, arg2, Acc, init) -> {ok, [r2 | Acc]}. hook_fun5(arg1, arg2, Acc, init) -> {stop, [r3 | Acc]}. +%%-------------------------------------------------------------------- +%% Retainer Test +%%-------------------------------------------------------------------- + +retain_messages(_) -> + Msg = emqttd_message:make(<<"clientId">>, <<"topic">>, <<"payload">>), + emqttd_backend:retain_message(Msg), + [Msg] = emqttd_backend:read_messages(<<"topic">>), + [Msg] = emqttd_backend:match_messages(<<"topic/#">>), + emqttd_backend:delete_message(<<"topic">>), + 0 = emqttd_backend:retained_count(). + +dispatch_retained_messages(_) -> + Msg = #mqtt_message{retain = true, topic = <<"a/b/c">>, + payload = <<"payload">>}, + emqttd_retainer:retain(Msg), + emqttd_retainer:dispatch(<<"a/b/+">>, self()), + true = receive {dispatch, <<"a/b/+">>, Msg} -> true after 10 -> false end, + emqttd_retainer:retain(#mqtt_message{retain = true, topic = <<"a/b/c">>, payload = <<>>}), + [] = emqttd_backend:read_messages(<<"a/b/c">>). + +expire_retained_messages(_) -> + Msg1 = emqttd_message:make(<<"clientId1">>, qos1, <<"topic/1">>, <<"payload1">>), + Msg2 = emqttd_message:make(<<"clientId2">>, qos2, <<"topic/2">>, <<"payload2">>), + emqttd_backend:retain_message(Msg1), + emqttd_backend:retain_message(Msg2), + timer:sleep(2000), + emqttd_backend:expire_messages(emqttd_time:now_to_secs()), + 0 = emqttd_backend:retained_count(). + +%%-------------------------------------------------------------------- +%% Backend Test +%%-------------------------------------------------------------------- + +backend_subscription(_) -> + Sub1 = #mqtt_subscription{subid = <<"clientId">>, topic = <<"topic">>, qos = 2}, + Sub2 = #mqtt_subscription{subid = <<"clientId">>, topic = <<"#">>, qos = 2}, + emqttd_backend:add_subscription(Sub1), + emqttd_backend:add_subscription(Sub2), + [Sub1, Sub2] = emqttd_backend:lookup_subscriptions(<<"clientId">>), + emqttd_backend:del_subscription(<<"clientId">>, <<"topic">>), + [Sub2] = emqttd_backend:lookup_subscriptions(<<"clientId">>), + emqttd_backend:del_subscriptions(<<"clientId">>), + [] = emqttd_backend:lookup_subscriptions(<<"clientId">>). + %%-------------------------------------------------------------------- %% CLI Group %%-------------------------------------------------------------------- diff --git a/test/emqttd_backend_SUITE.erl b/test/emqttd_backend_SUITE.erl index 162510bc7..ceb1b3d6e 100644 --- a/test/emqttd_backend_SUITE.erl +++ b/test/emqttd_backend_SUITE.erl @@ -16,21 +16,30 @@ -module(emqttd_backend_SUITE). +-include("emqttd.hrl"). + -compile(export_all). -all() -> [{group, retainer}]. +all() -> [{group, subscription}]. -groups() -> [{retainer, [], [t_retain]}]. +groups() -> [{subscription, [], [add_del_subscription]}]. -init_per_group(retainer, _Config) -> +init_per_suite(Config) -> ok = emqttd_mnesia:ensure_started(), - emqttd_retainer:mnesia(boot), - emqttd_retainer:mnesia(copy). + emqttd_backend:mnesia(boot), + emqttd_backend:mnesia(copy), + Config. -end_per_group(retainer, _Config) -> - ok; -end_per_group(_Group, _Config) -> - ok. +end_per_suite(_Config) -> + emqttd_mnesia:ensure_stopped(). -t_retain(_) -> ok. +add_del_subscription(_) -> + Sub1 = #mqtt_subscription{subid = <<"clientId">>, topic = <<"topic">>, qos = 2}, + Sub2 = #mqtt_subscription{subid = <<"clientId">>, topic = <<"topic">>, qos = 1}, + ok = emqttd_backend:add_subscription(Sub1), + {error, already_existed} = emqttd_backend:add_subscription(Sub1), + ok = emqttd_backend:add_subscription(Sub2), + [Sub2] = emqttd_backend:lookup_subscriptions(<<"clientId">>), + emqttd_backend:del_subscription(<<"clientId">>, <<"topic">>), + [] = emqttd_backend:lookup_subscriptions(<<"clientId">>).