diff --git a/src/emqttd_broker.erl b/src/emqttd_broker.erl index f68a4deff..5a26fd291 100644 --- a/src/emqttd_broker.erl +++ b/src/emqttd_broker.erl @@ -14,8 +14,6 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc emqttd broker -%% @author Feng Lee -module(emqttd_broker). -behaviour(gen_server). @@ -27,9 +25,6 @@ %% API Function Exports -export([start_link/0]). -%% Running nodes --export([running_nodes/0]). - %% Event API -export([subscribe/1, notify/2]). @@ -69,11 +64,6 @@ start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). -%% @doc Get running nodes --spec running_nodes() -> list(node()). -running_nodes() -> - mnesia:system_info(running_db_nodes). - %% @doc Subscribe broker event -spec subscribe(EventType :: any()) -> ok. subscribe(EventType) -> @@ -82,8 +72,7 @@ subscribe(EventType) -> %% @doc Notify broker event -spec notify(EventType :: any(), Event :: any()) -> ok. notify(EventType, Event) -> - Key = {broker, EventType}, - gproc:send({p, l, Key}, {self(), Key, Event}). + gproc:send({p, l, {broker, EventType}}, {notify, EventType, self(), Event}). %% @doc Get broker env env(Name) -> @@ -101,8 +90,7 @@ sysdescr() -> %% @doc Get broker uptime -spec uptime() -> string(). -uptime() -> - gen_server:call(?SERVER, uptime). +uptime() -> gen_server:call(?SERVER, uptime). %% @doc Get broker datetime -spec datetime() -> string(). @@ -166,7 +154,7 @@ stop_tick(TRef) -> %%-------------------------------------------------------------------- init([]) -> - emqttd:seed_now(), + emqttd_time:seed(), ets:new(?BROKER_TAB, [set, public, named_table]), % Create $SYS Topics emqttd_pubsub:create(topic, <<"$SYS/brokers">>), @@ -182,24 +170,24 @@ handle_call(uptime, _From, State) -> handle_call({hook, Hook, Name, MFArgs}, _From, State) -> Key = {hook, Hook}, Reply = case ets:lookup(?BROKER_TAB, Key) of - [{Key, Hooks}] -> + [{Key, Hooks}] -> case lists:keyfind(Name, 1, Hooks) of {Name, _MFArgs} -> {error, existed}; false -> - ets:insert(?BROKER_TAB, {Key, Hooks ++ [{Name, MFArgs}]}) + insert_hooks(Key, Hooks ++ [{Name, MFArgs}]) end; - [] -> - ets:insert(?BROKER_TAB, {Key, [{Name, MFArgs}]}) + [] -> + insert_hooks(Key, [{Name, MFArgs}]) end, {reply, Reply, State}; handle_call({unhook, Hook, Name}, _From, State) -> Key = {hook, Hook}, Reply = case ets:lookup(?BROKER_TAB, Key) of - [{Key, Hooks}] -> - ets:insert(?BROKER_TAB, {Key, lists:keydelete(Name, 1, Hooks)}); - [] -> + [{Key, Hooks}] -> + insert_hooks(Key, lists:keydelete(Name, 1, Hooks)); + [] -> {error, not_found} end, {reply, Reply, State}; @@ -236,11 +224,15 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%-------------------------------------------------------------------- +insert_hooks(Key, Hooks) -> + ets:insert(?BROKER_TAB, {Key, Hooks}), ok. + create_topic(Topic) -> emqttd_pubsub:create(topic, emqttd_topic:systop(Topic)). retain(brokers) -> - Payload = list_to_binary(string:join([atom_to_list(N) || N <- running_nodes()], ",")), + Payload = list_to_binary(string:join([atom_to_list(N) || + N <- emqttd_mnesia:running_nodes()], ",")), Msg = emqttd_message:make(broker, <<"$SYS/brokers">>, Payload), emqttd_pubsub:publish(emqttd_message:set_flag(sys, Msg)).