From a16e5279758f5697d7bd22ea225bb5b5c4a95026 Mon Sep 17 00:00:00 2001 From: Feng Date: Tue, 13 Oct 2015 21:09:13 +0800 Subject: [PATCH] 0.12.1 refactor --- src/emqttd_acl_internal.erl | 1 + src/emqttd_app.erl | 56 +++++++++++++++++------------------ src/emqttd_auth_anonymous.erl | 2 +- src/emqttd_auth_clientid.erl | 15 ++++++---- src/emqttd_auth_ldap.erl | 4 +-- src/emqttd_auth_mod.erl | 6 ++-- src/emqttd_auth_username.erl | 34 ++++++++++++++++----- src/emqttd_bridge.erl | 12 ++++---- src/emqttd_bridge_sup.erl | 10 +++---- src/emqttd_broker.erl | 11 +++---- src/emqttd_cli.erl | 8 +++-- src/emqttd_ctl.erl | 7 +++-- src/emqttd_gen_mod.erl | 4 +-- src/emqttd_http.erl | 15 +++++----- src/emqttd_log.erl | 1 + src/emqttd_mnesia.erl | 2 +- src/emqttd_mod_presence.erl | 6 ++-- src/emqttd_mod_rewrite.erl | 16 +++++----- 18 files changed, 122 insertions(+), 88 deletions(-) diff --git a/src/emqttd_acl_internal.erl b/src/emqttd_acl_internal.erl index b6dc0b81b..c2ce3184c 100644 --- a/src/emqttd_acl_internal.erl +++ b/src/emqttd_acl_internal.erl @@ -24,6 +24,7 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd_acl_internal). -author("Feng Lee "). diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index 366df60cb..fa7799904 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -29,15 +29,13 @@ -author("Feng Lee "). +-include("emqttd_cli.hrl"). + -behaviour(application). %% Application callbacks -export([start/2, stop/1]). --define(PRINT_MSG(Msg), io:format(Msg)). - --define(PRINT(Format, Args), io:format(Format, Args)). - %%%============================================================================= %%% Application callbacks %%%============================================================================= @@ -106,35 +104,35 @@ start_server(Sup, {Name, Server, Opts}) -> start_child(Sup, Server, Opts), ?PRINT_MSG("[done]~n"). -start_child(Sup, {supervisor, Name}) -> - supervisor:start_child(Sup, supervisor_spec(Name)); -start_child(Sup, Name) when is_atom(Name) -> - {ok, _ChiId} = supervisor:start_child(Sup, worker_spec(Name)). +start_child(Sup, {supervisor, Module}) -> + supervisor:start_child(Sup, supervisor_spec(Module)); -start_child(Sup, {supervisor, Name}, Opts) -> - supervisor:start_child(Sup, supervisor_spec(Name, Opts)); -start_child(Sup, Name, Opts) when is_atom(Name) -> - {ok, _ChiId} = supervisor:start_child(Sup, worker_spec(Name, Opts)). +start_child(Sup, Module) when is_atom(Module) -> + {ok, _ChiId} = supervisor:start_child(Sup, worker_spec(Module)). -%%TODO: refactor... -supervisor_spec(Name) -> - {Name, - {Name, start_link, []}, - permanent, infinity, supervisor, [Name]}. +start_child(Sup, {supervisor, Module}, Opts) -> + supervisor:start_child(Sup, supervisor_spec(Module, Opts)); -supervisor_spec(Name, Opts) -> - {Name, - {Name, start_link, [Opts]}, - permanent, infinity, supervisor, [Name]}. +start_child(Sup, Module, Opts) when is_atom(Module) -> + supervisor:start_child(Sup, worker_spec(Module, Opts)). -worker_spec(Name) -> - {Name, - {Name, start_link, []}, - permanent, 10000, worker, [Name]}. -worker_spec(Name, Opts) -> - {Name, - {Name, start_link, [Opts]}, - permanent, 10000, worker, [Name]}. +supervisor_spec(Module) when is_atom(Module) -> + supervisor_spec(Module, start_link, []). + +supervisor_spec(Module, Opts) -> + supervisor_spec(Module, start_link, [Opts]). + +supervisor_spec(M, F, A) -> + {M, {M, F, A}, permanent, infinity, supervisor, [M]}. + +worker_spec(Module) when is_atom(Module) -> + worker_spec(Module, start_link, []). + +worker_spec(Module, Opts) when is_atom(Module) -> + worker_spec(Module, start_link, [Opts]). + +worker_spec(M, F, A) -> + {M, {M, F, A}, permanent, 10000, worker, [M]}. -spec stop(State :: term()) -> term(). stop(_State) -> diff --git a/src/emqttd_auth_anonymous.erl b/src/emqttd_auth_anonymous.erl index e410d3459..95592c22b 100644 --- a/src/emqttd_auth_anonymous.erl +++ b/src/emqttd_auth_anonymous.erl @@ -36,5 +36,5 @@ init(Opts) -> {ok, Opts}. check(_Client, _Password, _Opts) -> ok. -description() -> "Anonymous authentication module". +description() -> "Anonymous Authentication Module". diff --git a/src/emqttd_auth_clientid.erl b/src/emqttd_auth_clientid.erl index 53cc5496f..0d28b8421 100644 --- a/src/emqttd_auth_clientid.erl +++ b/src/emqttd_auth_clientid.erl @@ -20,7 +20,7 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% ClientId authentication module. +%%% ClientId Authentication Module. %%% %%% @end %%%----------------------------------------------------------------------------- @@ -51,22 +51,25 @@ %% @doc Add clientid %% @end %%------------------------------------------------------------------------------ +-spec add_clientid(binary()) -> {atomic, ok} | {aborted, any()}. add_clientid(ClientId) when is_binary(ClientId) -> R = #mqtt_auth_clientid{client_id = ClientId}, - mnesia:transaction(fun() -> mnesia:write(R) end). + mnesia:transaction(fun mnesia:write/1, [R]). %%------------------------------------------------------------------------------ %% @doc Add clientid with password %% @end %%------------------------------------------------------------------------------ +-spec add_clientid(binary(), binary()) -> {atomic, ok} | {aborted, any()}. add_clientid(ClientId, Password) -> R = #mqtt_auth_clientid{client_id = ClientId, password = Password}, - mnesia:transaction(fun() -> mnesia:write(R) end). + mnesia:transaction(fun mnesia:write/1, [R]). %%------------------------------------------------------------------------------ %% @doc Lookup clientid %% @end %%------------------------------------------------------------------------------ +-spec lookup_clientid(binary()) -> list(). lookup_clientid(ClientId) -> mnesia:dirty_read(?AUTH_CLIENTID_TAB, ClientId). @@ -74,6 +77,7 @@ lookup_clientid(ClientId) -> %% @doc Lookup all clientids %% @end %%------------------------------------------------------------------------------ +-spec all_clientids() -> list(binary()). all_clientids() -> mnesia:dirty_all_keys(?AUTH_CLIENTID_TAB). @@ -81,8 +85,9 @@ all_clientids() -> %% @doc Remove clientid %% @end %%------------------------------------------------------------------------------ +-spec remove_clientid(binary()) -> {atomic, ok} | {aborted, any()}. remove_clientid(ClientId) -> - mnesia:transaction(fun() -> mnesia:delete({?AUTH_CLIENTID_TAB, ClientId}) end). + mnesia:transaction(fun mnesia:delete/1, [{?AUTH_CLIENTID_TAB, ClientId}]). %%%============================================================================= %%% emqttd_auth_mod callbacks @@ -95,7 +100,7 @@ init(Opts) -> mnesia:add_table_copy(?AUTH_CLIENTID_TAB, node(), ram_copies), case proplists:get_value(file, Opts) of undefined -> ok; - File -> load(File) + File -> load(File) end, {ok, Opts}. diff --git a/src/emqttd_auth_ldap.erl b/src/emqttd_auth_ldap.erl index 965aaeefe..09ca89142 100644 --- a/src/emqttd_auth_ldap.erl +++ b/src/emqttd_auth_ldap.erl @@ -20,7 +20,7 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% LDAP Authentication Module. +%%% LDAP Authentication Module %%% %%% @end %%%----------------------------------------------------------------------------- @@ -28,7 +28,7 @@ -author("Feng Lee "). --include_lib("emqttd/include/emqttd.hrl"). +-include("emqttd.hrl"). -import(proplists, [get_value/2, get_value/3]). diff --git a/src/emqttd_auth_mod.erl b/src/emqttd_auth_mod.erl index a6298a68b..86e08df4b 100644 --- a/src/emqttd_auth_mod.erl +++ b/src/emqttd_auth_mod.erl @@ -26,7 +26,7 @@ %%%----------------------------------------------------------------------------- -module(emqttd_auth_mod). --author('feng@emqtt.io'). +-author("Feng Lee "). -include("emqttd.hrl"). @@ -50,9 +50,9 @@ -export([behaviour_info/1]). behaviour_info(callbacks) -> - [{init, 1}, {check, 3}, {description, 0}]; + [{init, 1}, {check, 3}, {description, 0}]; behaviour_info(_Other) -> - undefined. + undefined. -endif. diff --git a/src/emqttd_auth_username.erl b/src/emqttd_auth_username.erl index 39fa08a6a..392d32961 100644 --- a/src/emqttd_auth_username.erl +++ b/src/emqttd_auth_username.erl @@ -20,13 +20,13 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% emqttd authentication with username and password. +%%% Authentication with username and password. %%% %%% @end %%%----------------------------------------------------------------------------- -module(emqttd_auth_username). --author('feng@emqtt.io'). +-author("Feng Lee "). -include("emqttd.hrl"). @@ -65,16 +65,36 @@ cli(_) -> %%% API %%%============================================================================= +%%------------------------------------------------------------------------------ +%% @doc Add user +%% @end +%%------------------------------------------------------------------------------ +-spec add_user(binary(), binary()) -> {atomic, ok} | {aborted, any()}. add_user(Username, Password) -> - R = #?AUTH_USERNAME_TAB{username = Username, password = hash(Password)}, - mnesia:transaction(fun() -> mnesia:write(R) end). + User = #?AUTH_USERNAME_TAB{username = Username, password = hash(Password)}, + mnesia:transaction(fun mnesia:write/1, [User]). +%%------------------------------------------------------------------------------ +%% @doc Lookup user by username +%% @end +%%------------------------------------------------------------------------------ +-spec lookup_user(binary()) -> list(). lookup_user(Username) -> mnesia:dirty_read(?AUTH_USERNAME_TAB, Username). +%%------------------------------------------------------------------------------ +%% @doc Remove user +%% @end +%%------------------------------------------------------------------------------ +-spec remove_user(binary()) -> {atomic, ok} | {aborted, any()}. remove_user(Username) -> - mnesia:transaction(fun() -> mnesia:delete({?AUTH_USERNAME_TAB, Username}) end). + mnesia:transaction(fun mnesia:delete/1, [{?AUTH_USERNAME_TAB, Username}]). +%%------------------------------------------------------------------------------ +%% @doc All usernames +%% @end +%%------------------------------------------------------------------------------ +-spec all_users() -> list(). all_users() -> mnesia:dirty_all_keys(?AUTH_USERNAME_TAB). @@ -104,7 +124,8 @@ check(#mqtt_client{username = Username}, Password, _Opts) -> end end. -description() -> "Username password authentication module". +description() -> + "Username password authentication module". %%%============================================================================= %%% Internal functions @@ -123,4 +144,3 @@ salt() -> Salt = random:uniform(16#ffffffff), <>. - diff --git a/src/emqttd_bridge.erl b/src/emqttd_bridge.erl index dfb68add3..cb6349803 100644 --- a/src/emqttd_bridge.erl +++ b/src/emqttd_bridge.erl @@ -144,12 +144,12 @@ handle_info({nodeup, Node}, State = #state{node = Node}) -> handle_info(ping_down_node, State = #state{node = Node, ping_down_interval = Interval}) -> Self = self(), spawn_link(fun() -> - case net_kernel:connect_node(Node) of - true -> %%TODO: this is not right... fixme later - Self ! {nodeup, Node}; - false -> - erlang:send_after(Interval, Self, ping_down_node) - end + case net_kernel:connect_node(Node) of + true -> %%TODO: this is not right... fixme later + Self ! {nodeup, Node}; + false -> + erlang:send_after(Interval, Self, ping_down_node) + end end), {noreply, State}; diff --git a/src/emqttd_bridge_sup.erl b/src/emqttd_bridge_sup.erl index 132eca200..c780a6f6b 100644 --- a/src/emqttd_bridge_sup.erl +++ b/src/emqttd_bridge_sup.erl @@ -38,12 +38,6 @@ -export([init/1]). -%%%============================================================================= -%%% CLI -%%%============================================================================= - - - %%%============================================================================= %%% API %%%============================================================================= @@ -55,6 +49,10 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). +%%------------------------------------------------------------------------------ +%% @doc List all bridges +%% @end +%%------------------------------------------------------------------------------ -spec bridges() -> [{tuple(), pid()}]. bridges() -> [{{Node, SubTopic}, Pid} || {{bridge, Node, SubTopic}, Pid, worker, _} diff --git a/src/emqttd_broker.erl b/src/emqttd_broker.erl index 19dd6237a..88a3713cc 100644 --- a/src/emqttd_broker.erl +++ b/src/emqttd_broker.erl @@ -84,6 +84,7 @@ start_link() -> %% @doc Get running nodes %% @end %%------------------------------------------------------------------------------ +-spec running_nodes() -> list(node()). running_nodes() -> mnesia:system_info(running_db_nodes). @@ -109,7 +110,7 @@ notify(EventType, Event) -> %% @end %%------------------------------------------------------------------------------ env(Name) -> - proplists:get_value(Name, application:get_env(emqttd, broker, [])). + proplists:get_value(Name, emqttd:env(broker)). %%------------------------------------------------------------------------------ %% @doc Get broker version @@ -152,7 +153,7 @@ datetime() -> %%------------------------------------------------------------------------------ -spec hook(Hook :: atom(), Name :: any(), MFA :: mfa()) -> ok | {error, any()}. hook(Hook, Name, MFA) -> - gen_server:call(?MODULE, {hook, Hook, Name, MFA}). + gen_server:call(?SERVER, {hook, Hook, Name, MFA}). %%------------------------------------------------------------------------------ %% @doc Unhook @@ -160,7 +161,7 @@ hook(Hook, Name, MFA) -> %%------------------------------------------------------------------------------ -spec unhook(Hook :: atom(), Name :: any()) -> ok | {error, any()}. unhook(Hook, Name) -> - gen_server:call(?MODULE, {unhook, Hook, Name}). + gen_server:call(?SERVER, {unhook, Hook, Name}). %%------------------------------------------------------------------------------ %% @doc Foreach hooks @@ -266,13 +267,13 @@ handle_cast(_Msg, State) -> handle_info(heartbeat, State) -> publish(uptime, list_to_binary(uptime(State))), publish(datetime, list_to_binary(datetime())), - {noreply, State, hibernate}; + {noreply, State}; handle_info(tick, State) -> retain(brokers), retain(version, list_to_binary(version())), retain(sysdescr, list_to_binary(sysdescr())), - {noreply, State, hibernate}; + {noreply, State}; handle_info(_Info, State) -> {noreply, State}. diff --git a/src/emqttd_cli.erl b/src/emqttd_cli.erl index 72f0e580b..fd70ce4f6 100644 --- a/src/emqttd_cli.erl +++ b/src/emqttd_cli.erl @@ -223,8 +223,10 @@ plugins(["list"]) -> plugins(["load", Name]) -> case emqttd_plugins:load(list_to_atom(Name)) of - {ok, StartedApps} -> ?PRINT("Start apps: ~p~nPlugin ~s loaded successfully.~n", [StartedApps, Name]); - {error, Reason} -> ?PRINT("load plugin error: ~p~n", [Reason]) + {ok, StartedApps} -> + ?PRINT("Start apps: ~p~nPlugin ~s loaded successfully.~n", [StartedApps, Name]); + {error, Reason} -> + ?PRINT("load plugin error: ~p~n", [Reason]) end; plugins(["unload", Name]) -> @@ -236,7 +238,7 @@ plugins(["unload", Name]) -> end; plugins(_) -> - ?USAGE([{"plugins list", "query loaded plugins"}, + ?USAGE([{"plugins list", "show loaded plugins"}, {"plugins load ", "load plugin"}, {"plugins unload ", "unload plugin"}]). diff --git a/src/emqttd_ctl.erl b/src/emqttd_ctl.erl index b02bab7ad..5d34bd449 100644 --- a/src/emqttd_ctl.erl +++ b/src/emqttd_ctl.erl @@ -139,7 +139,10 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal Function Definitions %%%============================================================================= -noreply(State) -> {noreply, State, hibernate}. +noreply(State) -> + {noreply, State, hibernate}. + +next_seq(State = #state{seq = Seq}) -> + State#state{seq = Seq + 1}. -next_seq(State = #state{seq = Seq}) -> State#state{seq = Seq + 1}. diff --git a/src/emqttd_gen_mod.erl b/src/emqttd_gen_mod.erl index 190971d0e..682b9ab05 100644 --- a/src/emqttd_gen_mod.erl +++ b/src/emqttd_gen_mod.erl @@ -41,9 +41,9 @@ -export([behaviour_info/1]). behaviour_info(callbacks) -> - [{load, 1}, {unload, 1}]; + [{load, 1}, {unload, 1}]; behaviour_info(_Other) -> - undefined. + undefined. -endif. diff --git a/src/emqttd_http.erl b/src/emqttd_http.erl index 946ad0b65..7f4251cd9 100644 --- a/src/emqttd_http.erl +++ b/src/emqttd_http.erl @@ -30,6 +30,7 @@ -author("Feng Lee "). -include("emqttd.hrl"). + -include("emqttd_protocol.hrl"). -import(proplists, [get_value/2, get_value/3]). @@ -46,7 +47,7 @@ handle_request('GET', "/mqtt/status", Req) -> false -> not_running; {value, _Ver} -> running end, - Status = io_lib:format("Node ~s is ~s~nemqttd is ~s~n", + Status = io_lib:format("Node ~s is ~s~nemqttd is ~s", [node(), InternalStatus, AppStatus]), Req:ok({"text/plain", iolist_to_binary(Status)}); @@ -59,15 +60,15 @@ handle_request('POST', "/mqtt/publish", Req) -> case authorized(Req) of true -> ClientId = get_value("client", Params, http), - Qos = int(get_value("qos", Params, "0")), - Retain = bool(get_value("retain", Params, "0")), - Topic = list_to_binary(get_value("topic", Params)), - Payload = list_to_binary(get_value("message", Params)), + Qos = int(get_value("qos", Params, "0")), + Retain = bool(get_value("retain", Params, "0")), + Topic = list_to_binary(get_value("topic", Params)), + Payload = list_to_binary(get_value("message", Params)), case {validate(qos, Qos), validate(topic, Topic)} of {true, true} -> Msg = emqttd_message:make(ClientId, Qos, Topic, Payload), emqttd_pubsub:publish(Msg#mqtt_message{retain = Retain}), - Req:ok({"text/plain", <<"ok\n">>}); + Req:ok({"text/plain", <<"ok">>}); {false, _} -> Req:respond({400, [], <<"Bad QoS">>}); {_, false} -> @@ -83,7 +84,7 @@ handle_request('POST', "/mqtt/publish", Req) -> handle_request('GET', "/mqtt", Req) -> lager:info("Websocket Connection from: ~s", [Req:get(peer)]), Upgrade = Req:get_header_value("Upgrade"), - Proto = Req:get_header_value("Sec-WebSocket-Protocol"), + Proto = Req:get_header_value("Sec-WebSocket-Protocol"), case {is_websocket(Upgrade), Proto} of {true, "mqtt" ++ _Vsn} -> emqttd_ws_client:start_link(Req); diff --git a/src/emqttd_log.erl b/src/emqttd_log.erl index 96522ae9d..4be2f2999 100644 --- a/src/emqttd_log.erl +++ b/src/emqttd_log.erl @@ -30,3 +30,4 @@ -module(emqttd_log). +%%TODO: Hooks to log??? diff --git a/src/emqttd_mnesia.erl b/src/emqttd_mnesia.erl index 355f3d547..3f94071ed 100644 --- a/src/emqttd_mnesia.erl +++ b/src/emqttd_mnesia.erl @@ -27,7 +27,7 @@ -module(emqttd_mnesia). --author('feng@emqtt.io'). +-author("Feng Lee "). -include("emqttd.hrl"). diff --git a/src/emqttd_mod_presence.erl b/src/emqttd_mod_presence.erl index b534695e0..da3a05c00 100644 --- a/src/emqttd_mod_presence.erl +++ b/src/emqttd_mod_presence.erl @@ -38,8 +38,10 @@ -export([client_connected/3, client_disconnected/3]). load(Opts) -> - emqttd_broker:hook('client.connected', {?MODULE, client_connected}, {?MODULE, client_connected, [Opts]}), - emqttd_broker:hook('client.disconnected', {?MODULE, client_disconnected}, {?MODULE, client_disconnected, [Opts]}), + emqttd_broker:hook('client.connected', {?MODULE, client_connected}, + {?MODULE, client_connected, [Opts]}), + emqttd_broker:hook('client.disconnected', {?MODULE, client_disconnected}, + {?MODULE, client_disconnected, [Opts]}), {ok, Opts}. client_connected(ConnAck, #mqtt_client{client_id = ClientId, diff --git a/src/emqttd_mod_rewrite.erl b/src/emqttd_mod_rewrite.erl index b15a0c700..09357211c 100644 --- a/src/emqttd_mod_rewrite.erl +++ b/src/emqttd_mod_rewrite.erl @@ -46,11 +46,11 @@ load(Opts) -> {ok, Terms} = file:consult(File), Sections = compile(Terms), emqttd_broker:hook('client.subscribe', {?MODULE, rewrite_subscribe}, - {?MODULE, rewrite, [subscribe, Sections]}), + {?MODULE, rewrite, [subscribe, Sections]}), emqttd_broker:hook('client.unsubscribe', {?MODULE, rewrite_unsubscribe}, - {?MODULE, rewrite, [unsubscribe, Sections]}), + {?MODULE, rewrite, [unsubscribe, Sections]}), emqttd_broker:hook('message.publish', {?MODULE, rewrite_publish}, - {?MODULE, rewrite, [publish, Sections]}). + {?MODULE, rewrite, [publish, Sections]}). rewrite(_ClientId, TopicTable, subscribe, Sections) -> lager:info("rewrite subscribe: ~p", [TopicTable]), @@ -83,9 +83,9 @@ reload(File) -> end. unload(_) -> - emqttd_broker:unhook('client.subscribe', {?MODULE, rewrite_subscribe}), - emqttd_broker:unhook('client.unsubscribe', {?MODULE, rewrite_unsubscribe}), - emqttd_broker:unhook('message.publish', {?MODULE, rewrite_publish}). + emqttd_broker:unhook('client.subscribe', {?MODULE, rewrite_subscribe}), + emqttd_broker:unhook('client.unsubscribe',{?MODULE, rewrite_unsubscribe}), + emqttd_broker:unhook('message.publish', {?MODULE, rewrite_publish}). %%%============================================================================= %%% Internal functions @@ -116,7 +116,8 @@ match_rule(Topic, []) -> match_rule(Topic, [{rewrite, MP, Dest} | Rules]) -> case re:run(Topic, MP, [{capture, all_but_first, list}]) of {match, Captured} -> - Vars = lists:zip(["\\$" ++ integer_to_list(I) || I <- lists:seq(1, length(Captured))], Captured), + Vars = lists:zip(["\\$" ++ integer_to_list(I) + || I <- lists:seq(1, length(Captured))], Captured), iolist_to_binary(lists:foldl( fun({Var, Val}, Acc) -> re:replace(Acc, Var, Val, [global]) @@ -124,3 +125,4 @@ match_rule(Topic, [{rewrite, MP, Dest} | Rules]) -> nomatch -> match_rule(Topic, Rules) end. +