0.12.1 refactor

This commit is contained in:
Feng 2015-10-13 21:09:13 +08:00
parent 05ff1ab002
commit a16e527975
18 changed files with 122 additions and 88 deletions

View File

@ -24,6 +24,7 @@
%%% %%%
%%% @end %%% @end
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
-module(emqttd_acl_internal). -module(emqttd_acl_internal).
-author("Feng Lee <feng@emqtt.io>"). -author("Feng Lee <feng@emqtt.io>").

View File

@ -29,15 +29,13 @@
-author("Feng Lee <feng@emqtt.io>"). -author("Feng Lee <feng@emqtt.io>").
-include("emqttd_cli.hrl").
-behaviour(application). -behaviour(application).
%% Application callbacks %% Application callbacks
-export([start/2, stop/1]). -export([start/2, stop/1]).
-define(PRINT_MSG(Msg), io:format(Msg)).
-define(PRINT(Format, Args), io:format(Format, Args)).
%%%============================================================================= %%%=============================================================================
%%% Application callbacks %%% Application callbacks
%%%============================================================================= %%%=============================================================================
@ -106,35 +104,35 @@ start_server(Sup, {Name, Server, Opts}) ->
start_child(Sup, Server, Opts), start_child(Sup, Server, Opts),
?PRINT_MSG("[done]~n"). ?PRINT_MSG("[done]~n").
start_child(Sup, {supervisor, Name}) -> start_child(Sup, {supervisor, Module}) ->
supervisor:start_child(Sup, supervisor_spec(Name)); supervisor:start_child(Sup, supervisor_spec(Module));
start_child(Sup, Name) when is_atom(Name) ->
{ok, _ChiId} = supervisor:start_child(Sup, worker_spec(Name)).
start_child(Sup, {supervisor, Name}, Opts) -> start_child(Sup, Module) when is_atom(Module) ->
supervisor:start_child(Sup, supervisor_spec(Name, Opts)); {ok, _ChiId} = supervisor:start_child(Sup, worker_spec(Module)).
start_child(Sup, Name, Opts) when is_atom(Name) ->
{ok, _ChiId} = supervisor:start_child(Sup, worker_spec(Name, Opts)).
%%TODO: refactor... start_child(Sup, {supervisor, Module}, Opts) ->
supervisor_spec(Name) -> supervisor:start_child(Sup, supervisor_spec(Module, Opts));
{Name,
{Name, start_link, []},
permanent, infinity, supervisor, [Name]}.
supervisor_spec(Name, Opts) -> start_child(Sup, Module, Opts) when is_atom(Module) ->
{Name, supervisor:start_child(Sup, worker_spec(Module, Opts)).
{Name, start_link, [Opts]},
permanent, infinity, supervisor, [Name]}.
worker_spec(Name) -> supervisor_spec(Module) when is_atom(Module) ->
{Name, supervisor_spec(Module, start_link, []).
{Name, start_link, []},
permanent, 10000, worker, [Name]}. supervisor_spec(Module, Opts) ->
worker_spec(Name, Opts) -> supervisor_spec(Module, start_link, [Opts]).
{Name,
{Name, start_link, [Opts]}, supervisor_spec(M, F, A) ->
permanent, 10000, worker, [Name]}. {M, {M, F, A}, permanent, infinity, supervisor, [M]}.
worker_spec(Module) when is_atom(Module) ->
worker_spec(Module, start_link, []).
worker_spec(Module, Opts) when is_atom(Module) ->
worker_spec(Module, start_link, [Opts]).
worker_spec(M, F, A) ->
{M, {M, F, A}, permanent, 10000, worker, [M]}.
-spec stop(State :: term()) -> term(). -spec stop(State :: term()) -> term().
stop(_State) -> stop(_State) ->

View File

@ -36,5 +36,5 @@ init(Opts) -> {ok, Opts}.
check(_Client, _Password, _Opts) -> ok. check(_Client, _Password, _Opts) -> ok.
description() -> "Anonymous authentication module". description() -> "Anonymous Authentication Module".

View File

