From c5bb1e031e8b99d519bee871e83badec2bcf20f5 Mon Sep 17 00:00:00 2001 From: huangdan Date: Wed, 21 Sep 2016 11:27:39 +0800 Subject: [PATCH 01/13] cluster ct --- test/emqttd_SUITE.erl | 113 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 113 insertions(+) diff --git a/test/emqttd_SUITE.erl b/test/emqttd_SUITE.erl index 5444a4f06..41ad9f60e 100644 --- a/test/emqttd_SUITE.erl +++ b/test/emqttd_SUITE.erl @@ -37,6 +37,7 @@ all() -> {group, stats}, {group, hook}, {group, http}, + {group, cluster}, %%{group, backend}, {group, cli}]. @@ -70,6 +71,14 @@ groups() -> [request_status, request_publish ]}, + {cluster, [sequence], + [cluster_test, + cluster_join, + cluster_leave, + cluster_remove, + cluster_remove2, + cluster_node_down + ]}, {cli, [sequence], [ctl_register_cmd, cli_status, @@ -378,6 +387,81 @@ auth_header_(User, Pass) -> Encoded = base64:encode_to_string(lists:append([User,":",Pass])), {"Authorization","Basic " ++ Encoded}. +%%-------------------------------------------------------------------- +%% cluster group +%%-------------------------------------------------------------------- +cluster_test(_Config) -> + Z = slave(emqttd, cluster_test_z), + wait_running(Z), + true = emqttd:is_running(Z), + Node = node(), + ok = rpc:call(Z, emqttd_cluster, join, [Node]), + [Z, Node] = lists:sort(mnesia:system_info(running_db_nodes)), + ct:log("Z:~p, Node:~p", [Z, Node]), + ok = rpc:call(Z, emqttd_cluster, leave, []), + [Node] = lists:sort(mnesia:system_info(running_db_nodes)), + ok = slave:stop(Z). + +cluster_join(_) -> + Z = slave(emqttd, cluster_join_z), + N = slave(node, cluster_join_n), + wait_running(Z), + true = emqttd:is_running(Z), + Node = node(), + {error, {cannot_join_with_self, Node}} = emqttd_cluster:join(Node), + {error, {node_not_running, N}} = emqttd_cluster:join(N), + ok = emqttd_cluster:join(Z), + slave:stop(Z), + slave:stop(N). + +cluster_leave(_) -> + Z = slave(emqttd, cluster_leave_z), + wait_running(Z), + {error, node_not_in_cluster} = emqttd_cluster:leave(), + ok = emqttd_cluster:join(Z), + Node = node(), + [Z, Node] = emqttd_mnesia:running_nodes(), + ok = emqttd_cluster:leave(), + [Node] = emqttd_mnesia:running_nodes(), + slave:stop(Z). + +cluster_remove(_) -> + Z = slave(emqttd, cluster_remove_z), + wait_running(Z), + Node = node(), + {error, {cannot_remove_self, Node}} = emqttd_cluster:remove(Node), + ok = emqttd_cluster:join(Z), + [Z, Node] = emqttd_mnesia:running_nodes(), + ok = emqttd_cluster:remove(Z), + [Node] = emqttd_mnesia:running_nodes(), + slave:stop(Z). + +cluster_remove2(_) -> + Z = slave(emqttd, cluster_remove2_z), + wait_running(Z), + ok = emqttd_cluster:join(Z), + Node = node(), + [Z, Node] = emqttd_mnesia:running_nodes(), + ok = rpc:call(Z, emqttd_mnesia, ensure_stopped, []), + ok = emqttd_cluster:remove(Z), + [Node] = emqttd_mnesia:running_nodes(), + slave:stop(Z). + +cluster_node_down(_) -> + Z = slave(emqttd, cluster_node_down), + timer:sleep(1000), + wait_running(Z), + ok = emqttd_cluster:join(Z), + ok = rpc:call(Z, emqttd_router, add_route, [<<"a/b/c">>]), + ok = rpc:call(Z, emqttd_router, add_route, [<<"#">>]), + Routes = lists:sort(emqttd_router:match(<<"a/b/c">>)), + ct:log("Routes: ~p~n", [Routes]), + [<<"#">>, <<"a/b/c">>] = [Topic || #mqtt_route{topic = Topic} <- Routes], + slave:stop(Z), + timer:sleep(1000), + Routes = lists:sort(emqttd_router:match(<<"a/b/c">>)). + + %%-------------------------------------------------------------------- %% Cli group %%-------------------------------------------------------------------- @@ -451,3 +535,32 @@ cli_vm(_) -> emqttd_cli:vm([]), emqttd_cli:vm(["ports"]). + +ensure_ok(ok) -> ok; +ensure_ok({error, {already_started, _}}) -> ok. + +host() -> [_, Host] = string:tokens(atom_to_list(node()), "@"), Host. + +wait_running(Node) -> + wait_running(Node, 30000). + +wait_running(Node, Timeout) when Timeout < 0 -> + throw({wait_timeout, Node}); + +wait_running(Node, Timeout) -> + case rpc:call(Node, emqttd, is_running, [Node]) of + true -> ok; + false -> timer:sleep(100), + wait_running(Node, Timeout - 100) + end. + +slave(emqttd, Node) -> + {ok, Emq} = slave:start(host(), Node, "-pa ../../ebin -pa ../../deps/*/ebin"), + rpc:call(Emq, application, ensure_all_started, [emqttd]), + Emq; + +slave(node, Node) -> + {ok, N} = slave:start(host(), Node, "-pa ../../ebin -pa ../../deps/*/ebin"), + N. + + From 66ca65733a7a3314bbb2fd56d5b66a56ec53b8db Mon Sep 17 00:00:00 2001 From: Feng Date: Fri, 23 Sep 2016 15:29:12 +0800 Subject: [PATCH 02/13] emqttd:run_hooks/2, emqttd_hook:run/2 --- src/emqttd.erl | 6 +++++- src/emqttd_hook.erl | 20 ++++++++++++++++++-- 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/src/emqttd.erl b/src/emqttd.erl index 9c5d652a9..07464ee56 100644 --- a/src/emqttd.erl +++ b/src/emqttd.erl @@ -33,7 +33,7 @@ is_subscribed/2, subscriber_down/1]). %% Hooks API --export([hook/4, hook/3, unhook/2, run_hooks/3]). +-export([hook/4, hook/3, unhook/2, run_hooks/2, run_hooks/3]). %% Debug API -export([dump/0]). @@ -151,6 +151,10 @@ hook(Hook, Function, InitArgs, Priority) -> unhook(Hook, Function) -> emqttd_hook:delete(Hook, Function). +-spec(run_hooks(atom(), list(any())) -> ok | stop). +run_hooks(Hook, Args) -> + emqttd_hook:run(Hook, Args). + -spec(run_hooks(atom(), list(any()), any()) -> {ok | stop, any()}). run_hooks(Hook, Args, Acc) -> emqttd_hook:run(Hook, Args, Acc). diff --git a/src/emqttd_hook.erl b/src/emqttd_hook.erl index f183c98d3..675698688 100644 --- a/src/emqttd_hook.erl +++ b/src/emqttd_hook.erl @@ -24,7 +24,7 @@ -export([start_link/0]). %% Hooks API --export([add/3, add/4, delete/2, run/3, lookup/1]). +-export([add/3, add/4, delete/2, run/2, run/3, lookup/1]). %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -63,10 +63,26 @@ add(HookPoint, Function, InitArgs, Priority) -> delete(HookPoint, Function) -> gen_server:call(?MODULE, {delete, HookPoint, Function}). --spec(run(atom(), list(any()), any()) -> any()). +%% @doc Run hooks without Acc. +-spec(run(atom(), list(Arg :: any())) -> ok | stop). +run(HookPoint, Args) -> + run_(lookup(HookPoint), Args). + +-spec(run(atom(), list(Arg :: any()), any()) -> any()). run(HookPoint, Args, Acc) -> run_(lookup(HookPoint), Args, Acc). +%% @private +run_([#callback{function = Fun, init_args = InitArgs} | Callbacks], Args) -> + case apply(Fun, lists:append([Args, InitArgs])) of + ok -> run_(Callbacks, Args); + stop -> stop; + _Any -> run_(Callbacks, Args) + end; + +run_([], _Args) -> + ok. + %% @private run_([#callback{function = Fun, init_args = InitArgs} | Callbacks], Args, Acc) -> case apply(Fun, lists:append([Args, [Acc], InitArgs])) of From 0063895706cdfbc8923a1e7f3edf87c5894623b3 Mon Sep 17 00:00:00 2001 From: Feng Date: Fri, 23 Sep 2016 15:29:39 +0800 Subject: [PATCH 03/13] test emqttd:run_hooks/2 --- test/emqttd_SUITE.erl | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/test/emqttd_SUITE.erl b/test/emqttd_SUITE.erl index 41ad9f60e..0c20b9bd5 100644 --- a/test/emqttd_SUITE.erl +++ b/test/emqttd_SUITE.erl @@ -332,11 +332,16 @@ add_delete_hook(_) -> [] = emqttd_hook:lookup(emqttd_hook). run_hooks(_) -> - emqttd:hook(test_hook, fun ?MODULE:hook_fun3/4, [init]), - emqttd:hook(test_hook, fun ?MODULE:hook_fun4/4, [init]), - emqttd:hook(test_hook, fun ?MODULE:hook_fun5/4, [init]), - {stop, [r3, r2]} = emqttd:run_hooks(test_hook, [arg1, arg2], []), - {ok, []} = emqttd:run_hooks(unknown_hook, [], []). + emqttd:hook(foldl_hook, fun ?MODULE:hook_fun3/4, [init]), + emqttd:hook(foldl_hook, fun ?MODULE:hook_fun4/4, [init]), + emqttd:hook(foldl_hook, fun ?MODULE:hook_fun5/4, [init]), + {stop, [r3, r2]} = emqttd:run_hooks(foldl_hook, [arg1, arg2], []), + {ok, []} = emqttd:run_hooks(unknown_hook, [], []), + + emqttd:hook(foreach_hook, fun ?MODULE:hook_fun6/2, [initArg]), + emqttd:hook(foreach_hook, fun ?MODULE:hook_fun7/2, [initArg]), + emqttd:hook(foreach_hook, fun ?MODULE:hook_fun8/2, [initArg]), + stop = emqttd:run_hooks(foreach_hook, [arg]). hook_fun1([]) -> ok. hook_fun2([]) -> {ok, []}. @@ -345,6 +350,10 @@ hook_fun3(arg1, arg2, _Acc, init) -> ok. hook_fun4(arg1, arg2, Acc, init) -> {ok, [r2 | Acc]}. hook_fun5(arg1, arg2, Acc, init) -> {stop, [r3 | Acc]}. +hook_fun6(arg, initArg) -> ok. +hook_fun7(arg, initArg) -> any. +hook_fun8(arg, initArg) -> stop. + %%-------------------------------------------------------------------- %% HTTP Request Test %%-------------------------------------------------------------------- From e3e00a99b12b2557de5331dfc822c303cf9a5241 Mon Sep 17 00:00:00 2001 From: huangdan Date: Sun, 25 Sep 2016 07:40:36 +0800 Subject: [PATCH 04/13] alarm test --- test/emqttd_SUITE.erl | 28 +++++++++++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/test/emqttd_SUITE.erl b/test/emqttd_SUITE.erl index 41ad9f60e..c56068885 100644 --- a/test/emqttd_SUITE.erl +++ b/test/emqttd_SUITE.erl @@ -39,6 +39,7 @@ all() -> {group, http}, {group, cluster}, %%{group, backend}, + {group, alarms}, {group, cli}]. groups() -> @@ -70,6 +71,7 @@ groups() -> {http, [sequence], [request_status, request_publish + % websocket_test ]}, {cluster, [sequence], [cluster_test, @@ -79,6 +81,9 @@ groups() -> cluster_remove2, cluster_node_down ]}, + {alarms, [sequence], + [set_alarms] + }, {cli, [sequence], [ctl_register_cmd, cli_status, @@ -387,6 +392,18 @@ auth_header_(User, Pass) -> Encoded = base64:encode_to_string(lists:append([User,":",Pass])), {"Authorization","Basic " ++ Encoded}. +websocket_test(_) -> +% Conn = esockd_connection:new(esockd_transport, nil, []), +% Req = mochiweb_request:new(Conn, 'GET', "/mqtt", {1, 1}, +% mochiweb_headers:make([{"Sec-WebSocket-Protocol","mqtt"}, +% {"Upgrade","websocket"} +% ])), + Req = "GET " ++ "/mqtt" ++" HTTP/1.1\r\nUpgrade: WebSocket\r\nConnection: Upgrade\r\n" ++ + "Host: " ++ "127.0.0.1"++ "\r\n" ++ + "Origin: http://" ++ "127.0.0.1" ++ "/\r\n\r\n", + + ct:log("Req:~p", [Req]), + emqttd_http:handle_request(Req). %%-------------------------------------------------------------------- %% cluster group %%-------------------------------------------------------------------- @@ -461,7 +478,16 @@ cluster_node_down(_) -> timer:sleep(1000), Routes = lists:sort(emqttd_router:match(<<"a/b/c">>)). - +set_alarms(_) -> + AlarmTest = #mqtt_alarm{id = <<"1">>, severity = error, title="alarm title", summary="alarm summary"}, + emqttd_alarm:set_alarm(AlarmTest), + Alarms = emqttd_alarm:get_alarms(), + ?assertEqual(1, length(Alarms)), + emqttd_alarm:clear_alarm(<<"1">>), + [] = emqttd_alarm:get_alarms(). + + + %%-------------------------------------------------------------------- %% Cli group %%-------------------------------------------------------------------- From d0218deb88922dfd1733d06793750b1b6740efb4 Mon Sep 17 00:00:00 2001 From: Feng Date: Wed, 28 Sep 2016 22:43:58 +0800 Subject: [PATCH 05/13] Add hooks: session.created, session.terminated --- src/emqttd_session.erl | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 85701e249..b6276ab85 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -235,6 +235,7 @@ init([CleanSess, {ClientId, Username}, ClientPid]) -> collect_interval = get_value(collect_interval, SessEnv, 0), timestamp = os:timestamp()}, emqttd_sm:reg_session(ClientId, CleanSess, sess_info(Session)), + emqttd:run_hooks('session.created', [ClientId, Username]), %% Start statistics {ok, start_collector(Session), hibernate}. @@ -519,7 +520,8 @@ handle_info(expired, Session) -> handle_info(Info, Session) -> ?UNEXPECTED_INFO(Info, Session). -terminate(_Reason, #session{client_id = ClientId}) -> +terminate(Reason, #session{client_id = ClientId, username = Username}) -> + emqttd:run_hooks('session.terminated', [ClientId, Username, Reason]), emqttd:subscriber_down(ClientId), emqttd_sm:unreg_session(ClientId). From 8d48e8d2e23442e5cf29b9cf7deef8b74ce60da5 Mon Sep 17 00:00:00 2001 From: Feng Date: Wed, 28 Sep 2016 22:45:20 +0800 Subject: [PATCH 06/13] Support 'is_superuser/2 callback --- src/emqttd_acl_mod.erl | 14 +++++++------- src/emqttd_auth_anonymous.erl | 4 +++- src/emqttd_auth_clientid.erl | 4 +++- src/emqttd_auth_mod.erl | 19 ++++++++++--------- src/emqttd_auth_username.erl | 4 +++- src/emqttd_protocol.erl | 33 +++++++++++++++++++-------------- 6 files changed, 45 insertions(+), 33 deletions(-) diff --git a/src/emqttd_acl_mod.erl b/src/emqttd_acl_mod.erl index dfec11157..2eb09a0fe 100644 --- a/src/emqttd_acl_mod.erl +++ b/src/emqttd_acl_mod.erl @@ -24,16 +24,16 @@ -ifdef(use_specs). --callback init(AclOpts :: list()) -> {ok, State :: any()}. +-callback(init(AclOpts :: list()) -> {ok, State :: any()}). --callback check_acl({Client, PubSub, Topic}, State :: any()) -> allow | deny | ignore when - Client :: mqtt_client(), - PubSub :: pubsub(), - Topic :: binary(). +-callback(check_acl({Client, PubSub, Topic}, State :: any()) -> allow | deny | ignore when + Client :: mqtt_client(), + PubSub :: pubsub(), + Topic :: binary()). --callback reload_acl(State :: any()) -> ok | {error, any()}. +-callback(reload_acl(State :: any()) -> ok | {error, any()}). --callback description() -> string(). +-callback(description() -> string()). -else. diff --git a/src/emqttd_auth_anonymous.erl b/src/emqttd_auth_anonymous.erl index 8acdb7bf0..ed81492d1 100644 --- a/src/emqttd_auth_anonymous.erl +++ b/src/emqttd_auth_anonymous.erl @@ -19,11 +19,13 @@ -behaviour(emqttd_auth_mod). --export([init/1, check/3, description/0]). +-export([init/1, check/3, is_superuser/2, description/0]). init(Opts) -> {ok, Opts}. check(_Client, _Password, _Opts) -> ok. +is_superuser(_Client, _Opts) -> false. + description() -> "Anonymous Authentication Module". diff --git a/src/emqttd_auth_clientid.erl b/src/emqttd_auth_clientid.erl index 15a751ea8..168fca2e5 100644 --- a/src/emqttd_auth_clientid.erl +++ b/src/emqttd_auth_clientid.erl @@ -24,7 +24,7 @@ -behaviour(emqttd_auth_mod). %% emqttd_auth_mod callbacks --export([init/1, check/3, description/0]). +-export([init/1, check/3, is_superuser/2, description/0]). -define(AUTH_CLIENTID_TAB, mqtt_auth_clientid). @@ -88,6 +88,8 @@ check(#mqtt_client{client_id = ClientId}, Password, [{password, yes}|_]) -> _ -> {error, password_error} end. +is_superuser(_Client, _Opts) -> false. + description() -> "ClientId authentication module". %%-------------------------------------------------------------------- diff --git a/src/emqttd_auth_mod.erl b/src/emqttd_auth_mod.erl index 09438703d..f05db3a35 100644 --- a/src/emqttd_auth_mod.erl +++ b/src/emqttd_auth_mod.erl @@ -14,14 +14,13 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc Authentication Behaviour. -module(emqttd_auth_mod). -include("emqttd.hrl"). -export([passwd_hash/2]). --type hash_type() :: plain | md5 | sha | sha256. +-type(hash_type() :: plain | md5 | sha | sha256). %%-------------------------------------------------------------------- %% Authentication behavihour @@ -29,21 +28,23 @@ -ifdef(use_specs). --callback init(AuthOpts :: list()) -> {ok, State :: any()}. +-callback(init(AuthOpts :: list()) -> {ok, State :: any()}). --callback check(Client, Password, State) -> ok | ignore | {error, string()} when - Client :: mqtt_client(), - Password :: binary(), - State :: any(). +-callback(check(Client, Password, State) -> ok | ignore | {error, string()} when + Client :: mqtt_client(), + Password :: binary(), + State :: any()). --callback description() -> string(). +-callback(is_superuser(Client :: mqtt_client(), State :: any()) -> boolean()). + +-callback(description() -> string()). -else. -export([behaviour_info/1]). behaviour_info(callbacks) -> - [{init, 1}, {check, 3}, {description, 0}]; + [{init, 1}, {check, 3}, {is_superuser, 2}, {description, 0}]; behaviour_info(_Other) -> undefined. diff --git a/src/emqttd_auth_username.erl b/src/emqttd_auth_username.erl index 6a0b1d17f..d30f39e31 100644 --- a/src/emqttd_auth_username.erl +++ b/src/emqttd_auth_username.erl @@ -31,7 +31,7 @@ -export([add_user/2, remove_user/1, lookup_user/1, all_users/0]). %% emqttd_auth callbacks --export([init/1, check/3, description/0]). +-export([init/1, check/3, is_superuser/2, description/0]). -define(AUTH_USERNAME_TAB, mqtt_auth_username). @@ -146,6 +146,8 @@ check(#mqtt_client{username = Username}, Password, _Opts) -> end end. +is_superuser(_Client, _Opts) -> false. + description() -> "Username password authentication module". diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index 81593a7e5..007e570a1 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -34,7 +34,7 @@ %% Protocol State -record(proto_state, {peername, sendfun, connected = false, client_id, client_pid, clean_sess, - proto_ver, proto_name, username, + proto_ver, proto_name, username, is_superuser = false, will_msg, keepalive, max_clientid_len = ?MAX_CLIENTID_LEN, session, ws_initial_headers, %% Headers from first HTTP request for websocket client connected_at}). @@ -159,8 +159,12 @@ process(Packet = ?CONNECT_PACKET(Var), State0) -> {ReturnCode1, SessPresent, State3} = case validate_connect(Var, State1) of ?CONNACK_ACCEPT -> - case emqttd_access_control:auth(client(State1), Password) of + Client = client(State1), + case emqttd_access_control:auth(Client, Password) of ok -> + %% Is Superuser? + IsSuperuser = emqttd_access_control:is_superuser(Client), + %% Generate clientId if null State2 = maybe_set_clientid(State1), @@ -172,7 +176,7 @@ process(Packet = ?CONNECT_PACKET(Var), State0) -> %% Start keepalive start_keepalive(KeepAlive), %% ACCEPT - {?CONNACK_ACCEPT, SP, State2#proto_state{session = Session}}; + {?CONNACK_ACCEPT, SP, State2#proto_state{session = Session, is_superuser = IsSuperuser}}; {error, Error} -> exit({shutdown, Error}) end; @@ -188,12 +192,10 @@ process(Packet = ?CONNECT_PACKET(Var), State0) -> %% Send connack send(?CONNACK_PACKET(ReturnCode1, sp(SessPresent)), State3); -process(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload), State) -> - case check_acl(publish, Topic, client(State)) of - allow -> - publish(Packet, State); - deny -> - ?LOG(error, "Cannot publish to ~s for ACL Deny", [Topic], State) +process(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload), State = #proto_state{is_superuser = IsSuper}) -> + case IsSuper orelse allow == check_acl(publish, Topic, client(State)) of + true -> publish(Packet, State); + false -> ?LOG(error, "Cannot publish to ~s for ACL Deny", [Topic], State) end, {ok, State}; @@ -216,11 +218,14 @@ process(?PUBACK_PACKET(?PUBCOMP, PacketId), State = #proto_state{session = Sessi process(?SUBSCRIBE_PACKET(PacketId, []), State) -> send(?SUBACK_PACKET(PacketId, []), State); -process(?SUBSCRIBE_PACKET(PacketId, RawTopicTable), State = #proto_state{ - client_id = ClientId, username = Username, session = Session}) -> - Client = client(State), - TopicTable = parse_topic_table(RawTopicTable), - AllowDenies = [check_acl(subscribe, Topic, Client) || {Topic, _Opts} <- TopicTable], +%% TODO: refactor later... +process(?SUBSCRIBE_PACKET(PacketId, RawTopicTable), State = #proto_state{session = Session, + client_id = ClientId, username = Username, is_superuser = IsSuperuser}) -> + Client = client(State), TopicTable = parse_topic_table(RawTopicTable), + AllowDenies = if + IsSuperuser -> []; + true -> [check_acl(subscribe, Topic, Client) || {Topic, _Opts} <- TopicTable] + end, case lists:member(deny, AllowDenies) of true -> ?LOG(error, "Cannot SUBSCRIBE ~p for ACL Deny", [TopicTable], State), From f510ab894fc018d83fa18d1041b0c19f8a9d201c Mon Sep 17 00:00:00 2001 From: Feng Date: Fri, 30 Sep 2016 10:23:04 +0800 Subject: [PATCH 07/13] remove is_superuser/2 callback --- src/emqttd_access_control.erl | 1 + src/emqttd_auth_anonymous.erl | 4 +--- src/emqttd_auth_clientid.erl | 4 +--- src/emqttd_auth_mod.erl | 6 ++---- src/emqttd_auth_username.erl | 6 ++---- src/emqttd_protocol.erl | 25 ++++++++++++++++++------- 6 files changed, 25 insertions(+), 21 deletions(-) diff --git a/src/emqttd_access_control.erl b/src/emqttd_access_control.erl index c2c8fb9b3..fb36892c7 100644 --- a/src/emqttd_access_control.erl +++ b/src/emqttd_access_control.erl @@ -60,6 +60,7 @@ auth(_Client, _Password, []) -> auth(Client, Password, [{Mod, State, _Seq} | Mods]) -> case catch Mod:check(Client, Password, State) of ok -> ok; + {ok, IsSuper} -> {ok, IsSuper}; ignore -> auth(Client, Password, Mods); {error, Reason} -> {error, Reason}; {'EXIT', Error} -> {error, Error} diff --git a/src/emqttd_auth_anonymous.erl b/src/emqttd_auth_anonymous.erl index ed81492d1..8acdb7bf0 100644 --- a/src/emqttd_auth_anonymous.erl +++ b/src/emqttd_auth_anonymous.erl @@ -19,13 +19,11 @@ -behaviour(emqttd_auth_mod). --export([init/1, check/3, is_superuser/2, description/0]). +-export([init/1, check/3, description/0]). init(Opts) -> {ok, Opts}. check(_Client, _Password, _Opts) -> ok. -is_superuser(_Client, _Opts) -> false. - description() -> "Anonymous Authentication Module". diff --git a/src/emqttd_auth_clientid.erl b/src/emqttd_auth_clientid.erl index 168fca2e5..15a751ea8 100644 --- a/src/emqttd_auth_clientid.erl +++ b/src/emqttd_auth_clientid.erl @@ -24,7 +24,7 @@ -behaviour(emqttd_auth_mod). %% emqttd_auth_mod callbacks --export([init/1, check/3, is_superuser/2, description/0]). +-export([init/1, check/3, description/0]). -define(AUTH_CLIENTID_TAB, mqtt_auth_clientid). @@ -88,8 +88,6 @@ check(#mqtt_client{client_id = ClientId}, Password, [{password, yes}|_]) -> _ -> {error, password_error} end. -is_superuser(_Client, _Opts) -> false. - description() -> "ClientId authentication module". %%-------------------------------------------------------------------- diff --git a/src/emqttd_auth_mod.erl b/src/emqttd_auth_mod.erl index f05db3a35..845d5a0fb 100644 --- a/src/emqttd_auth_mod.erl +++ b/src/emqttd_auth_mod.erl @@ -30,13 +30,11 @@ -callback(init(AuthOpts :: list()) -> {ok, State :: any()}). --callback(check(Client, Password, State) -> ok | ignore | {error, string()} when +-callback(check(Client, Password, State) -> ok | | {ok, boolean()} | ignore | {error, string()} when Client :: mqtt_client(), Password :: binary(), State :: any()). --callback(is_superuser(Client :: mqtt_client(), State :: any()) -> boolean()). - -callback(description() -> string()). -else. @@ -44,7 +42,7 @@ -export([behaviour_info/1]). behaviour_info(callbacks) -> - [{init, 1}, {check, 3}, {is_superuser, 2}, {description, 0}]; + [{init, 1}, {check, 3}, {description, 0}]; behaviour_info(_Other) -> undefined. diff --git a/src/emqttd_auth_username.erl b/src/emqttd_auth_username.erl index d30f39e31..545bce7c3 100644 --- a/src/emqttd_auth_username.erl +++ b/src/emqttd_auth_username.erl @@ -31,7 +31,7 @@ -export([add_user/2, remove_user/1, lookup_user/1, all_users/0]). %% emqttd_auth callbacks --export([init/1, check/3, is_superuser/2, description/0]). +-export([init/1, check/3, description/0]). -define(AUTH_USERNAME_TAB, mqtt_auth_username). @@ -146,8 +146,6 @@ check(#mqtt_client{username = Username}, Password, _Opts) -> end end. -is_superuser(_Client, _Opts) -> false. - description() -> "Username password authentication module". @@ -162,5 +160,5 @@ md5_hash(SaltBin, Password) -> erlang:md5(<>). salt() -> - emqttd_time:seed(), Salt = random:uniform(16#ffffffff), <>. + emqttd_time:seed(), Salt = rand:uniform(16#ffffffff), <>. diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index 007e570a1..f3579d46b 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -159,12 +159,8 @@ process(Packet = ?CONNECT_PACKET(Var), State0) -> {ReturnCode1, SessPresent, State3} = case validate_connect(Var, State1) of ?CONNACK_ACCEPT -> - Client = client(State1), - case emqttd_access_control:auth(Client, Password) of - ok -> - %% Is Superuser? - IsSuperuser = emqttd_access_control:is_superuser(Client), - + case authenticate(client(State1), Password) of + {ok, IsSuperuser} -> %% Generate clientId if null State2 = maybe_set_clientid(State1), @@ -190,7 +186,9 @@ process(Packet = ?CONNECT_PACKET(Var), State0) -> %% Run hooks emqttd:run_hooks('client.connected', [ReturnCode1], client(State3)), %% Send connack - send(?CONNACK_PACKET(ReturnCode1, sp(SessPresent)), State3); + send(?CONNACK_PACKET(ReturnCode1, sp(SessPresent)), State3), + %% stop if authentication failure + stop_if_auth_failure(ReturnCode1, State3); process(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload), State = #proto_state{is_superuser = IsSuper}) -> case IsSuper orelse allow == check_acl(publish, Topic, client(State)) of @@ -302,6 +300,12 @@ trace(send, Packet, ProtoState) -> redeliver({?PUBREL, PacketId}, State) -> send(?PUBREL_PACKET(PacketId), State). +stop_if_auth_failure(RC, State) when RC == ?CONNACK_CREDENTIALS; RC == ?CONNACK_AUTH -> + {stop, {shutdown, auth_failure}, State}; + +stop_if_auth_failure(_RC, State) -> + {ok, State}. + shutdown(_Error, #proto_state{client_id = undefined}) -> ignore; @@ -435,6 +439,13 @@ parse_topic_table(TopicTable) -> parse_topics(Topics) -> [emqttd_topic:parse(Topic) || Topic <- Topics]. +authenticate(Client, Password) -> + case emqttd_access_control:auth(Client, Password) of + ok -> {ok, false}; + {ok, IsSuper} -> {ok, IsSuper}; + {error, Error} -> {error, Error} + end. + %% PUBLISH ACL is cached in process dictionary. check_acl(publish, Topic, Client) -> IfCache = emqttd:conf(cache_acl, true), From 6b3d67c51539e0f5a7e4ffe3d8c76e501be11973 Mon Sep 17 00:00:00 2001 From: Feng Date: Fri, 30 Sep 2016 16:13:50 +0800 Subject: [PATCH 08/13] fix random:seed/1 warning --- src/emqttd_time.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/emqttd_time.erl b/src/emqttd_time.erl index 0ade04f6f..a85acd131 100644 --- a/src/emqttd_time.erl +++ b/src/emqttd_time.erl @@ -20,8 +20,8 @@ seed() -> case erlang:function_exported(erlang, timestamp, 0) of - true -> random:seed(erlang:timestamp()); %% R18 - false -> random:seed(os:timestamp()) %% Compress now() deprecated warning... + true -> rand:seed(erlang:timestamp()); %% R18 + false -> rand:seed(os:timestamp()) %% Compress now() deprecated warning... end. now_to_secs() -> now_to_secs(os:timestamp()). From daa87206a7e30efcd4f22494aca2091a49c6eb0b Mon Sep 17 00:00:00 2001 From: Feng Date: Sat, 1 Oct 2016 09:43:21 +0800 Subject: [PATCH 09/13] random --- src/emqttd_time.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/emqttd_time.erl b/src/emqttd_time.erl index a85acd131..0ade04f6f 100644 --- a/src/emqttd_time.erl +++ b/src/emqttd_time.erl @@ -20,8 +20,8 @@ seed() -> case erlang:function_exported(erlang, timestamp, 0) of - true -> rand:seed(erlang:timestamp()); %% R18 - false -> rand:seed(os:timestamp()) %% Compress now() deprecated warning... + true -> random:seed(erlang:timestamp()); %% R18 + false -> random:seed(os:timestamp()) %% Compress now() deprecated warning... end. now_to_secs() -> now_to_secs(os:timestamp()). From 36ecbdc65369d0eb59d1a0cd43b1a3db401a9c59 Mon Sep 17 00:00:00 2001 From: Feng Date: Sun, 2 Oct 2016 14:18:42 +0800 Subject: [PATCH 10/13] rm .git --- Makefile | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/Makefile b/Makefile index d42673d93..1bef9acb6 100644 --- a/Makefile +++ b/Makefile @@ -4,12 +4,12 @@ PROJECT_VERSION = 2.0 DEPS = gproc lager gen_logger gen_conf esockd mochiweb -dep_gproc = git https://github.com/uwiger/gproc.git -dep_lager = git https://github.com/basho/lager.git -dep_gen_conf = git https://github.com/emqtt/gen_conf.git -dep_gen_logger = git https://github.com/emqtt/gen_logger.git -dep_esockd = git https://github.com/emqtt/esockd.git emq20 -dep_mochiweb = git https://github.com/emqtt/mochiweb.git +dep_gproc = git https://github.com/uwiger/gproc +dep_lager = git https://github.com/basho/lager +dep_gen_conf = git https://github.com/emqtt/gen_conf +dep_gen_logger = git https://github.com/emqtt/gen_logger +dep_esockd = git https://github.com/emqtt/esockd emq20 +dep_mochiweb = git https://github.com/emqtt/mochiweb ERLC_OPTS += +'{parse_transform, lager_transform}' From 2e9bc161362dc68e8c6bdfcf88c523e6fe757074 Mon Sep 17 00:00:00 2001 From: Feng Date: Sun, 2 Oct 2016 14:36:54 +0800 Subject: [PATCH 11/13] emqttd_plugins:init/0 --- src/emqttd_app.erl | 1 + src/emqttd_plugins.erl | 25 +++++++++++++++++++++---- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index 81f1d329c..2299a7ec0 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -48,6 +48,7 @@ start(_StartType, _StartArgs) -> start_servers(Sup), emqttd_cli:load(), load_all_mods(), + emqttd_plugins:init(), emqttd_plugins:load(), start_listeners(), register(emqttd, self()), diff --git a/src/emqttd_plugins.erl b/src/emqttd_plugins.erl index 360e828f0..384a595de 100644 --- a/src/emqttd_plugins.erl +++ b/src/emqttd_plugins.erl @@ -18,12 +18,27 @@ -include("emqttd.hrl"). +-export([init/0]). + -export([load/0, unload/0]). -export([load/1, unload/1]). -export([list/0]). +init() -> + case emqttd:conf(plugins_etc_dir) of + {ok, PluginsEtc} -> + CfgFiles = filelib:wildcard("*.conf", PluginsEtc), + lists:foreach(fun(CfgFile) -> + App = app_name(CfgFile), + application:set_env(App, conf, filename:join(PluginsEtc, CfgFile)), + gen_conf:init(App) + end, CfgFiles); + undefined -> + ok + end. + %% @doc Load all plugins when the broker started. -spec(load() -> list() | {error, any()}). load() -> @@ -90,11 +105,11 @@ list() -> end. plugin(CfgFile) -> - [AppName | _] = string:tokens(CfgFile, "."), - {ok, Attrs} = application:get_all_key(list_to_atom(AppName)), + AppName = app_name(CfgFile), + {ok, Attrs} = application:get_all_key(AppName), Ver = proplists:get_value(vsn, Attrs, "0"), Descr = proplists:get_value(description, Attrs, ""), - #mqtt_plugin{name = list_to_atom(AppName), version = Ver, descr = Descr}. + #mqtt_plugin{name = AppName, version = Ver, descr = Descr}. %% @doc Load a Plugin -spec(load(atom()) -> ok | {error, any()}). @@ -185,6 +200,9 @@ stop_app(App) -> %% Internal functions %%-------------------------------------------------------------------- +app_name(File) -> + [AppName | _] = string:tokens(File, "."), list_to_atom(AppName). + names(plugin) -> names(list()); @@ -244,4 +262,3 @@ write_loaded(AppNames) -> lager:error("Open File ~p Error: ~p", [File, Error]), {error, Error} end. - From 52403ceb675f3770bc970e2c1b412781b475da06 Mon Sep 17 00:00:00 2001 From: Feng Date: Sun, 2 Oct 2016 15:02:49 +0800 Subject: [PATCH 12/13] update Makefile --- rebar.config | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rebar.config b/rebar.config index aedddfbe9..74a744b0d 100644 --- a/rebar.config +++ b/rebar.config @@ -1,4 +1,4 @@ {deps, [ -{gproc,".*",{git,"https://github.com/uwiger/gproc.git",""}},{lager,".*",{git,"https://github.com/basho/lager.git",""}},{gen_logger,".*",{git,"https://github.com/emqtt/gen_logger.git",""}},{gen_conf,".*",{git,"https://github.com/emqtt/gen_conf.git",""}},{esockd,".*",{git,"https://github.com/emqtt/esockd.git","emq20"}},{mochiweb,".*",{git,"https://github.com/emqtt/mochiweb.git",""}} +{gproc,".*",{git,"https://github.com/uwiger/gproc",""}},{lager,".*",{git,"https://github.com/basho/lager",""}},{gen_logger,".*",{git,"https://github.com/emqtt/gen_logger",""}},{gen_conf,".*",{git,"https://github.com/emqtt/gen_conf",""}},{esockd,".*",{git,"https://github.com/emqtt/esockd","emq20"}},{mochiweb,".*",{git,"https://github.com/emqtt/mochiweb",""}} ]}. {erl_opts, [{parse_transform,lager_transform}]}. From cec3f781a0c7b4ae1c0762e5bac1382ac7250282 Mon Sep 17 00:00:00 2001 From: Feng Date: Sun, 2 Oct 2016 18:42:55 +0800 Subject: [PATCH 13/13] 2.0-rc.1 - update docs for redis plugin --- docs/source/changes.rst | 21 +++++++++++++++++++++ docs/source/plugins.rst | 39 ++++++++++++++++++++------------------- 2 files changed, 41 insertions(+), 19 deletions(-) diff --git a/docs/source/changes.rst b/docs/source/changes.rst index 218d9e0a7..3d659666a 100644 --- a/docs/source/changes.rst +++ b/docs/source/changes.rst @@ -5,6 +5,27 @@ Changes ======= +.. _release_2.0_rc.1: + +---------------- +Version 2.0-rc.1 +---------------- + +*Release Date: 2016-10-03* + +1. `mqtt/superuser` POST called two times in `emqtt_auth_http` (#696) + +2. Close MQTT TCP connection if authentication failed (#707) + +3. Improve the plugin management. Developer don't need to add plugin's config to rel/sys.config + +4. Add `BUILD_DEPS` in the plugin's Makefile:: + + BUILD_DEPS = emqttd + dep_emqttd = git https://github.com/emqtt/emqttd emq20 + +5. Improve the design of Redis ACL. + .. _release_2.0_beta.3: ------------------ diff --git a/docs/source/plugins.rst b/docs/source/plugins.rst index aecc1966d..2b90427cb 100644 --- a/docs/source/plugins.rst +++ b/docs/source/plugins.rst @@ -415,26 +415,25 @@ etc/plugins/emqttd_auth_redis.conf: %% Variables: %u = username, %c = clientid - %% HMGET mqtt_user:%u is_superuser - {supercmd, ["HGET", "mqtt_user:%u", "is_superuser"]}. - %% HMGET mqtt_user:%u password - {authcmd, ["HGET", "mqtt_user:%u", "password"]}. + {authcmd, "HGET mqtt_user:%u password"}. %% Password hash algorithm: plain, md5, sha, sha256, pbkdf2? {password_hash, sha256}. - %% SMEMBERS mqtt_acl:%u - {aclcmd, ["SMEMBERS", "mqtt_acl:%u"]}. + %% HMGET mqtt_user:%u is_superuser + {supercmd, "HGET mqtt_user:%u is_superuser"}. + + %% HGETALL mqtt_acl:%u + {aclcmd, "HGETALL mqtt_acl:%u"}. %% If no rules matched, return... {acl_nomatch, deny}. %% Load Subscriptions form Redis when client connected. - {subcmd, ["HGETALL", "mqtt_subs:%u"]}. + {subcmd, "HGETALL mqtt_sub:%u"}. - -Redis User HASH +Redis User Hash --------------- Set a 'user' hash with 'password' field, for example:: @@ -442,16 +441,18 @@ Set a 'user' hash with 'password' field, for example:: HSET mqtt_user: is_superuser 1 HSET mqtt_user: password "passwd" -Redis ACL Rule SET ------------------- +Redis ACL Rule Hash +------------------- -The plugin uses a redis SET to store ACL rules:: +The plugin uses a redis Hash to store ACL rules:: - SADD mqtt_acl: "publish topic1" - SADD mqtt_acl: "subscribe topic2" - SADD mqtt_acl: "pubsub topic3" + HSET mqtt_acl: topic1 1 + HSET mqtt_acl: topic2 2 + HSET mqtt_acl: topic3 3 -Redis Subscription HASH +.. NOTE:: 1: subscribe, 2: publish, 3: pubsub + +Redis Subscription Hash ----------------------- The plugin can store static subscriptions in a redis Hash:: @@ -495,14 +496,14 @@ etc/plugins/emqttd_plugin_mongo.conf: %% Variables: %u = username, %c = clientid %% Superuser Query - {superquery, pool, [ + {superquery, [ {collection, "mqtt_user"}, {super_field, "is_superuser"}, {selector, {"username", "%u"}} ]}. %% Authentication Query - {authquery, pool, [ + {authquery, [ {collection, "mqtt_user"}, {password_field, "password"}, %% Hash Algorithm: plain, md5, sha, sha256, pbkdf2? @@ -511,7 +512,7 @@ etc/plugins/emqttd_plugin_mongo.conf: ]}. %% ACL Query: "%u" = username, "%c" = clientid - {aclquery, pool, [ + {aclquery, [ {collection, "mqtt_acl"}, {selector, {"username", "%u"}} ]}.