emqttd_utils

This commit is contained in:
Feng Lee 2015-04-16 11:07:10 +08:00
parent a899dada12
commit 684c562cc7
6 changed files with 108 additions and 40 deletions

View File

@ -86,12 +86,7 @@ init_tables() ->
%% @end
%%------------------------------------------------------------------------------
create_tables() ->
%% trie tree tables
%%TODO: should use module 'boot_mnesia' attribute...
ok = emqttd_trie:mnesia(create),
ok = emqttd_pubsub:mnesia(create),
%% TODO: retained messages, this table should not be copied...
ok = emqttd_retained:mnesia(create).
emqttd_utils:apply_module_attributes(boot_mnesia).
create_table(Table, Attrs) ->
case mnesia:create_table(Table, Attrs) of
@ -108,9 +103,7 @@ create_table(Table, Attrs) ->
%% @end
%%------------------------------------------------------------------------------
copy_tables() ->
ok = emqttd_trie:mnesia(replicate),
ok = emqttd_pubsub:mnesia(replicate),
ok = emqttd_retained:mnesia(replicate).
emqttd_utils:apply_module_attributes(copy_mnesia).
copy_table(Table) ->
case mnesia:add_table_copy(Table, node(), ram_copies) of
@ -137,7 +130,7 @@ wait_for_tables() ->
%% Simple cluster with another nodes.
%%
%% @end
%%--------------
%%------------------------------------------------------------------------------
cluster(Node) ->
%% stop mnesia
mnesia:stop(),
@ -174,4 +167,3 @@ wait_for_mnesia(stop) ->
{error, mnesia_unexpectedly_starting}
end.

View File

