event manager

This commit is contained in:
Ery Lee 2015-03-12 00:38:56 +08:00
parent e5ba03c9e0
commit bcd354f77d
2 changed files with 90 additions and 2 deletions

View File

@ -96,6 +96,7 @@ start_servers(Sup) ->
?PRINT_MSG("[done]~n")
end,
[{"emqttd config", emqttd_config},
{"emqttd event", emqttd_event},
{"emqttd server", emqttd_server, RetainOpts},
{"emqttd client manager", emqttd_cm},
{"emqttd session manager", emqttd_sm},

View File

@ -26,8 +26,95 @@
%%%-----------------------------------------------------------------------------
-module(emqttd_event).
-export([start_link/0]).
-include("emqttd_packet.hrl").
%% API Function Exports
-export([start_link/0,
add_handler/2]).
%% gen_event Function Exports
-export([init/1,
handle_event/2,
handle_call/2,
handle_info/2,
terminate/2,
code_change/3]).
-record(state, {systop}).
%%------------------------------------------------------------------------------
%% @doc
%% Start emqttd event manager.
%%
%% @end
%%------------------------------------------------------------------------------
-spec start_link() -> {ok, pid()} | {error, any()}.
start_link() ->
gen_event:start_link({local, ?MODULE}).
case gen_event:start_link({local, ?MODULE}) of
{ok, Pid} ->
add_handler(?MODULE, []),
{ok, Pid};
{error, Reason} ->
{error, Reason}
end.
add_handler(Handler, Args) ->
gen_event:add_handler(?MODULE, Handler, Args).
%%%=============================================================================
%%% gen_event callbacks
%%%=============================================================================
init([]) ->
SysTop = list_to_binary(lists:concat(["$SYS/brokers/", node()])),
{ok, #state{systop = SysTop}}.
handle_event({connected, ClientId, Params}, State = #state{systop = SysTop}) ->
Topic = <<SysTop/binary, "clients/", ClientId/binary, "/connected">>,
Msg = #mqtt_message{topic = Topic, payload = payload(connected, Params)},
emqttd_pubsub:publish(Msg),
{ok, State};
handle_event({disconnectd, ClientId, Reason}, State = #state{systop = SysTop}) ->
Topic = <<SysTop/binary, "clients/", ClientId/binary, "/disconnected">>,
Msg = #mqtt_message{topic = Topic, payload = payload(disconnected, Reason)},
emqttd_pubsub:publish(Msg),
{ok, State};
handle_event({subscribed, ClientId, TopicTable}, State) ->
lager:error("TODO: subscribed ~s, ~p", [ClientId, TopicTable]),
{ok, State};
handle_event({unsubscribed, ClientId, Topics}, State) ->
lager:error("TODO: unsubscribed ~s, ~p", [ClientId, Topics]),
{ok, State};
handle_event(_Event, State) ->
{ok, State}.
handle_call(_Request, State) ->
Reply = ok,
{ok, Reply, State}.
handle_info(_Info, State) ->
{ok, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%% ------------------------------------------------------------------
%% Internal Function Definitions
%% ------------------------------------------------------------------
payload(connected, Params) ->
From = proplists:get_value(from, Params),
Proto = proplists:get_value(protocol, Params),
Sess = proplists:get_value(session, Params),
iolist_to_binary(io_lib:format("from: ~s~nprotocol: ~p~nsession: ~s", [From, Proto, Sess]));
payload(disconnected, Reason) ->
list_to_binary(lists:concat(["reason: ", Reason])).