From d9d7581013e9149b90f045d274fcebff58d90d30 Mon Sep 17 00:00:00 2001 From: Feng Date: Fri, 11 Mar 2016 23:42:37 +0800 Subject: [PATCH] 0.17.0 - Improve the design of Hook, PubSub and Router --- src/emqttd.erl | 69 +++++++++++--- src/emqttd_app.erl | 1 + src/emqttd_broker.erl | 65 -------------- src/emqttd_cli.erl | 110 +++++++++++------------ src/emqttd_hook.erl | 155 ++++++++++++++++++++++++++++++++ src/emqttd_mod_presence.erl | 56 ++++++------ src/emqttd_mod_rewrite.erl | 36 ++++---- src/emqttd_mod_subscription.erl | 23 ++--- src/emqttd_protocol.erl | 8 +- src/emqttd_pubsub.erl | 24 ++--- src/emqttd_router.erl | 16 ++-- src/emqttd_server.erl | 55 +++++++----- src/emqttd_session.erl | 89 ++++++++++-------- 13 files changed, 425 insertions(+), 282 deletions(-) create mode 100644 src/emqttd_hook.erl diff --git a/src/emqttd.erl b/src/emqttd.erl index 9a740e1cd..9bc455fbd 100644 --- a/src/emqttd.erl +++ b/src/emqttd.erl @@ -22,9 +22,13 @@ -export([start/0, env/1, env/2, is_running/1]). --export([create/2, publish/1, subscribe/1, subscribe/3, +%% PubSub API +-export([create/2, lookup/2, publish/1, subscribe/1, subscribe/3, unsubscribe/1, unsubscribe/3]). +%% Hooks API +-export([hook/4, hook/3, unhook/2, run_hooks/3]). + -define(APP, ?MODULE). %%-------------------------------------------------------------------- @@ -32,19 +36,19 @@ %%-------------------------------------------------------------------- %% @doc Start emqttd application. --spec start() -> ok | {error, any()}. +-spec(start() -> ok | {error, any()}). start() -> application:start(?APP). %% @doc Group environment --spec env(Group :: atom()) -> list(). +-spec(env(Group :: atom()) -> list()). env(Group) -> application:get_env(?APP, Group, []). %% @doc Get environment --spec env(Group :: atom(), Name :: atom()) -> undefined | any(). +-spec(env(Group :: atom(), Name :: atom()) -> undefined | any()). env(Group, Name) -> proplists:get_value(Name, env(Group)). %% @doc Is running? --spec is_running(node()) -> boolean(). +-spec(is_running(node()) -> boolean()). is_running(Node) -> case rpc:call(Node, erlang, whereis, [?APP]) of {badrpc, _} -> false; @@ -56,30 +60,67 @@ is_running(Node) -> %% PubSub APIs that wrap emqttd_server, emqttd_pubsub %%-------------------------------------------------------------------- -%% @doc Create a Topic +%% @doc Lookup Topic or Subscription +-spec(lookup(topic, binary()) -> [mqtt_topic()]; + (subscription, binary()) -> [mqtt_subscription()]). +lookup(topic, Topic) when is_binary(Topic) -> + emqttd_pubsub:lookup_topic(Topic); + +lookup(subscription, ClientId) when is_binary(ClientId) -> + emqttd_server:lookup_subscription(ClientId). + +%% @doc Create a Topic or Subscription +-spec(create(topic | subscription, binary()) -> ok | {error, any()}). create(topic, Topic) when is_binary(Topic) -> - emqttd_pubsub:create_topic(Topic). + emqttd_pubsub:create_topic(Topic); + +create(subscription, {ClientId, Topic, Qos}) -> + Subscription = #mqtt_subscription{subid = ClientId, topic = Topic, qos = ?QOS_I(Qos)}, + emqttd_backend:add_subscription(Subscription). %% @doc Publish MQTT Message --spec publish(mqtt_message()) -> ok. +-spec(publish(mqtt_message()) -> ok). publish(Msg) when is_record(Msg, mqtt_message) -> - emqttd_server:publish(Msg). + emqttd_server:publish(Msg), ok. %% @doc Subscribe --spec subscribe(binary()) -> ok. +-spec(subscribe(binary()) -> ok; + ({binary(), binary(), mqtt_qos()}) -> ok). subscribe(Topic) when is_binary(Topic) -> - emqttd_server:subscribe(Topic). + emqttd_server:subscribe(Topic); +subscribe({ClientId, Topic, Qos}) -> + subscribe(ClientId, Topic, Qos). --spec subscribe(binary(), binary(), mqtt_qos()) -> {ok, mqtt_qos()}. +-spec(subscribe(binary(), binary(), mqtt_qos()) -> {ok, mqtt_qos()}). subscribe(ClientId, Topic, Qos) -> emqttd_server:subscribe(ClientId, Topic, Qos). %% @doc Unsubscribe --spec unsubscribe(binary()) -> ok. +-spec(unsubscribe(binary()) -> ok). unsubscribe(Topic) when is_binary(Topic) -> emqttd_server:unsubscribe(Topic). --spec unsubscribe(binary(), binary(), mqtt_qos()) -> ok. +-spec(unsubscribe(binary(), binary(), mqtt_qos()) -> ok). unsubscribe(ClientId, Topic, Qos) -> emqttd_server:unsubscribe(ClientId, Topic, Qos). +%%-------------------------------------------------------------------- +%% Hooks API +%%-------------------------------------------------------------------- + +-spec(hook(atom(), function(), list(any())) -> ok | {error, any()}). +hook(Hook, Function, InitArgs) -> + emqttd_hook:add(Hook, Function, InitArgs). + +-spec(hook(atom(), function(), list(any()), integer()) -> ok | {error, any()}). +hook(Hook, Function, InitArgs, Priority) -> + emqttd_hook:add(Hook, Function, InitArgs, Priority). + +-spec(unhook(atom(), function()) -> ok | {error, any()}). +unhook(Hook, Function) -> + emqttd_hook:delete(Hook, Function). + +-spec(run_hooks(atom(), list(any()), any()) -> {ok | stop, any()}). +run_hooks(Hook, Args, Acc) -> + emqttd_hook:run(Hook, Args, Acc). + diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index 385ca134e..8b92d496b 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -79,6 +79,7 @@ print_vsn() -> start_servers(Sup) -> Servers = [{"emqttd ctl", emqttd_ctl}, + {"emqttd hook", emqttd_hook}, {"emqttd pubsub", {supervisor, emqttd_pubsub_sup}}, {"emqttd stats", emqttd_stats}, {"emqttd metrics", emqttd_metrics}, diff --git a/src/emqttd_broker.erl b/src/emqttd_broker.erl index 50b72720a..760dba041 100644 --- a/src/emqttd_broker.erl +++ b/src/emqttd_broker.erl @@ -28,9 +28,6 @@ %% Event API -export([subscribe/1, notify/2]). -%% Hook API --export([hook/3, unhook/2, foreach_hooks/2, foldl_hooks/3]). - %% Broker API -export([env/1, version/0, uptime/0, datetime/0, sysdescr/0]). @@ -100,40 +97,6 @@ datetime() -> io_lib:format( "~4..0w-~2..0w-~2..0w ~2..0w:~2..0w:~2..0w", [Y, M, D, H, MM, S])). -%% @doc Hook --spec hook(Hook :: atom(), Name :: any(), MFA :: mfa()) -> ok | {error, any()}. -hook(Hook, Name, MFA) -> - gen_server:call(?SERVER, {hook, Hook, Name, MFA}). - -%% @doc Unhook --spec unhook(Hook :: atom(), Name :: any()) -> ok | {error, any()}. -unhook(Hook, Name) -> - gen_server:call(?SERVER, {unhook, Hook, Name}). - -%% @doc Foreach hooks --spec foreach_hooks(Hook :: atom(), Args :: list()) -> any(). -foreach_hooks(Hook, Args) -> - case ets:lookup(?BROKER_TAB, {hook, Hook}) of - [{_, Hooks}] -> - lists:foreach(fun({_Name, {M, F, A}}) -> - apply(M, F, Args++A) - end, Hooks); - [] -> - ok - end. - -%% @doc Foldl hooks --spec foldl_hooks(Hook :: atom(), Args :: list(), Acc0 :: any()) -> any(). -foldl_hooks(Hook, Args, Acc0) -> - case ets:lookup(?BROKER_TAB, {hook, Hook}) of - [{_, Hooks}] -> - lists:foldl(fun({_Name, {M, F, A}}, Acc) -> - apply(M, F, lists:append([Args, [Acc], A])) - end, Acc0, Hooks); - [] -> - Acc0 - end. - %% @doc Start a tick timer start_tick(Msg) -> start_tick(timer:seconds(env(sys_interval)), Msg). @@ -167,31 +130,6 @@ init([]) -> handle_call(uptime, _From, State) -> {reply, uptime(State), State}; -handle_call({hook, Hook, Name, MFArgs}, _From, State) -> - Key = {hook, Hook}, Reply = - case ets:lookup(?BROKER_TAB, Key) of - [{Key, Hooks}] -> - case lists:keyfind(Name, 1, Hooks) of - {Name, _MFArgs} -> - {error, existed}; - false -> - insert_hooks(Key, Hooks ++ [{Name, MFArgs}]) - end; - [] -> - insert_hooks(Key, [{Name, MFArgs}]) - end, - {reply, Reply, State}; - -handle_call({unhook, Hook, Name}, _From, State) -> - Key = {hook, Hook}, Reply = - case ets:lookup(?BROKER_TAB, Key) of - [{Key, Hooks}] -> - insert_hooks(Key, lists:keydelete(Name, 1, Hooks)); - [] -> - {error, not_found} - end, - {reply, Reply, State}; - handle_call(Req, _From, State) -> ?UNEXPECTED_REQ(Req, State). @@ -224,9 +162,6 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%-------------------------------------------------------------------- -insert_hooks(Key, Hooks) -> - ets:insert(?BROKER_TAB, {Key, Hooks}), ok. - create_topic(Topic) -> emqttd:create(topic, emqttd_topic:systop(Topic)). diff --git a/src/emqttd_cli.erl b/src/emqttd_cli.erl index 17225ac4a..04f0014d6 100644 --- a/src/emqttd_cli.erl +++ b/src/emqttd_cli.erl @@ -40,7 +40,7 @@ stack_size, reductions]). --define(MAX_LINES, 20000). +-define(MAX_LINES, 10000). -define(APP, emqttd). @@ -148,7 +148,7 @@ users(Args) -> emqttd_auth_username:cli(Args). %%-------------------------------------------------------------------- %% @doc Query clients clients(["list"]) -> - dump(ets, mqtt_client, fun print/1); + dump(mqtt_client); clients(["show", ClientId]) -> if_client(ClientId, fun print/1); @@ -173,10 +173,10 @@ sessions(["list"]) -> [sessions(["list", Type]) || Type <- ["persistent", "transient"]]; sessions(["list", "persistent"]) -> - dump(ets, mqtt_persistent_session, fun print/1); + dump(mqtt_persistent_session); sessions(["list", "transient"]) -> - dump(ets, mqtt_transient_session, fun print/1); + dump(mqtt_transient_session); sessions(["show", ClientId]) -> MP = {{bin(ClientId), '_'}, '_'}, @@ -199,11 +199,10 @@ sessions(_) -> %%-------------------------------------------------------------------- %% @doc Routes Command routes(["list"]) -> - Print = fun(Topic, Records) -> print(route, Topic, Records) end, - if_could_print(route, Print); + if_could_print(route, fun print/1); routes(["show", Topic]) -> - print(route, Topic, mnesia:dirty_read(route, bin(Topic))); + print(mnesia:dirty_read(route, bin(Topic))); routes(_) -> ?USAGE([{"routes list", "List all routes"}, @@ -212,28 +211,25 @@ routes(_) -> %%-------------------------------------------------------------------- %% @doc Topics Command topics(["list"]) -> - Print = fun(Topic, Records) -> print(topic, Topic, Records) end, - if_could_print(topic, Print); + if_could_print(topic, fun print/1); topics(["show", Topic]) -> - print(topic, Topic, ets:lookup(topic, bin(Topic))); + print(mnesia:dirty_read(topic, bin(Topic))); topics(_) -> ?USAGE([{"topics list", "List all topics"}, {"topics show ", "Show a topic"}]). subscriptions(["list"]) -> - Print = fun(ClientId, Records) -> print(subscription, ClientId, Records) end, - if_could_print(subscription, Print); + if_could_print(subscription, fun print/1); subscriptions(["list", "static"]) -> - Print = fun(ClientId, Records) -> print(subscription, ClientId, Records) end, - if_could_print(backend_subscription, Print); + if_could_print(backend_subscription, fun print/1); subscriptions(["show", ClientId]) -> case mnesia:dirty_read(subscription, bin(ClientId)) of [] -> ?PRINT_MSG("Not Found.~n"); - Records -> print(subscription, ClientId, Records) + Records -> print(Records) end; subscriptions(["add", ClientId, Topic, QoS]) -> @@ -242,11 +238,11 @@ subscriptions(["add", ClientId, Topic, QoS]) -> topic = bin(Topic), qos = IntQos}, case emqttd_backend:add_subscription(Subscription) of - {atomic, ok} -> + ok -> ?PRINT_MSG("ok~n"); - {aborted, {error, existed}} -> + {error, already_existed} -> ?PRINT_MSG("Error: already existed~n"); - {aborted, Reason} -> + {error, Reason} -> ?PRINT("Error: ~p~n", [Reason]) end end, @@ -274,7 +270,7 @@ if_could_print(Tab, Fun) -> ?PRINT("Could not list, too many ~ss: ~p~n", [Tab, Size]); _Size -> Keys = mnesia:dirty_all_keys(Tab), - foreach(fun(Key) -> Fun(Key, ets:lookup(Tab, Key)) end, Keys) + foreach(fun(Key) -> Fun(ets:lookup(Tab, Key)) end, Keys) end. if_valid_qos(QoS, Fun) -> @@ -465,23 +461,53 @@ listeners([]) -> listeners(_) -> ?PRINT_CMD("listeners", "List listeners"). +%%-------------------------------------------------------------------- +%% Dump ETS +%%-------------------------------------------------------------------- + +dump(Table) -> + dump(Table, ets:first(Table)). + +dump(_Table, '$end_of_table') -> + ok; + +dump(Table, Key) -> + case ets:lookup(Table, Key) of + [Record] -> print(Record); + [] -> ok + end, + dump(Table, ets:next(Table, Key)). + +print([]) -> + ok; + +print(Routes = [#mqtt_route{topic = Topic} | _]) -> + Nodes = [atom_to_list(Node) || #mqtt_route{node = Node} <- Routes], + ?PRINT("~s -> ~s~n", [Topic, string:join(Nodes, ",")]); + +print(Subscriptions = [#mqtt_subscription{subid = ClientId} | _]) -> + TopicTable = [io_lib:format("~s:~w", [Topic, Qos]) + || #mqtt_subscription{topic = Topic, qos = Qos} <- Subscriptions], + ?PRINT("~s -> ~s~n", [ClientId, string:join(TopicTable, ",")]); + +print(Topics = [#mqtt_topic{}|_]) -> + foreach(fun print/1, Topics); + print(#mqtt_plugin{name = Name, version = Ver, descr = Descr, active = Active}) -> ?PRINT("Plugin(~s, version=~s, description=~s, active=~s)~n", - [Name, Ver, Descr, Active]); + [Name, Ver, Descr, Active]); -print(#mqtt_client{client_id = ClientId, clean_sess = CleanSess, - username = Username, peername = Peername, - connected_at = ConnectedAt}) -> +print(#mqtt_client{client_id = ClientId, clean_sess = CleanSess, username = Username, + peername = Peername, connected_at = ConnectedAt}) -> ?PRINT("Client(~s, clean_sess=~s, username=~s, peername=~s, connected_at=~p)~n", - [ClientId, CleanSess, Username, - emqttd_net:format(Peername), - emqttd_time:now_to_secs(ConnectedAt)]); + [ClientId, CleanSess, Username, emqttd_net:format(Peername), + emqttd_time:now_to_secs(ConnectedAt)]); print(#mqtt_topic{topic = Topic, flags = Flags}) -> - ?PRINT("~s: ~p~n", [Topic, Flags]); + ?PRINT("~s: ~s~n", [Topic, string:join([atom_to_list(F) || F <- Flags], ",")]); print(#mqtt_route{topic = Topic, node = Node}) -> - ?PRINT("~s: ~s~n", [Topic, Node]); + ?PRINT("~s -> ~s~n", [Topic, Node]); print({{ClientId, _ClientPid}, SessInfo}) -> InfoKeys = [clean_sess, @@ -499,39 +525,11 @@ print({{ClientId, _ClientPid}, SessInfo}) -> "created_at=~w)~n", [ClientId | [format(Key, proplists:get_value(Key, SessInfo)) || Key <- InfoKeys]]). -print(route, Topic, Routes) -> - Nodes = [Node || #mqtt_route{node = Node} <- Routes], - ?PRINT("~s: ~p~n", [Topic, Nodes]); - -print(topic, _Topic, Records) -> - [print(R) || R <- Records]; - -print(subscription, ClientId, Subscriptions) -> - TopicTable = [{Topic, Qos} || #mqtt_subscription{topic = Topic, qos = Qos} <- Subscriptions], - ?PRINT("~s: ~p~n", [ClientId, TopicTable]). - format(created_at, Val) -> emqttd_time:now_to_secs(Val); -format(subscriptions, List) -> - string:join([io_lib:format("~s:~w", [Topic, Qos]) || {Topic, Qos} <- List], ","); - format(_, Val) -> Val. bin(S) -> iolist_to_binary(S). -%%TODO: ... -dump(ets, Table, Fun) -> - dump(ets, Table, ets:first(Table), Fun). - -dump(ets, _Table, '$end_of_table', _Fun) -> - ok; - -dump(ets, Table, Key, Fun) -> - case ets:lookup(Table, Key) of - [Record] -> Fun(Record); - [] -> ignore - end, - dump(ets, Table, ets:next(Table, Key), Fun). - diff --git a/src/emqttd_hook.erl b/src/emqttd_hook.erl new file mode 100644 index 000000000..f183c98d3 --- /dev/null +++ b/src/emqttd_hook.erl @@ -0,0 +1,155 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2016 Feng Lee . +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqttd_hook). + +-author("Feng Lee "). + +-behaviour(gen_server). + +%% Start +-export([start_link/0]). + +%% Hooks API +-export([add/3, add/4, delete/2, run/3, lookup/1]). + +%% gen_server Function Exports +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-record(state, {}). + +-record(callback, {function :: function(), + init_args = [] :: list(any()), + priority = 0 :: integer()}). + +-record(hook, {name :: atom(), callbacks = [] :: list(#callback{})}). + +-define(HOOK_TAB, mqtt_hook). + +%%-------------------------------------------------------------------- +%% Start API +%%-------------------------------------------------------------------- + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +%%-------------------------------------------------------------------- +%% Hooks API +%%-------------------------------------------------------------------- + +-spec(add(atom(), function(), list(any())) -> ok). +add(HookPoint, Function, InitArgs) -> + add(HookPoint, Function, InitArgs, 0). + +-spec(add(atom(), function(), list(any()), integer()) -> ok). +add(HookPoint, Function, InitArgs, Priority) -> + gen_server:call(?MODULE, {add, HookPoint, Function, InitArgs, Priority}). + +-spec(delete(atom(), function()) -> ok). +delete(HookPoint, Function) -> + gen_server:call(?MODULE, {delete, HookPoint, Function}). + +-spec(run(atom(), list(any()), any()) -> any()). +run(HookPoint, Args, Acc) -> + run_(lookup(HookPoint), Args, Acc). + +%% @private +run_([#callback{function = Fun, init_args = InitArgs} | Callbacks], Args, Acc) -> + case apply(Fun, lists:append([Args, [Acc], InitArgs])) of + ok -> run_(Callbacks, Args, Acc); + {ok, NewAcc} -> run_(Callbacks, Args, NewAcc); + stop -> {stop, Acc}; + {stop, NewAcc} -> {stop, NewAcc} + end; + +run_([], _Args, Acc) -> + {ok, Acc}. + +-spec(lookup(atom()) -> [#callback{}]). +lookup(HookPoint) -> + case ets:lookup(?HOOK_TAB, HookPoint) of + [] -> []; + [#hook{callbacks = Callbacks}] -> Callbacks + end. + +%%-------------------------------------------------------------------- +%% gen_server Callbacks +%%-------------------------------------------------------------------- + +init([]) -> + ets:new(?HOOK_TAB, [set, protected, named_table, {keypos, #hook.name}]), + {ok, #state{}}. + +handle_call({add, HookPoint, Function, InitArgs, Priority}, _From, State) -> + Reply = + case ets:lookup(?HOOK_TAB, HookPoint) of + [#hook{callbacks = Callbacks}] -> + case lists:keyfind(Function, #callback.function, Callbacks) of + false -> + Callback = #callback{function = Function, + init_args = InitArgs, + priority = Priority}, + insert_hook_(HookPoint, add_callback_(Callback, Callbacks)); + _Callback -> + {error, already_hooked} + end; + [] -> + Callback = #callback{function = Function, + init_args = InitArgs, + priority = Priority}, + insert_hook_(HookPoint, [Callback]) + end, + {reply, Reply, State}; + +handle_call({delete, HookPoint, Function}, _From, State) -> + Reply = + case ets:lookup(?HOOK_TAB, HookPoint) of + [#hook{callbacks = Callbacks}] -> + insert_hook_(HookPoint, del_callback_(Function, Callbacks)); + [] -> + {error, not_found} + end, + {reply, Reply, State}; + +handle_call(_Req, _From, State) -> + {reply, ignore, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +insert_hook_(HookPoint, Callbacks) -> + ets:insert(?HOOK_TAB, #hook{name = HookPoint, callbacks = Callbacks}), ok. + +add_callback_(Callback, Callbacks) -> + lists:keymerge(#callback.priority, Callbacks, [Callback]). + +del_callback_(Function, Callbacks) -> + lists:keydelete(Function, #callback.function, Callbacks). + diff --git a/src/emqttd_mod_presence.erl b/src/emqttd_mod_presence.erl index 1aa382fa2..b49511000 100644 --- a/src/emqttd_mod_presence.erl +++ b/src/emqttd_mod_presence.erl @@ -23,57 +23,51 @@ -export([load/1, unload/1]). --export([client_connected/3, client_disconnected/3]). +-export([on_client_connected/3, on_client_disconnected/3]). load(Opts) -> - emqttd_broker:hook('client.connected', {?MODULE, client_connected}, - {?MODULE, client_connected, [Opts]}), - emqttd_broker:hook('client.disconnected', {?MODULE, client_disconnected}, - {?MODULE, client_disconnected, [Opts]}), - ok. + emqttd:hook('client.connected', fun ?MODULE:on_client_connected/3, [Opts]), + emqttd:hook('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Opts]). -client_connected(ConnAck, #mqtt_client{client_id = ClientId, - username = Username, - peername = {IpAddress, _}, - clean_sess = CleanSess, - proto_ver = ProtoVer}, Opts) -> - Sess = case CleanSess of - true -> false; - false -> true - end, +on_client_connected(ConnAck, Client = #mqtt_client{client_id = ClientId, + username = Username, + peername = {IpAddr, _}, + clean_sess = CleanSess, + proto_ver = ProtoVer}, Opts) -> Json = mochijson2:encode([{clientid, ClientId}, {username, Username}, - {ipaddress, list_to_binary(emqttd_net:ntoa(IpAddress))}, - {session, Sess}, + {ipaddress, list_to_binary(emqttd_net:ntoa(IpAddr))}, + {session, sess(CleanSess)}, {protocol, ProtoVer}, {connack, ConnAck}, {ts, emqttd_time:now_to_secs()}]), - Msg = emqttd_message:make(presence, - proplists:get_value(qos, Opts, 0), - topic(connected, ClientId), - iolist_to_binary(Json)), - emqttd:publish(Msg). + emqttd:publish(message(qos(Opts), topic(connected, ClientId), Json)), + {ok, Client}. -client_disconnected(Reason, ClientId, Opts) -> +on_client_disconnected(Reason, ClientId, Opts) -> Json = mochijson2:encode([{clientid, ClientId}, {reason, reason(Reason)}, {ts, emqttd_time:now_to_secs()}]), - Msg = emqttd_message:make(presence, - proplists:get_value(qos, Opts, 0), - topic(disconnected, ClientId), - iolist_to_binary(Json)), - emqttd:publish(Msg). + emqttd:publish(message(qos(Opts), topic(disconnected, ClientId), Json)). unload(_Opts) -> - emqttd_broker:unhook('client.connected', {?MODULE, client_connected}), - emqttd_broker:unhook('client.disconnected', {?MODULE, client_disconnected}). + emqttd:unhook('client.connected', fun ?MODULE:on_client_connected/3), + emqttd:unhook('client.disconnected', fun ?MODULE:on_client_disconnected/3). + +sess(false) -> true; +sess(true) -> false. + +qos(Opts) -> proplists:get_value(qos, Opts, 0). + +message(Qos, Topic, Json) -> + emqttd_message:make(presence, Qos, Topic, iolist_to_binary(Json)). topic(connected, ClientId) -> emqttd_topic:systop(list_to_binary(["clients/", ClientId, "/connected"])); topic(disconnected, ClientId) -> emqttd_topic:systop(list_to_binary(["clients/", ClientId, "/disconnected"])). -reason(Reason) when is_atom(Reason) -> Reason; +reason(Reason) when is_atom(Reason) -> Reason; reason({Error, _}) when is_atom(Error) -> Error; reason(_) -> internal_error. diff --git a/src/emqttd_mod_rewrite.erl b/src/emqttd_mod_rewrite.erl index b1bad9766..9109d2155 100644 --- a/src/emqttd_mod_rewrite.erl +++ b/src/emqttd_mod_rewrite.erl @@ -23,7 +23,7 @@ -export([load/1, reload/1, unload/1]). --export([rewrite/3, rewrite/4]). +-export([rewrite_subscribe/3, rewrite_unsubscribe/3, rewrite_publish/2]). %%-------------------------------------------------------------------- %% API @@ -33,23 +33,19 @@ load(Opts) -> File = proplists:get_value(file, Opts), {ok, Terms} = file:consult(File), Sections = compile(Terms), - emqttd_broker:hook('client.subscribe', {?MODULE, rewrite_subscribe}, - {?MODULE, rewrite, [subscribe, Sections]}), - emqttd_broker:hook('client.unsubscribe', {?MODULE, rewrite_unsubscribe}, - {?MODULE, rewrite, [unsubscribe, Sections]}), - emqttd_broker:hook('message.publish', {?MODULE, rewrite_publish}, - {?MODULE, rewrite, [publish, Sections]}), - ok. + emqttd:hook('client.subscribe', fun ?MODULE:rewrite_subscribe/3, [Sections]), + emqttd:hook('client.unsubscribe', fun ?MODULE:rewrite_unsubscribe/3, [Sections]), + emqttd:hook('message.publish', fun ?MODULE:rewrite_publish/2, [Sections]). -rewrite(_ClientId, TopicTable, subscribe, Sections) -> - lager:info("rewrite subscribe: ~p", [TopicTable]), - [{match_topic(Topic, Sections), Qos} || {Topic, Qos} <- TopicTable]; +rewrite_subscribe(_ClientId, TopicTable, Sections) -> + lager:info("Rewrite subscribe: ~p", [TopicTable]), + {ok, [{match_topic(Topic, Sections), Qos} || {Topic, Qos} <- TopicTable]}. -rewrite(_ClientId, Topics, unsubscribe, Sections) -> - lager:info("rewrite unsubscribe: ~p", [Topics]), - [match_topic(Topic, Sections) || Topic <- Topics]. +rewrite_unsubscribe(_ClientId, Topics, Sections) -> + lager:info("Rewrite unsubscribe: ~p", [Topics]), + {ok, [match_topic(Topic, Sections) || Topic <- Topics]}. -rewrite(Message=#mqtt_message{topic = Topic}, publish, Sections) -> +rewrite_publish(Message=#mqtt_message{topic = Topic}, Sections) -> %%TODO: this will not work if the client is always online. RewriteTopic = case get({rewrite, Topic}) of @@ -59,11 +55,11 @@ rewrite(Message=#mqtt_message{topic = Topic}, publish, Sections) -> DestTopic -> DestTopic end, - Message#mqtt_message{topic = RewriteTopic}. + {ok, Message#mqtt_message{topic = RewriteTopic}}. reload(File) -> %%TODO: The unload api is not right... - case emqttd:is_mod_enabled(rewrite) of + case emqttd_app:is_mod_enabled(rewrite) of true -> unload(state), load([{file, File}]); @@ -72,9 +68,9 @@ reload(File) -> end. unload(_) -> - emqttd_broker:unhook('client.subscribe', {?MODULE, rewrite_subscribe}), - emqttd_broker:unhook('client.unsubscribe',{?MODULE, rewrite_unsubscribe}), - emqttd_broker:unhook('message.publish', {?MODULE, rewrite_publish}). + emqttd:unhook('client.subscribe', fun ?MODULE:rewrite_subscribe/3), + emqttd:unhook('client.unsubscribe',fun ?MODULE:rewrite_unsubscribe/3), + emqttd:unhook('message.publish', fun ?MODULE:rewrite_publish/2). %%-------------------------------------------------------------------- %% Internal functions diff --git a/src/emqttd_mod_subscription.erl b/src/emqttd_mod_subscription.erl index b8d31b436..c23ab6848 100644 --- a/src/emqttd_mod_subscription.erl +++ b/src/emqttd_mod_subscription.erl @@ -23,26 +23,27 @@ -include("emqttd_protocol.hrl"). --export([load/1, client_connected/3, unload/1]). +-export([load/1, on_client_connected/3, unload/1]). -record(state, {topics, backend = false}). load(Opts) -> Topics = [{iolist_to_binary(Topic), QoS} || {Topic, QoS} <- Opts, ?IS_QOS(QoS)], State = #state{topics = Topics, backend = lists:member(backend, Opts)}, - emqttd_broker:hook('client.connected', {?MODULE, client_connected}, - {?MODULE, client_connected, [State]}), - ok. + emqttd:hook('client.connected', fun ?MODULE:on_client_connected/3, [State]). + +on_client_connected(?CONNACK_ACCEPT, Client = #mqtt_client{client_id = ClientId, + client_pid = ClientPid, + username = Username}, + #state{topics = Topics, backend = Backend}) -> -client_connected(?CONNACK_ACCEPT, #mqtt_client{client_id = ClientId, - client_pid = ClientPid, - username = Username}, - #state{topics = Topics, backend = Backend}) -> Replace = fun(Topic) -> rep(<<"$u">>, Username, rep(<<"$c">>, ClientId, Topic)) end, TopicTable = [{Replace(Topic), Qos} || {Topic, Qos} <- with_backend(Backend, ClientId, Topics)], - emqttd_client:subscribe(ClientPid, TopicTable); + emqttd_client:subscribe(ClientPid, TopicTable), + {ok, Client}; -client_connected(_ConnAck, _Client, _State) -> ok. +on_client_connected(_ConnAck, _Client, _State) -> + ok. with_backend(false, _ClientId, TopicTable) -> TopicTable; @@ -51,7 +52,7 @@ with_backend(true, ClientId, TopicTable) -> emqttd_opts:merge([Fun(Sub) || Sub <- emqttd_backend:lookup_subscriptions(ClientId)], TopicTable). unload(_Opts) -> - emqttd_broker:unhook('client.connected', {?MODULE, client_connected}). + emqttd:unhook('client.connected', fun ?MODULE:on_client_connected/3). rep(<<"$c">>, ClientId, Topic) -> emqttd_topic:feed_var(<<"$c">>, ClientId, Topic); diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index 6764d201c..a6af27155 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -165,7 +165,7 @@ process(Packet = ?CONNECT_PACKET(Var), State0) -> {ReturnCode, false, State1} end, %% Run hooks - emqttd_broker:foreach_hooks('client.connected', [ReturnCode1, client(State3)]), + emqttd:run_hooks('client.connected', [ReturnCode1], client(State3)), %% Send connack send(?CONNACK_PACKET(ReturnCode1, sp(SessPresent)), State3); @@ -247,7 +247,9 @@ with_puback(Type, Packet = ?PUBLISH_PACKET(_Qos, PacketId), end. -spec send(mqtt_message() | mqtt_packet(), proto_state()) -> {ok, proto_state()}. -send(Msg, State) when is_record(Msg, mqtt_message) -> +send(Msg, State = #proto_state{client_id = ClientId}) + when is_record(Msg, mqtt_message) -> + emqttd:run_hooks('message.delivered', [ClientId], Msg), send(emqttd_message:to_packet(Msg), State); send(Packet, State = #proto_state{sendfun = SendFun}) @@ -281,7 +283,7 @@ shutdown(conflict, #proto_state{client_id = _ClientId}) -> shutdown(Error, State = #proto_state{client_id = ClientId, will_msg = WillMsg}) -> ?LOG(info, "Shutdown for ~p", [Error], State), send_willmsg(ClientId, WillMsg), - emqttd_broker:foreach_hooks('client.disconnected', [Error, ClientId]), + emqttd:run_hooks('client.disconnected', [Error], ClientId), %% let it down %% emqttd_cm:unregister(ClientId). ok. diff --git a/src/emqttd_pubsub.erl b/src/emqttd_pubsub.erl index 03ca4907f..e216a89fe 100644 --- a/src/emqttd_pubsub.erl +++ b/src/emqttd_pubsub.erl @@ -60,15 +60,15 @@ mnesia(copy) -> %%-------------------------------------------------------------------- %% @doc Start one pubsub --spec start_link(Pool, Id, Env) -> {ok, pid()} | ignore | {error, any()} when - Pool :: atom(), - Id :: pos_integer(), - Env :: list(tuple()). +-spec(start_link(Pool, Id, Env) -> {ok, pid()} | ignore | {error, any()} when + Pool :: atom(), + Id :: pos_integer(), + Env :: list(tuple())). start_link(Pool, Id, Env) -> gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id, Env], []). %% @doc Create a Topic. --spec create_topic(binary()) -> ok | {error, any()}. +-spec(create_topic(binary()) -> ok | {error, any()}). create_topic(Topic) when is_binary(Topic) -> case mnesia:transaction(fun add_topic_/2, [Topic, [static]]) of {atomic, ok} -> ok; @@ -76,7 +76,7 @@ create_topic(Topic) when is_binary(Topic) -> end. %% @doc Lookup a Topic. --spec lookup_topic(binary()) -> list(mqtt_topic()). +-spec(lookup_topic(binary()) -> list(mqtt_topic())). lookup_topic(Topic) when is_binary(Topic) -> mnesia:dirty_read(topic, Topic). @@ -85,17 +85,17 @@ lookup_topic(Topic) when is_binary(Topic) -> %%-------------------------------------------------------------------- %% @doc Subscribe a Topic --spec subscribe(binary(), pid()) -> ok. +-spec(subscribe(binary(), pid()) -> ok). subscribe(Topic, SubPid) when is_binary(Topic) -> call(pick(Topic), {subscribe, Topic, SubPid}). %% @doc Asynchronous Subscribe --spec async_subscribe(binary(), pid()) -> ok. +-spec(async_subscribe(binary(), pid()) -> ok). async_subscribe(Topic, SubPid) when is_binary(Topic) -> cast(pick(Topic), {subscribe, Topic, SubPid}). %% @doc Publish message to Topic. --spec publish(binary(), any()) -> ok. +-spec(publish(binary(), any()) -> any()). publish(Topic, Msg) -> lists:foreach( fun(#mqtt_route{topic = To, node = Node}) when Node =:= node() -> @@ -105,7 +105,7 @@ publish(Topic, Msg) -> end, emqttd_router:lookup(Topic)). %% @doc Dispatch Message to Subscribers --spec dispatch(binary(), mqtt_message()) -> ok. +-spec(dispatch(binary(), mqtt_message()) -> ok). dispatch(Queue = <<"$queue/", _T>>, Msg) -> case subscribers(Queue) of [] -> @@ -148,12 +148,12 @@ dropped(_Topic) -> emqttd_metrics:inc('messages/dropped'). %% @doc Unsubscribe --spec unsubscribe(binary(), pid()) -> ok. +-spec(unsubscribe(binary(), pid()) -> ok). unsubscribe(Topic, SubPid) when is_binary(Topic) -> call(pick(Topic), {unsubscribe, Topic, SubPid}). %% @doc Asynchronous Unsubscribe --spec async_unsubscribe(binary(), pid()) -> ok. +-spec(async_unsubscribe(binary(), pid()) -> ok). async_unsubscribe(Topic, SubPid) when is_binary(Topic) -> cast(pick(Topic), {unsubscribe, Topic, SubPid}). diff --git a/src/emqttd_router.erl b/src/emqttd_router.erl index ac744e94a..a40b9e550 100644 --- a/src/emqttd_router.erl +++ b/src/emqttd_router.erl @@ -63,31 +63,31 @@ start_link() -> %%-------------------------------------------------------------------- %% @doc Lookup Routes. --spec lookup(Topic:: binary()) -> [mqtt_route()]. +-spec(lookup(Topic:: binary()) -> [mqtt_route()]). lookup(Topic) when is_binary(Topic) -> Matched = mnesia:async_dirty(fun emqttd_trie:match/1, [Topic]), %% Optimize: route table will be replicated to all nodes. lists:append([ets:lookup(route, To) || To <- [Topic | Matched]]). %% @doc Print Routes. --spec print(Topic :: binary()) -> [ok]. +-spec(print(Topic :: binary()) -> [ok]). print(Topic) -> [io:format("~s -> ~s~n", [To, Node]) || #mqtt_route{topic = To, node = Node} <- lookup(Topic)]. %% @doc Add Route --spec add_route(binary() | mqtt_route()) -> ok | {error, Reason :: any()}. +-spec(add_route(binary() | mqtt_route()) -> ok | {error, Reason :: any()}). add_route(Topic) when is_binary(Topic) -> add_route(#mqtt_route{topic = Topic, node = node()}); add_route(Route) when is_record(Route, mqtt_route) -> add_routes([Route]). --spec add_route(Topic :: binary(), Node :: node()) -> ok | {error, Reason :: any()}. +-spec(add_route(Topic :: binary(), Node :: node()) -> ok | {error, Reason :: any()}). add_route(Topic, Node) when is_binary(Topic), is_atom(Node) -> add_route(#mqtt_route{topic = Topic, node = Node}). %% @doc Add Routes --spec add_routes([mqtt_route()]) -> ok | {errory, Reason :: any()}. +-spec(add_routes([mqtt_route()]) -> ok | {errory, Reason :: any()}). add_routes(Routes) -> Add = fun() -> [add_route_(Route) || Route <- Routes] end, case mnesia:transaction(Add) of @@ -112,18 +112,18 @@ add_route_(Route = #mqtt_route{topic = Topic}) -> end. %% @doc Delete Route --spec del_route(binary() | mqtt_route()) -> ok | {error, Reason :: any()}. +-spec(del_route(binary() | mqtt_route()) -> ok | {error, Reason :: any()}). del_route(Topic) when is_binary(Topic) -> del_route(#mqtt_route{topic = Topic, node = node()}); del_route(Route) when is_record(Route, mqtt_route) -> del_routes([Route]). --spec del_route(Topic :: binary(), Node :: node()) -> ok | {error, Reason :: any()}. +-spec(del_route(Topic :: binary(), Node :: node()) -> ok | {error, Reason :: any()}). del_route(Topic, Node) when is_binary(Topic), is_atom(Node) -> del_route(#mqtt_route{topic = Topic, node = Node}). %% @doc Delete Routes --spec del_routes([mqtt_route()]) -> ok | {error, any()}. +-spec(del_routes([mqtt_route()]) -> ok | {error, any()}). del_routes(Routes) -> Del = fun() -> [del_route_(Route) || Route <- Routes] end, case mnesia:transaction(Del) of diff --git a/src/emqttd_server.erl b/src/emqttd_server.erl index e119a5029..0fafb14a4 100644 --- a/src/emqttd_server.erl +++ b/src/emqttd_server.erl @@ -35,7 +35,7 @@ %% PubSub API -export([subscribe/1, subscribe/3, publish/1, unsubscribe/1, unsubscribe/3, - update_subscription/4]). + lookup_subscription/1, update_subscription/4]). %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -63,10 +63,10 @@ mnesia(copy) -> %%-------------------------------------------------------------------- %% @doc Start a Server --spec start_link(Pool, Id, Env) -> {ok, pid()} | ignore | {error, any()} when - Pool :: atom(), - Id :: pos_integer(), - Env :: list(tuple()). +-spec(start_link(Pool, Id, Env) -> {ok, pid()} | ignore | {error, any()} when + Pool :: atom(), + Id :: pos_integer(), + Env :: list(tuple())). start_link(Pool, Id, Env) -> gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id, Env], []). @@ -75,40 +75,48 @@ start_link(Pool, Id, Env) -> %%-------------------------------------------------------------------- %% @doc Subscribe a Topic --spec subscribe(binary()) -> ok. +-spec(subscribe(binary()) -> ok). subscribe(Topic) when is_binary(Topic) -> From = self(), call(server(From), {subscribe, From, Topic}). %% @doc Subscribe from a MQTT session. --spec subscribe(binary(), binary(), mqtt_qos()) -> ok. +-spec(subscribe(binary(), binary(), mqtt_qos()) -> ok). subscribe(ClientId, Topic, Qos) -> From = self(), call(server(From), {subscribe, From, ClientId, Topic, ?QOS_I(Qos)}). +%% @doc Lookup subscriptions. +-spec(lookup_subscription(binary()) -> [#mqtt_subscription{}]). +lookup_subscription(ClientId) -> + mnesia:dirty_read(subscription, ClientId). + %% @doc Update a subscription. --spec update_subscription(binary(), binary(), mqtt_qos(), mqtt_qos()) -> ok. +-spec(update_subscription(binary(), binary(), mqtt_qos(), mqtt_qos()) -> ok). update_subscription(ClientId, Topic, OldQos, NewQos) -> call(server(self()), {update_subscription, ClientId, Topic, ?QOS_I(OldQos), ?QOS_I(NewQos)}). %% @doc Publish a Message --spec publish(Msg :: mqtt_message()) -> ok. +-spec(publish(Msg :: mqtt_message()) -> any()). publish(Msg = #mqtt_message{from = From}) -> trace(publish, From, Msg), - Msg1 = #mqtt_message{topic = Topic} - = emqttd_broker:foldl_hooks('message.publish', [], Msg), - %% Retain message first. Don't create retained topic. - Msg2 = case emqttd_retainer:retain(Msg1) of - ok -> emqttd_message:unset_flag(Msg1); - ignore -> Msg1 - end, - emqttd_pubsub:publish(Topic, Msg2). + case emqttd:run_hooks('message.publish', [], Msg) of + {ok, Msg1 = #mqtt_message{topic = Topic}} -> + %% Retain message first. Don't create retained topic. + Msg2 = case emqttd_retainer:retain(Msg1) of + ok -> emqttd_message:unset_flag(Msg1); + ignore -> Msg1 + end, + emqttd_pubsub:publish(Topic, Msg2); + {stop, Msg1} -> + lager:warning("Stop publishing: ~s", [emqttd_message:format(Msg1)]) + end. %% @doc Unsubscribe a Topic --spec unsubscribe(binary()) -> ok. +-spec(unsubscribe(binary()) -> ok). unsubscribe(Topic) when is_binary(Topic) -> From = self(), call(server(From), {unsubscribe, From, Topic}). %% @doc Unsubscribe a Topic from a MQTT session --spec unsubscribe(binary(), binary(), mqtt_qos()) -> ok. +-spec(unsubscribe(binary(), binary(), mqtt_qos()) -> ok). unsubscribe(ClientId, Topic, Qos) -> From = self(), call(server(From), {unsubscribe, From, ClientId, Topic, Qos}). @@ -188,11 +196,11 @@ code_change(_OldVsn, State, _Extra) -> %% @private %% @doc Add a subscription. --spec add_subscription_(binary(), binary(), mqtt_qos()) -> ok. +-spec(add_subscription_(binary(), binary(), mqtt_qos()) -> ok). add_subscription_(ClientId, Topic, Qos) -> add_subscription_(#mqtt_subscription{subid = ClientId, topic = Topic, qos = Qos}). --spec add_subscription_(mqtt_subscription()) -> ok. +-spec(add_subscription_(mqtt_subscription()) -> ok). add_subscription_(Subscription) when is_record(Subscription, mqtt_subscription) -> mnesia:dirty_write(subscription, Subscription). @@ -202,7 +210,7 @@ update_subscription_(OldSub, NewSub) -> %% @private %% @doc Delete a subscription --spec del_subscription_(binary(), binary(), mqtt_qos()) -> ok. +-spec(del_subscription_(binary(), binary(), mqtt_qos()) -> ok). del_subscription_(ClientId, Topic, Qos) -> del_subscription_(#mqtt_subscription{subid = ClientId, topic = Topic, qos = Qos}). @@ -246,7 +254,8 @@ trace(publish, From, #mqtt_message{topic = Topic, payload = Payload}) -> %%-------------------------------------------------------------------- set_subscription_stats() -> - emqttd_stats:setstats('subscriptions/count', 'subscriptions/max', mnesia:table_info(subscription, size)). + emqttd_stats:setstats('subscriptions/count', 'subscriptions/max', + mnesia:table_info(subscription, size)). %%-------------------------------------------------------------------- diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 12b3e680d..b95494f8d 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -288,49 +288,60 @@ handle_call(Req, _From, State) -> handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id = ClientId, subscriptions = Subscriptions}) -> - TopicTable = emqttd_broker:foldl_hooks('client.subscribe', [ClientId], TopicTable0), - ?LOG(info, "Subscribe ~p", [TopicTable], Session), - Subscriptions1 = lists:foldl( - fun({Topic, Qos}, SubDict) -> - case dict:find(Topic, SubDict) of - {ok, Qos} -> - ?LOG(warning, "duplicated subscribe: ~s, qos = ~w", [Topic, Qos], Session), - SubDict; - {ok, OldQos} -> - emqttd_server:update_subscription(ClientId, Topic, OldQos, Qos), - ?LOG(warning, "duplicated subscribe ~s, old_qos=~w, new_qos=~w", [Topic, OldQos, Qos], Session), - dict:store(Topic, Qos, SubDict); - error -> - emqttd:subscribe(ClientId, Topic, Qos), - %%TODO: the design is ugly... - %% : 3.8.4 - %% Where the Topic Filter is not identical to any existing Subscription’s filter, - %% a new Subscription is created and all matching retained messages are sent. - emqttd_retainer:dispatch(Topic, self()), + case emqttd:run_hooks('client.subscribe', [ClientId], TopicTable0) of + {ok, TopicTable} -> + ?LOG(info, "Subscribe ~p", [TopicTable], Session), + Subscriptions1 = lists:foldl( + fun({Topic, Qos}, SubDict) -> + case dict:find(Topic, SubDict) of + {ok, Qos} -> + ?LOG(warning, "duplicated subscribe: ~s, qos = ~w", [Topic, Qos], Session), + SubDict; + {ok, OldQos} -> + emqttd_server:update_subscription(ClientId, Topic, OldQos, Qos), + ?LOG(warning, "duplicated subscribe ~s, old_qos=~w, new_qos=~w", [Topic, OldQos, Qos], Session), + dict:store(Topic, Qos, SubDict); + error -> + emqttd:subscribe(ClientId, Topic, Qos), + %%TODO: the design is ugly... + %% : 3.8.4 + %% Where the Topic Filter is not identical to any existing Subscription’s filter, + %% a new Subscription is created and all matching retained messages are sent. + emqttd_retainer:dispatch(Topic, self()), + + dict:store(Topic, Qos, SubDict) + end + end, Subscriptions, TopicTable), + AckFun([Qos || {_, Qos} <- TopicTable]), + emqttd:run_hooks('client.subscribe.after', [ClientId], TopicTable), + hibernate(Session#session{subscriptions = Subscriptions1}); + {stop, TopicTable} -> + ?LOG(error, "Cannot subscribe: ~p", [TopicTable], Session), + hibernate(Session) + end; - dict:store(Topic, Qos, SubDict) - end - end, Subscriptions, TopicTable), - AckFun([Qos || {_, Qos} <- TopicTable]), - emqttd_broker:foreach_hooks('client.subscribe.after', [ClientId, TopicTable]), - hibernate(Session#session{subscriptions = Subscriptions1}); handle_cast({unsubscribe, Topics0}, Session = #session{client_id = ClientId, subscriptions = Subscriptions}) -> - Topics = emqttd_broker:foldl_hooks('client.unsubscribe', [ClientId], Topics0), - ?LOG(info, "unsubscribe ~p", [Topics], Session), - Subscriptions1 = lists:foldl( - fun(Topic, SubDict) -> - case dict:find(Topic, SubDict) of - {ok, Qos} -> - emqttd:unsubscribe(ClientId, Topic, Qos), - dict:erase(Topic, SubDict); - error -> - SubDict - end - end, Subscriptions, Topics), - hibernate(Session#session{subscriptions = Subscriptions1}); + case emqttd:run_hooks('client.unsubscribe', [ClientId], Topics0) of + {ok, Topics} -> + ?LOG(info, "unsubscribe ~p", [Topics], Session), + Subscriptions1 = lists:foldl( + fun(Topic, SubDict) -> + case dict:find(Topic, SubDict) of + {ok, Qos} -> + emqttd:unsubscribe(ClientId, Topic, Qos), + dict:erase(Topic, SubDict); + error -> + SubDict + end + end, Subscriptions, Topics), + hibernate(Session#session{subscriptions = Subscriptions1}); + {stop, Topics} -> + ?LOG(info, "Cannot unsubscribe: ~p", [Topics], Session), + hibernate(Session) + end; handle_cast({destroy, ClientId}, Session = #session{client_id = ClientId}) -> ?LOG(warning, "destroyed", [], Session), @@ -644,7 +655,7 @@ acked(PktId, Session = #session{client_id = ClientId, awaiting_ack = Awaiting}) -> case lists:keyfind(PktId, 1, InflightQ) of {_, Msg} -> - emqttd_broker:foreach_hooks('message.acked', [ClientId, Msg]); + emqttd:run_hooks('message.acked', [ClientId], Msg); false -> ?LOG(error, "Cannot find acked pktid: ~p", [PktId], Session) end,