@ -20,7 +20,7 @@
%%% SOFTWARE. %%% SOFTWARE.
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
%%% @doc %%% @doc
%%% ClientId authentication module. %%% ClientId Authentication Module.
%%% %%%
%%% @end %%% @end
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
@ -51,22 +51,25 @@
%% @doc Add clientid %% @doc Add clientid
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec add_clientid(binary()) -> {atomic, ok} | {aborted, any()}.
add_clientid(ClientId) when is_binary(ClientId) -> add_clientid(ClientId) when is_binary(ClientId) ->
R = #mqtt_auth_clientid{client_id = ClientId}, R = #mqtt_auth_clientid{client_id = ClientId},
mnesia:transaction(fun() -> mnesia:write(R) end). mnesia:transaction(fun mnesia:write/1, [R]).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Add clientid with password %% @doc Add clientid with password
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec add_clientid(binary(), binary()) -> {atomic, ok} | {aborted, any()}.
add_clientid(ClientId, Password) -> add_clientid(ClientId, Password) ->
R = #mqtt_auth_clientid{client_id = ClientId, password = Password}, R = #mqtt_auth_clientid{client_id = ClientId, password = Password},
mnesia:transaction(fun() -> mnesia:write(R) end). mnesia:transaction(fun mnesia:write/1, [R]).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Lookup clientid %% @doc Lookup clientid
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec lookup_clientid(binary()) -> list().
lookup_clientid(ClientId) -> lookup_clientid(ClientId) ->
mnesia:dirty_read(?AUTH_CLIENTID_TAB, ClientId). mnesia:dirty_read(?AUTH_CLIENTID_TAB, ClientId).
@ -74,6 +77,7 @@ lookup_clientid(ClientId) ->
%% @doc Lookup all clientids %% @doc Lookup all clientids
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec all_clientids() -> list(binary()).
all_clientids() -> all_clientids() ->
mnesia:dirty_all_keys(?AUTH_CLIENTID_TAB). mnesia:dirty_all_keys(?AUTH_CLIENTID_TAB).
@ -81,8 +85,9 @@ all_clientids() ->
%% @doc Remove clientid %% @doc Remove clientid
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec remove_clientid(binary()) -> {atomic, ok} | {aborted, any()}.
remove_clientid(ClientId) -> remove_clientid(ClientId) ->
mnesia:transaction(fun() -> mnesia:delete({?AUTH_CLIENTID_TAB, ClientId}) end). mnesia:transaction(fun mnesia:delete/1, [{?AUTH_CLIENTID_TAB, ClientId}]).
%%%============================================================================= %%%=============================================================================
%%% emqttd_auth_mod callbacks %%% emqttd_auth_mod callbacks
@ -95,7 +100,7 @@ init(Opts) ->
mnesia:add_table_copy(?AUTH_CLIENTID_TAB, node(), ram_copies), mnesia:add_table_copy(?AUTH_CLIENTID_TAB, node(), ram_copies),
case proplists:get_value(file, Opts) of case proplists:get_value(file, Opts) of
undefined -> ok; undefined -> ok;
File -> load(File) File -> load(File)
end, end,
{ok, Opts}. {ok, Opts}.

View File

@ -20,7 +20,7 @@
%%% SOFTWARE. %%% SOFTWARE.
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
%%% @doc %%% @doc
%%% LDAP Authentication Module. %%% LDAP Authentication Module
%%% %%%
%%% @end %%% @end
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
@ -28,7 +28,7 @@
-author("Feng Lee <feng@emqtt.io>"). -author("Feng Lee <feng@emqtt.io>").
-include_lib("emqttd/include/emqttd.hrl"). -include("emqttd.hrl").
-import(proplists, [get_value/2, get_value/3]). -import(proplists, [get_value/2, get_value/3]).

View File

@ -26,7 +26,7 @@
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
-module(emqttd_auth_mod). -module(emqttd_auth_mod).
-author('feng@emqtt.io'). -author("Feng Lee <feng@emqtt.io>").
-include("emqttd.hrl"). -include("emqttd.hrl").
@ -50,9 +50,9 @@
-export([behaviour_info/1]). -export([behaviour_info/1]).
behaviour_info(callbacks) -> behaviour_info(callbacks) ->
[{init, 1}, {check, 3}, {description, 0}]; [{init, 1}, {check, 3}, {description, 0}];
behaviour_info(_Other) -> behaviour_info(_Other) ->
undefined. undefined.
-endif. -endif.

View File

