move running_nodes/0 to emqttd_mnesia and improve some functions

This commit is contained in:
Feng 2016-02-11 14:31:42 +08:00
parent a5116c7fa6
commit 9dda709aa1
1 changed files with 15 additions and 23 deletions

View File

@ -14,8 +14,6 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc emqttd broker
%% @author Feng Lee <feng@emqtt.io>
-module(emqttd_broker). -module(emqttd_broker).
-behaviour(gen_server). -behaviour(gen_server).
@ -27,9 +25,6 @@
%% API Function Exports %% API Function Exports
-export([start_link/0]). -export([start_link/0]).
%% Running nodes
-export([running_nodes/0]).
%% Event API %% Event API
-export([subscribe/1, notify/2]). -export([subscribe/1, notify/2]).
@ -69,11 +64,6 @@
start_link() -> start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
%% @doc Get running nodes
-spec running_nodes() -> list(node()).
running_nodes() ->
mnesia:system_info(running_db_nodes).
%% @doc Subscribe broker event %% @doc Subscribe broker event
-spec subscribe(EventType :: any()) -> ok. -spec subscribe(EventType :: any()) -> ok.
subscribe(EventType) -> subscribe(EventType) ->
@ -82,8 +72,7 @@ subscribe(EventType) ->
%% @doc Notify broker event %% @doc Notify broker event
-spec notify(EventType :: any(), Event :: any()) -> ok. -spec notify(EventType :: any(), Event :: any()) -> ok.
notify(EventType, Event) -> notify(EventType, Event) ->
Key = {broker, EventType}, gproc:send({p, l, {broker, EventType}}, {notify, EventType, self(), Event}).
gproc:send({p, l, Key}, {self(), Key, Event}).
%% @doc Get broker env %% @doc Get broker env
env(Name) -> env(Name) ->
@ -101,8 +90,7 @@ sysdescr() ->
%% @doc Get broker uptime %% @doc Get broker uptime
-spec uptime() -> string(). -spec uptime() -> string().
uptime() -> uptime() -> gen_server:call(?SERVER, uptime).
gen_server:call(?SERVER, uptime).
%% @doc Get broker datetime %% @doc Get broker datetime
-spec datetime() -> string(). -spec datetime() -> string().
@ -166,7 +154,7 @@ stop_tick(TRef) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init([]) -> init([]) ->
emqttd:seed_now(), emqttd_time:seed(),
ets:new(?BROKER_TAB, [set, public, named_table]), ets:new(?BROKER_TAB, [set, public, named_table]),
% Create $SYS Topics % Create $SYS Topics
emqttd_pubsub:create(topic, <<"$SYS/brokers">>), emqttd_pubsub:create(topic, <<"$SYS/brokers">>),
@ -182,24 +170,24 @@ handle_call(uptime, _From, State) ->
handle_call({hook, Hook, Name, MFArgs}, _From, State) -> handle_call({hook, Hook, Name, MFArgs}, _From, State) ->
Key = {hook, Hook}, Reply = Key = {hook, Hook}, Reply =
case ets:lookup(?BROKER_TAB, Key) of case ets:lookup(?BROKER_TAB, Key) of
[{Key, Hooks}] -> [{Key, Hooks}] ->
case lists:keyfind(Name, 1, Hooks) of case lists:keyfind(Name, 1, Hooks) of
{Name, _MFArgs} -> {Name, _MFArgs} ->
{error, existed}; {error, existed};
false -> false ->
ets:insert(?BROKER_TAB, {Key, Hooks ++ [{Name, MFArgs}]}) insert_hooks(Key, Hooks ++ [{Name, MFArgs}])
end; end;
[] -> [] ->
ets:insert(?BROKER_TAB, {Key, [{Name, MFArgs}]}) insert_hooks(Key, [{Name, MFArgs}])
end, end,
{reply, Reply, State}; {reply, Reply, State};
handle_call({unhook, Hook, Name}, _From, State) -> handle_call({unhook, Hook, Name}, _From, State) ->
Key = {hook, Hook}, Reply = Key = {hook, Hook}, Reply =
case ets:lookup(?BROKER_TAB, Key) of case ets:lookup(?BROKER_TAB, Key) of
[{Key, Hooks}] -> [{Key, Hooks}] ->
ets:insert(?BROKER_TAB, {Key, lists:keydelete(Name, 1, Hooks)}); insert_hooks(Key, lists:keydelete(Name, 1, Hooks));
[] -> [] ->
{error, not_found} {error, not_found}
end, end,
{reply, Reply, State}; {reply, Reply, State};
@ -236,11 +224,15 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal functions %% Internal functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
insert_hooks(Key, Hooks) ->
ets:insert(?BROKER_TAB, {Key, Hooks}), ok.
create_topic(Topic) -> create_topic(Topic) ->
emqttd_pubsub:create(topic, emqttd_topic:systop(Topic)). emqttd_pubsub:create(topic, emqttd_topic:systop(Topic)).
retain(brokers) -> retain(brokers) ->
Payload = list_to_binary(string:join([atom_to_list(N) || N <- running_nodes()], ",")), Payload = list_to_binary(string:join([atom_to_list(N) ||
N <- emqttd_mnesia:running_nodes()], ",")),
Msg = emqttd_message:make(broker, <<"$SYS/brokers">>, Payload), Msg = emqttd_message:make(broker, <<"$SYS/brokers">>, Payload),
emqttd_pubsub:publish(emqttd_message:set_flag(sys, Msg)). emqttd_pubsub:publish(emqttd_message:set_flag(sys, Msg)).