event manager

This commit is contained in:
Ery Lee 2015-03-12 01:13:06 +08:00
parent bcd354f77d
commit c5a72bd1fb
3 changed files with 32 additions and 5 deletions

View File

@ -148,6 +148,7 @@ handle_info(Info, State = #state{peer_name = PeerName}) ->
terminate(Reason, #state{peer_name = PeerName, keepalive = KeepAlive, proto_state = ProtoState}) -> terminate(Reason, #state{peer_name = PeerName, keepalive = KeepAlive, proto_state = ProtoState}) ->
lager:info("Client ~s: ~p terminated, reason: ~p~n", [PeerName, self(), Reason]), lager:info("Client ~s: ~p terminated, reason: ~p~n", [PeerName, self(), Reason]),
notify(disconnected, Reason, ProtoState),
emqttd_keepalive:cancel(KeepAlive), emqttd_keepalive:cancel(KeepAlive),
case {ProtoState, Reason} of case {ProtoState, Reason} of
{undefined, _} -> ok; {undefined, _} -> ok;
@ -237,3 +238,11 @@ inc(?DISCONNECT) ->
inc(_) -> inc(_) ->
ignore. ignore.
%%TODO: should be moved to emqttd_protocol... for event emitted when protocol shutdown...
notify(disconnected, _Reason, undefined) -> ingore;
notify(disconnected, {shutdown, Reason}, ProtoState) ->
emqttd_event:notify({disconnected, emqttd_protocol:client_id(ProtoState), [{reason, Reason}]});
notify(disconnected, Reason, ProtoState) ->
emqttd_event:notify({disconnected, emqttd_protocol:client_id(ProtoState), [{reason, Reason}]}).

View File

@ -30,7 +30,8 @@
%% API Function Exports %% API Function Exports
-export([start_link/0, -export([start_link/0,
add_handler/2]). add_handler/2,
notify/1]).
%% gen_event Function Exports %% gen_event Function Exports
-export([init/1, -export([init/1,
@ -61,24 +62,26 @@ start_link() ->
add_handler(Handler, Args) -> add_handler(Handler, Args) ->
gen_event:add_handler(?MODULE, Handler, Args). gen_event:add_handler(?MODULE, Handler, Args).
notify(Event) ->
gen_event:notify(?MODULE, Event).
%%%============================================================================= %%%=============================================================================
%%% gen_event callbacks %%% gen_event callbacks
%%%============================================================================= %%%=============================================================================
init([]) -> init([]) ->
SysTop = list_to_binary(lists:concat(["$SYS/brokers/", node()])), SysTop = list_to_binary(lists:concat(["$SYS/brokers/", node(), "/"])),
{ok, #state{systop = SysTop}}. {ok, #state{systop = SysTop}}.
handle_event({connected, ClientId, Params}, State = #state{systop = SysTop}) -> handle_event({connected, ClientId, Params}, State = #state{systop = SysTop}) ->
Topic = <<SysTop/binary, "clients/", ClientId/binary, "/connected">>, Topic = <<SysTop/binary, "clients/", ClientId/binary, "/connected">>,
Msg = #mqtt_message{topic = Topic, payload = payload(connected, Params)}, Msg = #mqtt_message{topic = Topic, payload = payload(connected, Params)},
emqttd_pubsub:publish(Msg), emqttd_router:route(Msg),
{ok, State}; {ok, State};
handle_event({disconnectd, ClientId, Reason}, State = #state{systop = SysTop}) -> handle_event({disconnectd, ClientId, Reason}, State = #state{systop = SysTop}) ->
Topic = <<SysTop/binary, "clients/", ClientId/binary, "/disconnected">>, Topic = <<SysTop/binary, "clients/", ClientId/binary, "/disconnected">>,
Msg = #mqtt_message{topic = Topic, payload = payload(disconnected, Reason)}, Msg = #mqtt_message{topic = Topic, payload = payload(disconnected, Reason)},
emqttd_pubsub:publish(Msg), emqttd_router:route(Msg),
{ok, State}; {ok, State};
handle_event({subscribed, ClientId, TopicTable}, State) -> handle_event({subscribed, ClientId, TopicTable}, State) ->
@ -116,5 +119,5 @@ payload(connected, Params) ->
iolist_to_binary(io_lib:format("from: ~s~nprotocol: ~p~nsession: ~s", [From, Proto, Sess])); iolist_to_binary(io_lib:format("from: ~s~nprotocol: ~p~nsession: ~s", [From, Proto, Sess]));
payload(disconnected, Reason) -> payload(disconnected, Reason) ->
list_to_binary(lists:concat(["reason: ", Reason])). list_to_binary(io_lib:format(["reason: ~p", Reason])).

View File

@ -126,6 +126,7 @@ handle(Packet = ?CONNECT_PACKET(Var), State = #proto_state{peer_name = PeerName}
ReturnCode -> ReturnCode ->
{ReturnCode, State#proto_state{client_id = ClientId}} {ReturnCode, State#proto_state{client_id = ClientId}}
end, end,
notify(connected, ReturnCode1, State1),
send(?CONNACK_PACKET(ReturnCode1), State1), send(?CONNACK_PACKET(ReturnCode1), State1),
%%Starting session %%Starting session
{ok, Session} = emqttd_session:start({CleanSess, ClientId, self()}), {ok, Session} = emqttd_session:start({CleanSess, ClientId, self()}),
@ -318,4 +319,18 @@ inc(?PINGRESP) ->
inc(_) -> inc(_) ->
ingore. ingore.
notify(connected, ReturnCode, #proto_state{peer_name = PeerName,
proto_vsn = ProtoVsn,
client_id = ClientId,
clean_sess = CleanSess}) ->
Sess = case CleanSess of
true -> false;
false -> true
end,
Params = [{from, PeerName},
{protocol, ProtoVsn},
{session, Sess},
{connack, ReturnCode}],
emqttd_event:notify({connected, ClientId, Params}).