@ -20,13 +20,13 @@
%%% SOFTWARE. %%% SOFTWARE.
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
%%% @doc %%% @doc
%%% emqttd authentication with username and password. %%% Authentication with username and password.
%%% %%%
%%% @end %%% @end
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
-module(emqttd_auth_username). -module(emqttd_auth_username).
-author('feng@emqtt.io'). -author("Feng Lee <feng@emqtt.io>").
-include("emqttd.hrl"). -include("emqttd.hrl").
@ -65,16 +65,36 @@ cli(_) ->
%%% API %%% API
%%%============================================================================= %%%=============================================================================
%%------------------------------------------------------------------------------
%% @doc Add user
%% @end
%%------------------------------------------------------------------------------
-spec add_user(binary(), binary()) -> {atomic, ok} | {aborted, any()}.
add_user(Username, Password) -> add_user(Username, Password) ->
R = #?AUTH_USERNAME_TAB{username = Username, password = hash(Password)}, User = #?AUTH_USERNAME_TAB{username = Username, password = hash(Password)},
mnesia:transaction(fun() -> mnesia:write(R) end). mnesia:transaction(fun mnesia:write/1, [User]).
%%------------------------------------------------------------------------------
%% @doc Lookup user by username
%% @end
%%------------------------------------------------------------------------------
-spec lookup_user(binary()) -> list().
lookup_user(Username) -> lookup_user(Username) ->
mnesia:dirty_read(?AUTH_USERNAME_TAB, Username). mnesia:dirty_read(?AUTH_USERNAME_TAB, Username).
%%------------------------------------------------------------------------------
%% @doc Remove user
%% @end
%%------------------------------------------------------------------------------
-spec remove_user(binary()) -> {atomic, ok} | {aborted, any()}.
remove_user(Username) -> remove_user(Username) ->
mnesia:transaction(fun() -> mnesia:delete({?AUTH_USERNAME_TAB, Username}) end). mnesia:transaction(fun mnesia:delete/1, [{?AUTH_USERNAME_TAB, Username}]).
%%------------------------------------------------------------------------------
%% @doc All usernames
%% @end
%%------------------------------------------------------------------------------
-spec all_users() -> list().
all_users() -> all_users() ->
mnesia:dirty_all_keys(?AUTH_USERNAME_TAB). mnesia:dirty_all_keys(?AUTH_USERNAME_TAB).
@ -104,7 +124,8 @@ check(#mqtt_client{username = Username}, Password, _Opts) ->
end end
end. end.
description() -> "Username password authentication module". description() ->
"Username password authentication module".
%%%============================================================================= %%%=============================================================================
%%% Internal functions %%% Internal functions
@ -123,4 +144,3 @@ salt() ->
Salt = random:uniform(16#ffffffff), Salt = random:uniform(16#ffffffff),
<<Salt:32>>. <<Salt:32>>.

View File

@ -144,12 +144,12 @@ handle_info({nodeup, Node}, State = #state{node = Node}) ->
handle_info(ping_down_node, State = #state{node = Node, ping_down_interval = Interval}) -> handle_info(ping_down_node, State = #state{node = Node, ping_down_interval = Interval}) ->
Self = self(), Self = self(),
spawn_link(fun() -> spawn_link(fun() ->
case net_kernel:connect_node(Node) of case net_kernel:connect_node(Node) of
true -> %%TODO: this is not right... fixme later true -> %%TODO: this is not right... fixme later
Self ! {nodeup, Node}; Self ! {nodeup, Node};
false -> false ->
erlang:send_after(Interval, Self, ping_down_node) erlang:send_after(Interval, Self, ping_down_node)
end end
end), end),
{noreply, State}; {noreply, State};

View File

@ -38,12 +38,6 @@
-export([init/1]). -export([init/1]).
%%%=============================================================================
%%% CLI
%%%=============================================================================
%%%============================================================================= %%%=============================================================================
%%% API %%% API
%%%============================================================================= %%%=============================================================================
@ -55,6 +49,10 @@
start_link() -> start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []). supervisor:start_link({local, ?MODULE}, ?MODULE, []).
%%------------------------------------------------------------------------------
%% @doc List all bridges
%% @end
%%------------------------------------------------------------------------------
-spec bridges() -> [{tuple(), pid()}]. -spec bridges() -> [{tuple(), pid()}].
bridges() -> bridges() ->
[{{Node, SubTopic}, Pid} || {{bridge, Node, SubTopic}, Pid, worker, _} [{{Node, SubTopic}, Pid} || {{bridge, Node, SubTopic}, Pid, worker, _}

View File

@ -84,6 +84,7 @@ start_link() ->
%% @doc Get running nodes %% @doc Get running nodes
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec running_nodes() -> list(node()).
running_nodes() -> running_nodes() ->
mnesia:system_info(running_db_nodes). mnesia:system_info(running_db_nodes).
@ -109,7 +110,7 @@ notify(EventType, Event) ->
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
env(Name) -> env(Name) ->
proplists:get_value(Name, application:get_env(emqttd, broker, [])). proplists:get_value(Name, emqttd:env(broker)).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Get broker version %% @doc Get broker version
@ -152,7 +153,7 @@ datetime() ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec hook(Hook :: atom(), Name :: any(), MFA :: mfa()) -> ok | {error, any()}. -spec hook(Hook :: atom(), Name :: any(), MFA :: mfa()) -> ok | {error, any()}.
hook(Hook, Name, MFA) -> hook(Hook, Name, MFA) ->
gen_server:call(?MODULE, {hook, Hook, Name, MFA}). gen_server:call(?SERVER, {hook, Hook, Name, MFA}).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Unhook %% @doc Unhook
@ -160,7 +161,7 @@ hook(Hook, Name, MFA) ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec unhook(Hook :: atom(), Name :: any()) -> ok | {error, any()}. -spec unhook(Hook :: atom(), Name :: any()) -> ok | {error, any()}.
unhook(Hook, Name) -> unhook(Hook, Name) ->
gen_server:call(?MODULE, {unhook, Hook, Name}). gen_server:call(?SERVER, {unhook, Hook, Name}).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Foreach hooks %% @doc Foreach hooks
@ -266,13 +267,13 @@ handle_cast(_Msg, State) ->
handle_info(heartbeat, State) -> handle_info(heartbeat, State) ->
publish(uptime, list_to_binary(uptime(State))), publish(uptime, list_to_binary(uptime(State))),
publish(datetime, list_to_binary(datetime())), publish(datetime, list_to_binary(datetime())),
{noreply, State, hibernate}; {noreply, State};
handle_info(tick, State) -> handle_info(tick, State) ->
retain(brokers), retain(brokers),
retain(version, list_to_binary(version())), retain(version, list_to_binary(version())),
retain(sysdescr, list_to_binary(sysdescr())), retain(sysdescr, list_to_binary(sysdescr())),
{noreply, State, hibernate}; {noreply, State};
handle_info(_Info, State) -> handle_info(_Info, State) ->
{noreply, State}. {noreply, State}.

View File

@ -223,8 +223,10 @@ plugins(["list"]) ->
plugins(["load", Name]) -> plugins(["load", Name]) ->
case emqttd_plugins:load(list_to_atom(Name)) of case emqttd_plugins:load(list_to_atom(Name)) of
{ok, StartedApps} -> ?PRINT("Start apps: ~p~nPlugin ~s loaded successfully.~n", [StartedApps, Name]); {ok, StartedApps} ->
{error, Reason} -> ?PRINT("load plugin error: ~p~n", [Reason]) ?PRINT("Start apps: ~p~nPlugin ~s loaded successfully.~n", [StartedApps, Name]);
{error, Reason} ->
?PRINT("load plugin error: ~p~n", [Reason])
end; end;
plugins(["unload", Name]) -> plugins(["unload", Name]) ->
@ -236,7 +238,7 @@ plugins(["unload", Name]) ->
end; end;
plugins(_) -> plugins(_) ->
?USAGE([{"plugins list", "query loaded plugins"}, ?USAGE([{"plugins list", "show loaded plugins"},
{"plugins load <Plugin>", "load plugin"}, {"plugins load <Plugin>", "load plugin"},
{"plugins unload <Plugin>", "unload plugin"}]). {"plugins unload <Plugin>", "unload plugin"}]).

View File

@ -139,7 +139,10 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal Function Definitions %%% Internal Function Definitions
%%%============================================================================= %%%=============================================================================
noreply(State) -> {noreply, State, hibernate}. noreply(State) ->
{noreply, State, hibernate}.
next_seq(State = #state{seq = Seq}) ->
State#state{seq = Seq + 1}.
next_seq(State = #state{seq = Seq}) -> State#state{seq = Seq + 1}.

View File

@ -41,9 +41,9 @@
-export([behaviour_info/1]). -export([behaviour_info/1]).
behaviour_info(callbacks) -> behaviour_info(callbacks) ->
[{load, 1}, {unload, 1}]; [{load, 1}, {unload, 1}];
behaviour_info(_Other) -> behaviour_info(_Other) ->
undefined. undefined.
-endif. -endif.

View File

@ -30,6 +30,7 @@
-author("Feng Lee <feng@emqtt.io>"). -author("Feng Lee <feng@emqtt.io>").
-include("emqttd.hrl"). -include("emqttd.hrl").
-include("emqttd_protocol.hrl"). -include("emqttd_protocol.hrl").
-import(proplists, [get_value/2, get_value/3]). -import(proplists, [get_value/2, get_value/3]).
@ -46,7 +47,7 @@ handle_request('GET', "/mqtt/status", Req) ->
false -> not_running; false -> not_running;
{value, _Ver} -> running {value, _Ver} -> running
end, end,
Status = io_lib:format("Node ~s is ~s~nemqttd is ~s~n", Status = io_lib:format("Node ~s is ~s~nemqttd is ~s",
[node(), InternalStatus, AppStatus]), [node(), InternalStatus, AppStatus]),
Req:ok({"text/plain", iolist_to_binary(Status)}); Req:ok({"text/plain", iolist_to_binary(Status)});
@ -59,15 +60,15 @@ handle_request('POST', "/mqtt/publish", Req) ->
case authorized(Req) of case authorized(Req) of
true -> true ->
ClientId = get_value("client", Params, http), ClientId = get_value("client", Params, http),
Qos = int(get_value("qos", Params, "0")), Qos = int(get_value("qos", Params, "0")),
Retain = bool(get_value("retain", Params, "0")), Retain = bool(get_value("retain", Params, "0")),
Topic = list_to_binary(get_value("topic", Params)), Topic = list_to_binary(get_value("topic", Params)),
Payload = list_to_binary(get_value("message", Params)), Payload = list_to_binary(get_value("message", Params)),
case {validate(qos, Qos), validate(topic, Topic)} of case {validate(qos, Qos), validate(topic, Topic)} of
{true, true} -> {true, true} ->
Msg = emqttd_message:make(ClientId, Qos, Topic, Payload), Msg = emqttd_message:make(ClientId, Qos, Topic, Payload),
emqttd_pubsub:publish(Msg#mqtt_message{retain = Retain}), emqttd_pubsub:publish(Msg#mqtt_message{retain = Retain}),
Req:ok({"text/plain", <<"ok\n">>}); Req:ok({"text/plain", <<"ok">>});
{false, _} -> {false, _} ->
Req:respond({400, [], <<"Bad QoS">>}); Req:respond({400, [], <<"Bad QoS">>});
{_, false} -> {_, false} ->
@ -83,7 +84,7 @@ handle_request('POST', "/mqtt/publish", Req) ->
handle_request('GET', "/mqtt", Req) -> handle_request('GET', "/mqtt", Req) ->
lager:info("Websocket Connection from: ~s", [Req:get(peer)]), lager:info("Websocket Connection from: ~s", [Req:get(peer)]),
Upgrade = Req:get_header_value("Upgrade"), Upgrade = Req:get_header_value("Upgrade"),
Proto = Req:get_header_value("Sec-WebSocket-Protocol"), Proto = Req:get_header_value("Sec-WebSocket-Protocol"),
case {is_websocket(Upgrade), Proto} of case {is_websocket(Upgrade), Proto} of
{true, "mqtt" ++ _Vsn} -> {true, "mqtt" ++ _Vsn} ->
emqttd_ws_client:start_link(Req); emqttd_ws_client:start_link(Req);

View File

@ -30,3 +30,4 @@
-module(emqttd_log). -module(emqttd_log).
%%TODO: Hooks to log???

View File

@ -27,7 +27,7 @@
-module(emqttd_mnesia). -module(emqttd_mnesia).
-author('feng@emqtt.io'). -author("Feng Lee <feng@emqtt.io>").
-include("emqttd.hrl"). -include("emqttd.hrl").

View File

@ -38,8 +38,10 @@
-export([client_connected/3, client_disconnected/3]). -export([client_connected/3, client_disconnected/3]).
load(Opts) -> load(Opts) ->
emqttd_broker:hook('client.connected', {?MODULE, client_connected}, {?MODULE, client_connected, [Opts]}), emqttd_broker:hook('client.connected', {?MODULE, client_connected},
emqttd_broker:hook('client.disconnected', {?MODULE, client_disconnected}, {?MODULE, client_disconnected, [Opts]}), {?MODULE, client_connected, [Opts]}),
emqttd_broker:hook('client.disconnected', {?MODULE, client_disconnected},
{?MODULE, client_disconnected, [Opts]}),
{ok, Opts}. {ok, Opts}.
client_connected(ConnAck, #mqtt_client{client_id = ClientId, client_connected(ConnAck, #mqtt_client{client_id = ClientId,

View File

@ -46,11 +46,11 @@ load(Opts) ->
{ok, Terms} = file:consult(File), {ok, Terms} = file:consult(File),
Sections = compile(Terms), Sections = compile(Terms),
emqttd_broker:hook('client.subscribe', {?MODULE, rewrite_subscribe}, emqttd_broker:hook('client.subscribe', {?MODULE, rewrite_subscribe},
{?MODULE, rewrite, [subscribe, Sections]}), {?MODULE, rewrite, [subscribe, Sections]}),
emqttd_broker:hook('client.unsubscribe', {?MODULE, rewrite_unsubscribe}, emqttd_broker:hook('client.unsubscribe', {?MODULE, rewrite_unsubscribe},
{?MODULE, rewrite, [unsubscribe, Sections]}), {?MODULE, rewrite, [unsubscribe, Sections]}),
emqttd_broker:hook('message.publish', {?MODULE, rewrite_publish}, emqttd_broker:hook('message.publish', {?MODULE, rewrite_publish},
{?MODULE, rewrite, [publish, Sections]}). {?MODULE, rewrite, [publish, Sections]}).
rewrite(_ClientId, TopicTable, subscribe, Sections) -> rewrite(_ClientId, TopicTable, subscribe, Sections) ->
lager:info("rewrite subscribe: ~p", [TopicTable]), lager:info("rewrite subscribe: ~p", [TopicTable]),
@ -83,9 +83,9 @@ reload(File) ->
end. end.
unload(_) -> unload(_) ->
emqttd_broker:unhook('client.subscribe', {?MODULE, rewrite_subscribe}), emqttd_broker:unhook('client.subscribe', {?MODULE, rewrite_subscribe}),
emqttd_broker:unhook('client.unsubscribe', {?MODULE, rewrite_unsubscribe}), emqttd_broker:unhook('client.unsubscribe',{?MODULE, rewrite_unsubscribe}),
emqttd_broker:unhook('message.publish', {?MODULE, rewrite_publish}). emqttd_broker:unhook('message.publish', {?MODULE, rewrite_publish}).
%%%============================================================================= %%%=============================================================================
%%% Internal functions %%% Internal functions
@ -116,7 +116,8 @@ match_rule(Topic, []) ->
match_rule(Topic, [{rewrite, MP, Dest} | Rules]) -> match_rule(Topic, [{rewrite, MP, Dest} | Rules]) ->
case re:run(Topic, MP, [{capture, all_but_first, list}]) of case re:run(Topic, MP, [{capture, all_but_first, list}]) of
{match, Captured} -> {match, Captured} ->
Vars = lists:zip(["\\$" ++ integer_to_list(I) || I <- lists:seq(1, length(Captured))], Captured), Vars = lists:zip(["\\$" ++ integer_to_list(I)
|| I <- lists:seq(1, length(Captured))], Captured),
iolist_to_binary(lists:foldl( iolist_to_binary(lists:foldl(
fun({Var, Val}, Acc) -> fun({Var, Val}, Acc) ->
re:replace(Acc, Var, Val, [global]) re:replace(Acc, Var, Val, [global])
@ -124,3 +125,4 @@ match_rule(Topic, [{rewrite, MP, Dest} | Rules]) ->
nomatch -> nomatch ->
match_rule(Topic, Rules) match_rule(Topic, Rules)
end. end.