From 684c562cc7ea814f9b66b48175f9e94ab7914081 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 16 Apr 2015 11:07:10 +0800 Subject: [PATCH] emqttd_utils --- apps/emqttd/src/emqttd_mnesia.erl | 14 ++---- apps/emqttd/src/emqttd_msg_store.erl | 56 ++++++++++++++---------- apps/emqttd/src/emqttd_pubsub.erl | 6 ++- apps/emqttd/src/emqttd_router.erl | 6 +-- apps/emqttd/src/emqttd_session.erl | 2 +- apps/emqttd/src/emqttd_utils.erl | 64 ++++++++++++++++++++++++++++ 6 files changed, 108 insertions(+), 40 deletions(-) create mode 100644 apps/emqttd/src/emqttd_utils.erl diff --git a/apps/emqttd/src/emqttd_mnesia.erl b/apps/emqttd/src/emqttd_mnesia.erl index fbd74d74b..cdaaa5b68 100644 --- a/apps/emqttd/src/emqttd_mnesia.erl +++ b/apps/emqttd/src/emqttd_mnesia.erl @@ -86,12 +86,7 @@ init_tables() -> %% @end %%------------------------------------------------------------------------------ create_tables() -> - %% trie tree tables - %%TODO: should use module 'boot_mnesia' attribute... - ok = emqttd_trie:mnesia(create), - ok = emqttd_pubsub:mnesia(create), - %% TODO: retained messages, this table should not be copied... - ok = emqttd_retained:mnesia(create). + emqttd_utils:apply_module_attributes(boot_mnesia). create_table(Table, Attrs) -> case mnesia:create_table(Table, Attrs) of @@ -108,9 +103,7 @@ create_table(Table, Attrs) -> %% @end %%------------------------------------------------------------------------------ copy_tables() -> - ok = emqttd_trie:mnesia(replicate), - ok = emqttd_pubsub:mnesia(replicate), - ok = emqttd_retained:mnesia(replicate). + emqttd_utils:apply_module_attributes(copy_mnesia). copy_table(Table) -> case mnesia:add_table_copy(Table, node(), ram_copies) of @@ -137,7 +130,7 @@ wait_for_tables() -> %% Simple cluster with another nodes. %% %% @end -%%-------------- +%%------------------------------------------------------------------------------ cluster(Node) -> %% stop mnesia mnesia:stop(), @@ -174,4 +167,3 @@ wait_for_mnesia(stop) -> {error, mnesia_unexpectedly_starting} end. - diff --git a/apps/emqttd/src/emqttd_msg_store.erl b/apps/emqttd/src/emqttd_msg_store.erl index a42f02170..4b9aa191b 100644 --- a/apps/emqttd/src/emqttd_msg_store.erl +++ b/apps/emqttd/src/emqttd_msg_store.erl @@ -37,7 +37,11 @@ -copy_mnesia({mnesia, [copy]}). %% API Function Exports --export([retain/1, read/2, delete/1]). +-export([retain/1, redeliver/2]). + +%%%============================================================================= +%%% Mnesia callbacks +%%%============================================================================= mnesia(boot) -> ok = emqttd_mnesia:create_table(message, [ @@ -49,7 +53,16 @@ mnesia(boot) -> mnesia(copy) -> ok = emqttd_mnesia:copy_table(message). -%% @doc retain message. +%%%============================================================================= +%%% API +%%%============================================================================= + +%%%----------------------------------------------------------------------------- +%% @doc +%% Retain message. +%% +%% @end +%%%----------------------------------------------------------------------------- -spec retain(mqtt_message()) -> ok | ignore. retain(#mqtt_message{retain = false}) -> ignore; @@ -63,7 +76,7 @@ retain(Msg = #mqtt_message{topic = Topic, TabSize = mnesia:table_info(message, size), case {TabSize < limit(table), size(Payload) < limit(payload)} of {true, true} -> - lager:debug("Retained: store message: ~p", [Msg]), + lager:debug("Retained ~s", [emqtt_message:format(Msg)]), mnesia:async_dirty(fun mnesia:write/3, [message, Msg, write]), emqttd_metrics:set('messages/retained/count', mnesia:table_info(message, size)); @@ -88,25 +101,23 @@ env() -> end. %% @doc redeliver retained messages to subscribed client. --spec redeliver(Topics, CPid) -> any() when - Topics :: list(binary()), - CPid :: pid(). -redeliver(Topics, CPid) when is_pid(CPid) -> - lists:foreach(fun(Topic) -> - case emqtt_topic:wildcard(Topic) of - false -> - dispatch(CPid, mnesia:dirty_read(message, Topic)); - true -> - Fun = fun(Msg = #mqtt_message{topic = Name}, Acc) -> - case emqtt_topic:match(Name, Topic) of - true -> [Msg|Acc]; - false -> Acc - end - end, - Msgs = mnesia:async_dirty(fun mnesia:foldl/3, [Fun, [], message]), - dispatch(CPid, lists:reverse(Msgs)) - end - end, Topics). +-spec redeliver(Topic, CPid) -> any() when + Topic :: binary(), + CPid :: pid(). +redeliver(Topic, CPid) when is_binary(Topic) andalso is_pid(CPid) -> + case emqtt_topic:wildcard(Topic) of + false -> + dispatch(CPid, mnesia:dirty_read(message, Topic)); + true -> + Fun = fun(Msg = #mqtt_message{topic = Name}, Acc) -> + case emqtt_topic:match(Name, Topic) of + true -> [Msg|Acc]; + false -> Acc + end + end, + Msgs = mnesia:async_dirty(fun mnesia:foldl/3, [Fun, [], message]), + dispatch(CPid, lists:reverse(Msgs)) + end. dispatch(_CPid, []) -> ignore; @@ -115,3 +126,4 @@ dispatch(CPid, Msgs) when is_list(Msgs) -> dispatch(CPid, Msg) when is_record(Msg, mqtt_message) -> CPid ! {dispatch, {self(), Msg}}. + diff --git a/apps/emqttd/src/emqttd_pubsub.erl b/apps/emqttd/src/emqttd_pubsub.erl index c1177a6ee..4d6586380 100644 --- a/apps/emqttd/src/emqttd_pubsub.erl +++ b/apps/emqttd/src/emqttd_pubsub.erl @@ -139,8 +139,10 @@ subscribe(Topic, Qos) when is_binary(Topic) andalso ?IS_QOS(Qos) -> end end, case mnesia:transaction(F) of - {atomic, ok} -> {ok, Qos}; - {aborted, Reason} -> {error, Reason} + {atomic, ok} -> + {ok, Qos}; + {aborted, Reason} -> + {error, Reason} end. %%------------------------------------------------------------------------------ diff --git a/apps/emqttd/src/emqttd_router.erl b/apps/emqttd/src/emqttd_router.erl index cdadae663..6999abf21 100644 --- a/apps/emqttd/src/emqttd_router.erl +++ b/apps/emqttd/src/emqttd_router.erl @@ -64,10 +64,8 @@ start_link() -> -spec route(From :: binary() | atom(), Msg :: mqtt_message()) -> ok. route(From, Msg) -> lager:info("Route ~s from ~s", [emqtt_message:format(Msg), From]), - % TODO: retained message should be stored in emqttd_pubsub... - % emqttd_retained:retain(Msg), - % unset flag and pubsub - emqttd_pubsub:publish(Msg). + emqttd_msg_store:retain(Msg), + emqttd_pubsub:publish(emqtt_message:unset_flag(Msg)). %%%============================================================================= %%% gen_server callbacks diff --git a/apps/emqttd/src/emqttd_session.erl b/apps/emqttd/src/emqttd_session.erl index 35c00b532..2f3f78a2a 100644 --- a/apps/emqttd/src/emqttd_session.erl +++ b/apps/emqttd/src/emqttd_session.erl @@ -187,7 +187,7 @@ subscribe(SessState = #session_state{clientid = ClientId, submap = SubMap}, Topi SubMap1 = lists:foldl(fun({Name, Qos}, Acc) -> maps:put(Name, Qos, Acc) end, SubMap, Topics), {ok, GrantedQos} = emqttd_pubsub:subscribe(Topics), %%TODO: should be gen_event and notification... - emqttd_retained:redeliver([Name || {Name, _} <- Topics], self()), + emqttd_msg_store:redeliver([Name || {Name, _} <- Topics], self()), {ok, SessState#session_state{submap = SubMap1}, GrantedQos}; subscribe(SessPid, Topics) when is_pid(SessPid) -> diff --git a/apps/emqttd/src/emqttd_utils.erl b/apps/emqttd/src/emqttd_utils.erl new file mode 100644 index 000000000..916c19fd8 --- /dev/null +++ b/apps/emqttd/src/emqttd_utils.erl @@ -0,0 +1,64 @@ +%%%----------------------------------------------------------------------------- +%%% @Copyright (C) 2012-2015, Feng Lee +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% emqttd utility functions. +%%% +%%% @end +%%%----------------------------------------------------------------------------- +-module(emqttd_utils). + +-export([apply_module_attributes/1, + all_module_attributes/1]). + +%% only {F, Args}... +apply_module_attributes(Name) -> + [{Module, [apply(Module, F, Args) || {F, Args} <- Attrs]} || + {_App, Module, Attrs} <- all_module_attributes(Name)]. + +%% copy from rabbit_misc.erl +all_module_attributes(Name) -> + Targets = + lists:usort( + lists:append( + [[{App, Module} || Module <- Modules] || + {App, _, _} <- application:loaded_applications(), + {ok, Modules} <- [application:get_key(App, modules)]])), + lists:foldl( + fun ({App, Module}, Acc) -> + case lists:append([Atts || {N, Atts} <- module_attributes(Module), + N =:= Name]) of + [] -> Acc; + Atts -> [{App, Module, Atts} | Acc] + end + end, [], Targets). + +%% copy from rabbit_misc.erl +module_attributes(Module) -> + case catch Module:module_info(attributes) of + {'EXIT', {undef, [{Module, module_info, _} | _]}} -> + []; + {'EXIT', Reason} -> + exit(Reason); + V -> + V + end. +