diff --git a/apps/emqttd/src/emqttd_client.erl b/apps/emqttd/src/emqttd_client.erl index 1f2014bc5..cda22a495 100644 --- a/apps/emqttd/src/emqttd_client.erl +++ b/apps/emqttd/src/emqttd_client.erl @@ -148,6 +148,7 @@ handle_info(Info, State = #state{peer_name = PeerName}) -> terminate(Reason, #state{peer_name = PeerName, keepalive = KeepAlive, proto_state = ProtoState}) -> lager:info("Client ~s: ~p terminated, reason: ~p~n", [PeerName, self(), Reason]), + notify(disconnected, Reason, ProtoState), emqttd_keepalive:cancel(KeepAlive), case {ProtoState, Reason} of {undefined, _} -> ok; @@ -237,3 +238,11 @@ inc(?DISCONNECT) -> inc(_) -> ignore. +%%TODO: should be moved to emqttd_protocol... for event emitted when protocol shutdown... +notify(disconnected, _Reason, undefined) -> ingore; + +notify(disconnected, {shutdown, Reason}, ProtoState) -> + emqttd_event:notify({disconnected, emqttd_protocol:client_id(ProtoState), [{reason, Reason}]}); + +notify(disconnected, Reason, ProtoState) -> + emqttd_event:notify({disconnected, emqttd_protocol:client_id(ProtoState), [{reason, Reason}]}). diff --git a/apps/emqttd/src/emqttd_event.erl b/apps/emqttd/src/emqttd_event.erl index a2fa6fdf0..730456633 100644 --- a/apps/emqttd/src/emqttd_event.erl +++ b/apps/emqttd/src/emqttd_event.erl @@ -30,7 +30,8 @@ %% API Function Exports -export([start_link/0, - add_handler/2]). + add_handler/2, + notify/1]). %% gen_event Function Exports -export([init/1, @@ -61,24 +62,26 @@ start_link() -> add_handler(Handler, Args) -> gen_event:add_handler(?MODULE, Handler, Args). +notify(Event) -> + gen_event:notify(?MODULE, Event). %%%============================================================================= %%% gen_event callbacks %%%============================================================================= init([]) -> - SysTop = list_to_binary(lists:concat(["$SYS/brokers/", node()])), + SysTop = list_to_binary(lists:concat(["$SYS/brokers/", node(), "/"])), {ok, #state{systop = SysTop}}. handle_event({connected, ClientId, Params}, State = #state{systop = SysTop}) -> Topic = <>, Msg = #mqtt_message{topic = Topic, payload = payload(connected, Params)}, - emqttd_pubsub:publish(Msg), + emqttd_router:route(Msg), {ok, State}; handle_event({disconnectd, ClientId, Reason}, State = #state{systop = SysTop}) -> Topic = <>, Msg = #mqtt_message{topic = Topic, payload = payload(disconnected, Reason)}, - emqttd_pubsub:publish(Msg), + emqttd_router:route(Msg), {ok, State}; handle_event({subscribed, ClientId, TopicTable}, State) -> @@ -116,5 +119,5 @@ payload(connected, Params) -> iolist_to_binary(io_lib:format("from: ~s~nprotocol: ~p~nsession: ~s", [From, Proto, Sess])); payload(disconnected, Reason) -> - list_to_binary(lists:concat(["reason: ", Reason])). + list_to_binary(io_lib:format(["reason: ~p", Reason])). diff --git a/apps/emqttd/src/emqttd_protocol.erl b/apps/emqttd/src/emqttd_protocol.erl index 941f0635e..71684de8f 100644 --- a/apps/emqttd/src/emqttd_protocol.erl +++ b/apps/emqttd/src/emqttd_protocol.erl @@ -126,6 +126,7 @@ handle(Packet = ?CONNECT_PACKET(Var), State = #proto_state{peer_name = PeerName} ReturnCode -> {ReturnCode, State#proto_state{client_id = ClientId}} end, + notify(connected, ReturnCode1, State1), send(?CONNACK_PACKET(ReturnCode1), State1), %%Starting session {ok, Session} = emqttd_session:start({CleanSess, ClientId, self()}), @@ -318,4 +319,18 @@ inc(?PINGRESP) -> inc(_) -> ingore. +notify(connected, ReturnCode, #proto_state{peer_name = PeerName, + proto_vsn = ProtoVsn, + client_id = ClientId, + clean_sess = CleanSess}) -> + Sess = case CleanSess of + true -> false; + false -> true + end, + Params = [{from, PeerName}, + {protocol, ProtoVsn}, + {session, Sess}, + {connack, ReturnCode}], + emqttd_event:notify({connected, ClientId, Params}). +