@ -37,7 +37,11 @@
-copy_mnesia({mnesia, [copy]}).
%% API Function Exports
-export([retain/1, read/2, delete/1]).
-export([retain/1, redeliver/2]).
%%%=============================================================================
%%% Mnesia callbacks
%%%=============================================================================
mnesia(boot) ->
ok = emqttd_mnesia:create_table(message, [
@ -49,7 +53,16 @@ mnesia(boot) ->
mnesia(copy) ->
ok = emqttd_mnesia:copy_table(message).
%% @doc retain message.
%%%=============================================================================
%%% API
%%%=============================================================================
%%%-----------------------------------------------------------------------------
%% @doc
%% Retain message.
%%
%% @end
%%%-----------------------------------------------------------------------------
-spec retain(mqtt_message()) -> ok | ignore.
retain(#mqtt_message{retain = false}) -> ignore;
@ -63,7 +76,7 @@ retain(Msg = #mqtt_message{topic = Topic,
TabSize = mnesia:table_info(message, size),
case {TabSize < limit(table), size(Payload) < limit(payload)} of
{true, true} ->
lager:debug("Retained: store message: ~p", [Msg]),
lager:debug("Retained ~s", [emqtt_message:format(Msg)]),
mnesia:async_dirty(fun mnesia:write/3, [message, Msg, write]),
emqttd_metrics:set('messages/retained/count',
mnesia:table_info(message, size));
@ -88,25 +101,23 @@ env() ->
end.
%% @doc redeliver retained messages to subscribed client.
-spec redeliver(Topics, CPid) -> any() when
Topics :: list(binary()),
CPid :: pid().
redeliver(Topics, CPid) when is_pid(CPid) ->
lists:foreach(fun(Topic) ->
case emqtt_topic:wildcard(Topic) of
false ->
dispatch(CPid, mnesia:dirty_read(message, Topic));
true ->
Fun = fun(Msg = #mqtt_message{topic = Name}, Acc) ->
case emqtt_topic:match(Name, Topic) of
true -> [Msg|Acc];
false -> Acc
end
end,
Msgs = mnesia:async_dirty(fun mnesia:foldl/3, [Fun, [], message]),
dispatch(CPid, lists:reverse(Msgs))
end
end, Topics).
-spec redeliver(Topic, CPid) -> any() when
Topic :: binary(),
CPid :: pid().
redeliver(Topic, CPid) when is_binary(Topic) andalso is_pid(CPid) ->
case emqtt_topic:wildcard(Topic) of
false ->
dispatch(CPid, mnesia:dirty_read(message, Topic));
true ->
Fun = fun(Msg = #mqtt_message{topic = Name}, Acc) ->
case emqtt_topic:match(Name, Topic) of
true -> [Msg|Acc];
false -> Acc
end
end,
Msgs = mnesia:async_dirty(fun mnesia:foldl/3, [Fun, [], message]),
dispatch(CPid, lists:reverse(Msgs))
end.
dispatch(_CPid, []) ->
ignore;
@ -115,3 +126,4 @@ dispatch(CPid, Msgs) when is_list(Msgs) ->
dispatch(CPid, Msg) when is_record(Msg, mqtt_message) ->
CPid ! {dispatch, {self(), Msg}}.

View File

@ -139,8 +139,10 @@ subscribe(Topic, Qos) when is_binary(Topic) andalso ?IS_QOS(Qos) ->
end
end,
case mnesia:transaction(F) of
{atomic, ok} -> {ok, Qos};
{aborted, Reason} -> {error, Reason}
{atomic, ok} ->
{ok, Qos};
{aborted, Reason} ->
{error, Reason}
end.
%%------------------------------------------------------------------------------

View File

@ -64,10 +64,8 @@ start_link() ->
-spec route(From :: binary() | atom(), Msg :: mqtt_message()) -> ok.
route(From, Msg) ->
lager:info("Route ~s from ~s", [emqtt_message:format(Msg), From]),
% TODO: retained message should be stored in emqttd_pubsub...
% emqttd_retained:retain(Msg),
% unset flag and pubsub
emqttd_pubsub:publish(Msg).
emqttd_msg_store:retain(Msg),
emqttd_pubsub:publish(emqtt_message:unset_flag(Msg)).
%%%=============================================================================
%%% gen_server callbacks

View File

@ -187,7 +187,7 @@ subscribe(SessState = #session_state{clientid = ClientId, submap = SubMap}, Topi
SubMap1 = lists:foldl(fun({Name, Qos}, Acc) -> maps:put(Name, Qos, Acc) end, SubMap, Topics),
{ok, GrantedQos} = emqttd_pubsub:subscribe(Topics),
%%TODO: should be gen_event and notification...
emqttd_retained:redeliver([Name || {Name, _} <- Topics], self()),
emqttd_msg_store:redeliver([Name || {Name, _} <- Topics], self()),
{ok, SessState#session_state{submap = SubMap1}, GrantedQos};
subscribe(SessPid, Topics) when is_pid(SessPid) ->

View File

@ -0,0 +1,64 @@
%%%-----------------------------------------------------------------------------
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd utility functions.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_utils).
-export([apply_module_attributes/1,
all_module_attributes/1]).
%% only {F, Args}...
apply_module_attributes(Name) ->
[{Module, [apply(Module, F, Args) || {F, Args} <- Attrs]} ||
{_App, Module, Attrs} <- all_module_attributes(Name)].
%% copy from rabbit_misc.erl
all_module_attributes(Name) ->
Targets =
lists:usort(
lists:append(
[[{App, Module} || Module <- Modules] ||
{App, _, _} <- application:loaded_applications(),
{ok, Modules} <- [application:get_key(App, modules)]])),
lists:foldl(
fun ({App, Module}, Acc) ->
case lists:append([Atts || {N, Atts} <- module_attributes(Module),
N =:= Name]) of
[] -> Acc;
Atts -> [{App, Module, Atts} | Acc]
end
end, [], Targets).
%% copy from rabbit_misc.erl
module_attributes(Module) ->
case catch Module:module_info(attributes) of
{'EXIT', {undef, [{Module, module_info, _} | _]}} ->
[];
{'EXIT', Reason} ->
exit(Reason);
V ->
V
end.