From bcd354f77d27f01c4bb48c218e41664032091c46 Mon Sep 17 00:00:00 2001 From: Ery Lee Date: Thu, 12 Mar 2015 00:38:56 +0800 Subject: [PATCH] event manager --- apps/emqttd/src/emqttd_app.erl | 1 + apps/emqttd/src/emqttd_event.erl | 91 +++++++++++++++++++++++++++++++- 2 files changed, 90 insertions(+), 2 deletions(-) diff --git a/apps/emqttd/src/emqttd_app.erl b/apps/emqttd/src/emqttd_app.erl index e7dad3684..404591480 100644 --- a/apps/emqttd/src/emqttd_app.erl +++ b/apps/emqttd/src/emqttd_app.erl @@ -96,6 +96,7 @@ start_servers(Sup) -> ?PRINT_MSG("[done]~n") end, [{"emqttd config", emqttd_config}, + {"emqttd event", emqttd_event}, {"emqttd server", emqttd_server, RetainOpts}, {"emqttd client manager", emqttd_cm}, {"emqttd session manager", emqttd_sm}, diff --git a/apps/emqttd/src/emqttd_event.erl b/apps/emqttd/src/emqttd_event.erl index 1f2de5fb9..a2fa6fdf0 100644 --- a/apps/emqttd/src/emqttd_event.erl +++ b/apps/emqttd/src/emqttd_event.erl @@ -26,8 +26,95 @@ %%%----------------------------------------------------------------------------- -module(emqttd_event). --export([start_link/0]). +-include("emqttd_packet.hrl"). +%% API Function Exports +-export([start_link/0, + add_handler/2]). + +%% gen_event Function Exports +-export([init/1, + handle_event/2, + handle_call/2, + handle_info/2, + terminate/2, + code_change/3]). + +-record(state, {systop}). + +%%------------------------------------------------------------------------------ +%% @doc +%% Start emqttd event manager. +%% +%% @end +%%------------------------------------------------------------------------------ +-spec start_link() -> {ok, pid()} | {error, any()}. start_link() -> - gen_event:start_link({local, ?MODULE}). + case gen_event:start_link({local, ?MODULE}) of + {ok, Pid} -> + add_handler(?MODULE, []), + {ok, Pid}; + {error, Reason} -> + {error, Reason} + end. + +add_handler(Handler, Args) -> + gen_event:add_handler(?MODULE, Handler, Args). + +%%%============================================================================= +%%% gen_event callbacks +%%%============================================================================= + +init([]) -> + SysTop = list_to_binary(lists:concat(["$SYS/brokers/", node()])), + {ok, #state{systop = SysTop}}. + +handle_event({connected, ClientId, Params}, State = #state{systop = SysTop}) -> + Topic = <>, + Msg = #mqtt_message{topic = Topic, payload = payload(connected, Params)}, + emqttd_pubsub:publish(Msg), + {ok, State}; + +handle_event({disconnectd, ClientId, Reason}, State = #state{systop = SysTop}) -> + Topic = <>, + Msg = #mqtt_message{topic = Topic, payload = payload(disconnected, Reason)}, + emqttd_pubsub:publish(Msg), + {ok, State}; + +handle_event({subscribed, ClientId, TopicTable}, State) -> + lager:error("TODO: subscribed ~s, ~p", [ClientId, TopicTable]), + {ok, State}; + +handle_event({unsubscribed, ClientId, Topics}, State) -> + lager:error("TODO: unsubscribed ~s, ~p", [ClientId, Topics]), + {ok, State}; + +handle_event(_Event, State) -> + {ok, State}. + +handle_call(_Request, State) -> + Reply = ok, + {ok, Reply, State}. + +handle_info(_Info, State) -> + {ok, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%% ------------------------------------------------------------------ +%% Internal Function Definitions +%% ------------------------------------------------------------------ + +payload(connected, Params) -> + From = proplists:get_value(from, Params), + Proto = proplists:get_value(protocol, Params), + Sess = proplists:get_value(session, Params), + iolist_to_binary(io_lib:format("from: ~s~nprotocol: ~p~nsession: ~s", [From, Proto, Sess])); + +payload(disconnected, Reason) -> + list_to_binary(lists:concat(["reason: ", Reason])).