diff --git a/include/emqttd_protocol.hrl b/include/emqttd_protocol.hrl index b7f2c5743..ee7b553d9 100644 --- a/include/emqttd_protocol.hrl +++ b/include/emqttd_protocol.hrl @@ -209,6 +209,11 @@ #mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK}, variable = #mqtt_packet_connack{return_code = ReturnCode}}). +-define(CONNACK_PACKET(ReturnCode, SessPresent), + #mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK}, + variable = #mqtt_packet_connack{ack_flags = SessPresent, + return_code = ReturnCode}}). + -define(PUBLISH_PACKET(Qos, PacketId), #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, qos = Qos}, diff --git a/rebar.config b/rebar.config index 4b39a2f2b..84968128f 100644 --- a/rebar.config +++ b/rebar.config @@ -29,8 +29,8 @@ {deps, [ {gproc, ".*", {git, "git://github.com/uwiger/gproc.git", {branch, "master"}}}, {lager, ".*", {git, "git://github.com/basho/lager.git", {branch, "master"}}}, - {esockd, "3.*", {git, "git://github.com/emqtt/esockd.git", {branch, "3.0"}}}, - {mochiweb, ".*", {git, "git://github.com/emqtt/mochiweb.git", {branch, "4.0"}}} + {esockd, "3.*", {git, "git://github.com/emqtt/esockd.git", {branch, "master"}}}, + {mochiweb, "4.*", {git, "git://github.com/emqtt/mochiweb.git", {branch, "master"}}} ]}. {recursive_cmds, [ct, eunit, clean]}. diff --git a/rel/files/acl.config b/rel/files/acl.config index 3359b2b04..9b1d512a6 100644 --- a/rel/files/acl.config +++ b/rel/files/acl.config @@ -1,21 +1,21 @@ %%%----------------------------------------------------------------------------- -%% -%% [ACL](https://github.com/emqtt/emqttd/wiki/ACL) -%% -%% -type who() :: all | binary() | -%% {ipaddr, esockd_access:cidr()} | -%% {client, binary()} | -%% {user, binary()}. -%% -%% -type access() :: subscribe | publish | pubsub. -%% -%% -type topic() :: binary(). -%% -%% -type rule() :: {allow, all} | -%% {allow, who(), access(), list(topic())} | -%% {deny, all} | -%% {deny, who(), access(), list(topic())}. -%% +%%% +%%% [ACL](https://github.com/emqtt/emqttd/wiki/ACL) +%%% +%%% -type who() :: all | binary() | +%%% {ipaddr, esockd_access:cidr()} | +%%% {client, binary()} | +%%% {user, binary()}. +%%% +%%% -type access() :: subscribe | publish | pubsub. +%%% +%%% -type topic() :: binary(). +%%% +%%% -type rule() :: {allow, all} | +%%% {allow, who(), access(), list(topic())} | +%%% {deny, all} | +%%% {deny, who(), access(), list(topic())}. +%%% %%%----------------------------------------------------------------------------- {allow, {user, "dashboard"}, subscribe, ["$SYS/#"]}. diff --git a/rel/files/emqttd.config.production b/rel/files/emqttd.config.production index a657d7bf3..d96cb1373 100644 --- a/rel/files/emqttd.config.production +++ b/rel/files/emqttd.config.production @@ -246,7 +246,7 @@ {long_gc, false}, %% Long Schedule(ms) - {long_schedule, 50}, + {long_schedule, 100}, %% 8M words. 32MB on 32-bit VM, 64MB on 64-bit VM. %% 8 * 1024 * 1024 diff --git a/src/emqttd.erl b/src/emqttd.erl index 09661d6c8..eeb5bab4d 100644 --- a/src/emqttd.erl +++ b/src/emqttd.erl @@ -91,8 +91,7 @@ open_listener({https, Port, Options}) -> mochiweb:start_http(Port, Options, MFArgs). open_listener(Protocol, Port, Options) -> - Rl = rate_limiter(emqttd_opts:g(rate_limit, Options)), - MFArgs = {emqttd_client, start_link, [[{rate_limiter, Rl} | env(mqtt)]]}, + MFArgs = {emqttd_client, start_link, [env(mqtt)]}, esockd:open(Protocol, Port, merge_sockopts(Options) , MFArgs). merge_sockopts(Options) -> @@ -100,14 +99,6 @@ merge_sockopts(Options) -> proplists:get_value(sockopts, Options, [])), emqttd_opts:merge(Options, [{sockopts, SockOpts}]). -%% TODO: will refactor in 0.14.0 release. -rate_limiter(undefined) -> - undefined; -rate_limiter(Config) -> - Bps = fun(S) -> list_to_integer(string:strip(S)) * 1024 end, - [Burst, Rate] = [Bps(S) || S <- string:tokens(Config, ",")], - esockd_rate_limiter:new(Burst, Rate). - %%------------------------------------------------------------------------------ %% @doc Close Listeners %% @end diff --git a/src/emqttd_access_control.erl b/src/emqttd_access_control.erl index 320b6f75f..7d15d5904 100644 --- a/src/emqttd_access_control.erl +++ b/src/emqttd_access_control.erl @@ -24,7 +24,6 @@ %%% %%% @end %%%----------------------------------------------------------------------------- - -module(emqttd_access_control). -author("Feng Lee "). @@ -36,14 +35,13 @@ -define(SERVER, ?MODULE). %% API Function Exports --export([start_link/0, - start_link/1, +-export([start_link/0, start_link/1, auth/2, % authentication check_acl/3, % acl check reload_acl/0, % reload acl - register_mod/3, - unregister_mod/2, lookup_mods/1, + register_mod/3, register_mod/4, + unregister_mod/2, stop/0]). %% gen_server callbacks @@ -77,7 +75,7 @@ auth(Client, Password) when is_record(Client, mqtt_client) -> auth(Client, Password, lookup_mods(auth)). auth(_Client, _Password, []) -> {error, "No auth module to check!"}; -auth(Client, Password, [{Mod, State} | Mods]) -> +auth(Client, Password, [{Mod, State, _Seq} | Mods]) -> case Mod:check(Client, Password, State) of ok -> ok; {error, Reason} -> {error, Reason}; @@ -100,7 +98,7 @@ check_acl(Client, PubSub, Topic) when ?IS_PUBSUB(PubSub) -> check_acl(#mqtt_client{client_id = ClientId}, PubSub, Topic, []) -> lager:error("ACL: nomatch when ~s ~s ~s", [ClientId, PubSub, Topic]), allow; -check_acl(Client, PubSub, Topic, [{M, State}|AclMods]) -> +check_acl(Client, PubSub, Topic, [{M, State, _Seq}|AclMods]) -> case M:check_acl({Client, PubSub, Topic}, State) of allow -> allow; deny -> deny; @@ -113,7 +111,7 @@ check_acl(Client, PubSub, Topic, [{M, State}|AclMods]) -> %%------------------------------------------------------------------------------ -spec reload_acl() -> list() | {error, any()}. reload_acl() -> - [M:reload_acl(State) || {M, State} <- lookup_mods(acl)]. + [M:reload_acl(State) || {M, State, _Seq} <- lookup_mods(acl)]. %%------------------------------------------------------------------------------ %% @doc Register authentication or ACL module @@ -121,7 +119,11 @@ reload_acl() -> %%------------------------------------------------------------------------------ -spec register_mod(Type :: auth | acl, Mod :: atom(), Opts :: list()) -> ok | {error, any()}. register_mod(Type, Mod, Opts) when Type =:= auth; Type =:= acl-> - gen_server:call(?SERVER, {register_mod, Type, Mod, Opts}). + register_mod(Type, Mod, Opts, 0). + +-spec register_mod(auth | acl, atom(), list(), pos_integer()) -> ok | {error, any()}. +register_mod(Type, Mod, Opts, Seq) when Type =:= auth; Type =:= acl-> + gen_server:call(?SERVER, {register_mod, Type, Mod, Opts, Seq}). %%------------------------------------------------------------------------------ %% @doc Unregister authentication or ACL module @@ -172,22 +174,26 @@ init_mods(acl, AclMods) -> init_mod(Fun, Name, Opts) -> Module = Fun(Name), {ok, State} = Module:init(Opts), - {Module, State}. + {Module, State, 0}. -handle_call({register_mod, Type, Mod, Opts}, _From, State) -> +handle_call({register_mod, Type, Mod, Opts, Seq}, _From, State) -> Mods = lookup_mods(Type), Reply = case lists:keyfind(Mod, 1, Mods) of - false -> + false -> case catch Mod:init(Opts) of - {ok, ModState} -> - ets:insert(?ACCESS_CONTROL_TAB, {tab_key(Type), [{Mod, ModState}|Mods]}), + {ok, ModState} -> + NewMods = + lists:sort(fun({_, _, Seq1}, {_, _, Seq2}) -> + Seq1 >= Seq2 + end, [{Mod, ModState, Seq} | Mods]), + ets:insert(?ACCESS_CONTROL_TAB, {tab_key(Type), NewMods}), ok; {'EXIT', Error} -> lager:error("Access Control: register ~s error - ~p", [Mod, Error]), {error, Error} end; - _ -> + _ -> {error, existed} end, {reply, Reply, State}; diff --git a/src/emqttd_access_rule.erl b/src/emqttd_access_rule.erl index fab9461b9..30ed8f87b 100644 --- a/src/emqttd_access_rule.erl +++ b/src/emqttd_access_rule.erl @@ -24,7 +24,6 @@ %%% %%% @end %%%----------------------------------------------------------------------------- - -module(emqttd_access_rule). -author("Feng Lee "). @@ -49,17 +48,22 @@ -export([compile/1, match/3]). +-define(ALLOW_DENY(A), ((A =:= allow) orelse (A =:= deny))). + %%------------------------------------------------------------------------------ %% @doc Compile access rule %% @end %%------------------------------------------------------------------------------ -compile({A, all}) when (A =:= allow) orelse (A =:= deny) -> +compile({A, all}) when ?ALLOW_DENY(A) -> {A, all}; -compile({A, Who, Access, TopicFilters}) when (A =:= allow) orelse (A =:= deny) -> +compile({A, Who, Access, Topic}) when ?ALLOW_DENY(A) andalso is_binary(Topic) -> + {A, compile(who, Who), Access, [compile(topic, Topic)]}; + +compile({A, Who, Access, TopicFilters}) when ?ALLOW_DENY(A) -> {A, compile(who, Who), Access, [compile(topic, Topic) || Topic <- TopicFilters]}. -compile(who, all) -> +compile(who, all) -> all; compile(who, {ipaddr, CIDR}) -> {Start, End} = esockd_access:range(CIDR), @@ -72,6 +76,10 @@ compile(who, {user, all}) -> {user, all}; compile(who, {user, Username}) -> {user, bin(Username)}; +compile(who, {'and', Conds}) when is_list(Conds) -> + {'and', [compile(who, Cond) || Cond <- Conds]}; +compile(who, {'or', Conds}) when is_list(Conds) -> + {'or', [compile(who, Cond) || Cond <- Conds]}; compile(topic, {eq, Topic}) -> {eq, emqttd_topic:words(bin(Topic))}; @@ -120,6 +128,14 @@ match_who(#mqtt_client{peername = undefined}, {ipaddr, _Tup}) -> match_who(#mqtt_client{peername = {IP, _}}, {ipaddr, {_CDIR, Start, End}}) -> I = esockd_access:atoi(IP), I >= Start andalso I =< End; +match_who(Client, {'and', Conds}) when is_list(Conds) -> + lists:foldl(fun(Who, Allow) -> + match_who(Client, Who) andalso Allow + end, true, Conds); +match_who(Client, {'or', Conds}) when is_list(Conds) -> + lists:foldl(fun(Who, Allow) -> + match_who(Client, Who) orelse Allow + end, false, Conds); match_who(_Client, _Who) -> false. diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index 53adfa582..4ee9cabd1 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -82,7 +82,7 @@ start_servers(Sup) -> {"emqttd session supervisor", {supervisor, emqttd_session_sup}}, {"emqttd broker", emqttd_broker}, {"emqttd alarm", emqttd_alarm}, - {"emqttd mode supervisor", emqttd_mod_sup}, + {"emqttd mod supervisor", emqttd_mod_sup}, {"emqttd bridge supervisor", {supervisor, emqttd_bridge_sup}}, {"emqttd access control", emqttd_access_control}, {"emqttd system monitor", emqttd_sysmon, emqttd:env(sysmon)}], diff --git a/src/emqttd_auth_username.erl b/src/emqttd_auth_username.erl index 392d32961..855a3d6e6 100644 --- a/src/emqttd_auth_username.erl +++ b/src/emqttd_auth_username.erl @@ -105,7 +105,7 @@ init(Opts) -> mnesia:create_table(?AUTH_USERNAME_TAB, [ {disc_copies, [node()]}, {attributes, record_info(fields, ?AUTH_USERNAME_TAB)}]), - mnesia:add_table_copy(?AUTH_USERNAME_TAB, node(), ram_copies), + mnesia:add_table_copy(?AUTH_USERNAME_TAB, node(), disc_copies), emqttd_ctl:register_cmd(users, {?MODULE, cli}, []), {ok, Opts}. diff --git a/src/emqttd_cli.erl b/src/emqttd_cli.erl index fd70ce4f6..59441d516 100644 --- a/src/emqttd_cli.erl +++ b/src/emqttd_cli.erl @@ -72,8 +72,8 @@ status([]) -> case lists:keysearch(emqttd, 1, application:which_applications()) of false -> ?PRINT_MSG("emqttd is not running~n"); - {value,_Version} -> - ?PRINT_MSG("emqttd is running~n") + {value, {emqttd, _Desc, Vsn}} -> + ?PRINT("emqttd ~s is running~n", [Vsn]) end; status(_) -> ?PRINT_CMD("status", "query broker status"). diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index f471d311b..a684737a5 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -124,7 +124,7 @@ handle_call(info, _From, State = #client_state{connection = Connection, ClientInfo = ?record_to_proplist(client_state, State, ?INFO_KEYS), ProtoInfo = emqttd_protocol:info(ProtoState), {ok, SockStats} = Connection:getstat(?SOCK_STATS), - {noreply, lists:append([ClientInfo, [{proto_info, ProtoInfo}, + {reply, lists:append([ClientInfo, [{proto_info, ProtoInfo}, {sock_stats, SockStats}]]), State}; handle_call(kick, _From, State) -> @@ -170,7 +170,7 @@ handle_info({redeliver, {?PUBREL, PacketId}}, State) -> handle_info({shutdown, conflict, {ClientId, NewPid}}, State) -> ?LOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid], State), - shutdown(confict, State); + shutdown(conflict, State); handle_info(activate_sock, State) -> noreply(run_socket(State#client_state{conn_state = running})); @@ -281,14 +281,14 @@ received(Bytes, State = #client_state{parser_fun = ParserFun, rate_limit(_Size, State = #client_state{rate_limit = undefined}) -> run_socket(State); -rate_limit(Size, State = #client_state{rate_limit = Limiter}) -> - case esockd_ratelimit:check(Limiter, Size) of - {0, Limiter1} -> - run_socket(State#client_state{conn_state = running, rate_limit = Limiter1}); - {Pause, Limiter1} -> +rate_limit(Size, State = #client_state{rate_limit = Rl}) -> + case Rl:check(Size) of + {0, Rl1} -> + run_socket(State#client_state{conn_state = running, rate_limit = Rl1}); + {Pause, Rl1} -> ?LOG(error, "Rate limiter pause for ~p", [Size, Pause], State), erlang:send_after(Pause, self(), activate_sock), - State#client_state{conn_state = blocked, rate_limit = Limiter1} + State#client_state{conn_state = blocked, rate_limit = Rl1} end. run_socket(State = #client_state{conn_state = blocked}) -> diff --git a/src/emqttd_cm.erl b/src/emqttd_cm.erl index 9f5462b57..b3bfb242b 100644 --- a/src/emqttd_cm.erl +++ b/src/emqttd_cm.erl @@ -41,6 +41,9 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +%% gen_server2 priorities +-export([prioritise_call/4, prioritise_cast/3, prioritise_info/3]). + -record(state, {id, statsfun}). -define(CM_POOL, ?MODULE). @@ -101,8 +104,21 @@ init([Id, StatsFun]) -> gproc_pool:connect_worker(?CM_POOL, {?MODULE, Id}), {ok, #state{id = Id, statsfun = StatsFun}}. +prioritise_call(_Req, _From, _Len, _State) -> + 1. + +prioritise_cast(Msg, _Len, _State) -> + case Msg of + {register, _Client} -> 2; + {unregister, _ClientId, _Pid} -> 3; + _ -> 1 + end. + +prioritise_info(_Msg, _Len, _State) -> + 1. + handle_call(Req, _From, State) -> - lager:error("unexpected request: ~p", [Req]), + lager:error("Unexpected request: ~p", [Req]), {reply, {error, unsupported_req}, State}. handle_cast({register, Client = #mqtt_client{client_id = ClientId, @@ -110,32 +126,45 @@ handle_cast({register, Client = #mqtt_client{client_id = ClientId, case ets:lookup(mqtt_client, ClientId) of [#mqtt_client{client_pid = Pid}] -> ignore; - [#mqtt_client{client_pid = OldPid}] -> - %% TODO: should cancel monitor - ?LOG(warning, "client ~p conflict with ~p", [Pid, OldPid], Client); - [] -> + [#mqtt_client{client_pid = _OldPid, client_mon = MRef}] -> + %% demonitor + erlang:demonitor(MRef, [flush]); + [] -> ok end, - ets:insert(mqtt_client, Client), + ets:insert(mqtt_client, Client#mqtt_client{client_mon = erlang:monitor(process, Pid)}), {noreply, setstats(State)}; handle_cast({unregister, ClientId, Pid}, State) -> case ets:lookup(mqtt_client, ClientId) of - [#mqtt_client{client_pid = Pid}] -> - ets:delete(mqtt_client, ClientId); - [_] -> - ignore; - [] -> - ?LOG(error, "Cannot find registered: ~p", [Pid], State) - end, - {noreply, setstats(State)}; + [#mqtt_client{client_pid = Pid, client_mon = MRef}] -> + erlang:demonitor(MRef, [flush]), + ets:delete(mqtt_client, ClientId), + {noreply, setstats(State)}; + [_] -> + {noreply, State}; + [] -> + lager:warning("CM(~s): Cannot find pid ~p", [ClientId, Pid]), + {noreply, State} + end; handle_cast(Msg, State) -> lager:error("Unexpected Msg: ~p", [Msg]), {noreply, State}. +handle_info({'DOWN', MRef, process, DownPid, Reason}, State) -> + MP = #mqtt_client{client_pid = DownPid, client_mon = MRef, _ = '_'}, + case ets:match_object(mqtt_client, MP) of + [Client] -> + ?LOG(warning, "client ~p DOWN for ~p", [DownPid, Reason], Client), + ets:delete_object(mqtt_client, Client); + [] -> + ignore + end, + {noreply, setstats(State)}; + handle_info(Info, State) -> - lager:error("Unexpected Msg: ~p", [Info]), + lager:error("Unexpected Info: ~p", [Info]), {noreply, State}. terminate(_Reason, #state{id = Id}) -> diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index 954332afe..053b74e1a 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -149,7 +149,7 @@ process(Packet = ?CONNECT_PACKET(Var), State0) -> trace(recv, Packet, State1), - {ReturnCode1, State3} = + {ReturnCode1, SessPresent, State3} = case validate_connect(Var, State1) of ?CONNACK_ACCEPT -> case emqttd_access_control:auth(client(State1), Password) of @@ -159,30 +159,30 @@ process(Packet = ?CONNECT_PACKET(Var), State0) -> %% Start session case emqttd_sm:start_session(CleanSess, clientid(State2)) of - {ok, Session} -> + {ok, Session, SP} -> %% Register the client emqttd_cm:register(client(State2)), %% Start keepalive start_keepalive(KeepAlive), %% ACCEPT - {?CONNACK_ACCEPT, State2#proto_state{session = Session}}; + {?CONNACK_ACCEPT, SP, State2#proto_state{session = Session}}; {error, Error} -> exit({shutdown, Error}) end; {error, Reason}-> ?LOG(error, "Username '~s' login failed for ~s", [Username, Reason], State1), - {?CONNACK_CREDENTIALS, State1} + {?CONNACK_CREDENTIALS, false, State1} end; ReturnCode -> - {ReturnCode, State1} + {ReturnCode, false, State1} end, %% Run hooks emqttd_broker:foreach_hooks('client.connected', [ReturnCode1, client(State3)]), %% Send connack - send(?CONNACK_PACKET(ReturnCode1), State3); + send(?CONNACK_PACKET(ReturnCode1, sp(SessPresent)), State3); process(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload), State) -> - case check_acl(publish, Topic, State) of + case check_acl(publish, Topic, client(State)) of allow -> publish(Packet, State); deny -> @@ -210,7 +210,8 @@ process(?SUBSCRIBE_PACKET(PacketId, []), State) -> send(?SUBACK_PACKET(PacketId, []), State); process(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{session = Session}) -> - AllowDenies = [check_acl(subscribe, Topic, State) || {Topic, _Qos} <- TopicTable], + Client = client(State), + AllowDenies = [check_acl(subscribe, Topic, Client) || {Topic, _Qos} <- TopicTable], case lists:member(deny, AllowDenies) of true -> ?LOG(error, "Cannot SUBSCRIBE ~p for ACL Deny", [TopicTable], State), @@ -281,7 +282,7 @@ redeliver({?PUBREL, PacketId}, State) -> shutdown(_Error, #proto_state{client_id = undefined}) -> ignore; -shutdown(confict, #proto_state{client_id = ClientId}) -> +shutdown(conflict, #proto_state{client_id = ClientId}) -> emqttd_cm:unregister(ClientId); shutdown(Error, State = #proto_state{client_id = ClientId, will_msg = WillMsg}) -> @@ -391,16 +392,19 @@ validate_qos(_) -> false. %% PUBLISH ACL is cached in process dictionary. -check_acl(publish, Topic, State) -> +check_acl(publish, Topic, Client) -> case get({acl, publish, Topic}) of undefined -> - AllowDeny = emqttd_access_control:check_acl(client(State), publish, Topic), + AllowDeny = emqttd_access_control:check_acl(Client, publish, Topic), put({acl, publish, Topic}, AllowDeny), AllowDeny; AllowDeny -> AllowDeny end; -check_acl(subscribe, Topic, State) -> - emqttd_access_control:check_acl(client(State), subscribe, Topic). +check_acl(subscribe, Topic, Client) -> + emqttd_access_control:check_acl(Client, subscribe, Topic). + +sp(true) -> 1; +sp(false) -> 0. diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index a1b4ab47a..8a2adbb59 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -378,6 +378,7 @@ handle_cast({destroy, ClientId}, Session = #session{client_id = ClientId}) -> handle_cast({resume, ClientId, ClientPid}, Session = #session{client_id = ClientId, client_pid = OldClientPid, + clean_sess = CleanSess, inflight_queue = InflightQ, awaiting_ack = AwaitingAck, awaiting_comp = AwaitingComp, @@ -405,10 +406,21 @@ handle_cast({resume, ClientId, ClientPid}, Session = #session{client_id = C [cancel_timer(TRef) || TRef <- maps:values(AwaitingComp)], Session1 = Session#session{client_pid = ClientPid, + clean_sess = false, awaiting_ack = #{}, awaiting_comp = #{}, expired_timer = undefined}, + %% CleanSess: true -> false? + if + CleanSess =:= true -> + ?LOG(warning, "CleanSess changed to false.", [], Session), + emqttd_sm:unregister_session(CleanSess, ClientId), + emqttd_sm:register_session(false, ClientId, sess_info(Session1)); + CleanSess =:= false -> + ok + end, + %% Redeliver inflight messages Session2 = lists:foldl(fun({_Id, Msg}, Sess) -> @@ -585,7 +597,8 @@ kick(_ClientId, Pid, Pid) -> ignore; kick(ClientId, OldPid, Pid) -> unlink(OldPid), - OldPid ! {shutdown, conflict, {ClientId, Pid}}. + OldPid ! {shutdown, conflict, {ClientId, Pid}}, + ok. %%------------------------------------------------------------------------------ %% Check inflight and awaiting_rel diff --git a/src/emqttd_sm.erl b/src/emqttd_sm.erl index d62e85c86..f3211cae9 100644 --- a/src/emqttd_sm.erl +++ b/src/emqttd_sm.erl @@ -24,7 +24,6 @@ %%% %%% @end %%%----------------------------------------------------------------------------- - -module(emqttd_sm). -author("Feng Lee "). @@ -57,7 +56,7 @@ -define(SM_POOL, ?MODULE). --define(CALL_TIMEOUT, 60000). +-define(TIMEOUT, 60000). -define(LOG(Level, Format, Args, Session), lager:Level("SM(~s): " ++ Format, [Session#mqtt_session.client_id | Args])). @@ -103,7 +102,7 @@ pool() -> ?SM_POOL. %% @doc Start a session %% @end %%------------------------------------------------------------------------------ --spec start_session(CleanSess :: boolean(), binary()) -> {ok, pid()} | {error, any()}. +-spec start_session(CleanSess :: boolean(), binary()) -> {ok, pid(), boolean()} | {error, any()}. start_session(CleanSess, ClientId) -> SM = gproc_pool:pick_worker(?SM_POOL, ClientId), call(SM, {start_session, {CleanSess, ClientId, self()}}). @@ -144,7 +143,7 @@ sesstab(true) -> mqtt_transient_session; sesstab(false) -> mqtt_persistent_session. call(SM, Req) -> - gen_server2:call(SM, Req, ?CALL_TIMEOUT). %%infinity). + gen_server2:call(SM, Req, ?TIMEOUT). %%infinity). %%%============================================================================= %%% gen_server callbacks @@ -168,20 +167,20 @@ handle_call({start_session, {false, ClientId, ClientPid}}, _From, State) -> case lookup_session(ClientId) of undefined -> %% create session locally - {reply, create_session(false, ClientId, ClientPid), State}; + reply(create_session(false, ClientId, ClientPid), false, State); Session -> - {reply, resume_session(Session, ClientPid), State} + reply(resume_session(Session, ClientPid), true, State) end; %% transient session handle_call({start_session, {true, ClientId, ClientPid}}, _From, State) -> case lookup_session(ClientId) of undefined -> - {reply, create_session(true, ClientId, ClientPid), State}; + reply(create_session(true, ClientId, ClientPid), false, State); Session -> case destroy_session(Session) of ok -> - {reply, create_session(true, ClientId, ClientPid), State}; + reply(create_session(true, ClientId, ClientPid), false, State); {error, Error} -> {reply, {error, Error}, State} end @@ -302,3 +301,8 @@ remove_session(Session) -> {aborted, Error} -> {error, Error} end. +reply({ok, SessPid}, SP, State) -> + {reply, {ok, SessPid, SP}, State}; +reply({error, Error}, _SP, State) -> + {reply, {error, Error}, State}. + diff --git a/src/emqttd_ws_client.erl b/src/emqttd_ws_client.erl index 1d02ab46c..cb9a49e74 100644 --- a/src/emqttd_ws_client.erl +++ b/src/emqttd_ws_client.erl @@ -196,7 +196,7 @@ handle_info({redeliver, {?PUBREL, PacketId}}, State) -> handle_info({shutdown, conflict, {ClientId, NewPid}}, State = #wsclient_state{request = Req}) -> ?WSLOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid], Req), - shutdown(confict, State); + shutdown(conflict, State); handle_info({keepalive, start, Interval}, State = #wsclient_state{request = Req}) -> ?WSLOG(debug, "Keepalive at the interval of ~p", [Interval], Req), diff --git a/test/emqttd_access_control_tests.erl b/test/emqttd_access_control_tests.erl index 7db0490ef..5da45f4a8 100644 --- a/test/emqttd_access_control_tests.erl +++ b/test/emqttd_access_control_tests.erl @@ -42,30 +42,33 @@ register_mod_test() -> with_acl( fun() -> emqttd_access_control:register_mod(acl, emqttd_acl_test_mod, []), - ?assertMatch([{emqttd_acl_test_mod, _}, {emqttd_acl_internal, _}], + ?assertMatch([{emqttd_acl_test_mod, _, 0}, {emqttd_acl_internal, _, 0}], emqttd_access_control:lookup_mods(acl)), emqttd_access_control:register_mod(auth, emqttd_auth_anonymous_test_mod,[]), - ?assertMatch([{emqttd_auth_anonymous_test_mod, _}, {emqttd_auth_anonymous, _}], - emqttd_access_control:lookup_mods(auth)) + emqttd_access_control:register_mod(auth, emqttd_auth_dashboard, [], 99), + ?assertMatch([{emqttd_auth_dashboard, _, 99}, + {emqttd_auth_anonymous_test_mod, _, 0}, + {emqttd_auth_anonymous, _, 0}], + emqttd_access_control:lookup_mods(auth)) end). unregister_mod_test() -> with_acl( fun() -> - emqttd_access_control:register_mod(acl,emqttd_acl_test_mod, []), - ?assertMatch([{emqttd_acl_test_mod, _}, {emqttd_acl_internal, _}], + emqttd_access_control:register_mod(acl, emqttd_acl_test_mod, []), + ?assertMatch([{emqttd_acl_test_mod, _, 0}, {emqttd_acl_internal, _, 0}], emqttd_access_control:lookup_mods(acl)), emqttd_access_control:unregister_mod(acl, emqttd_acl_test_mod), timer:sleep(5), - ?assertMatch([{emqttd_acl_internal, _}], emqttd_access_control:lookup_mods(acl)), + ?assertMatch([{emqttd_acl_internal, _, 0}], emqttd_access_control:lookup_mods(acl)), emqttd_access_control:register_mod(auth, emqttd_auth_anonymous_test_mod,[]), - ?assertMatch([{emqttd_auth_anonymous_test_mod, _}, {emqttd_auth_anonymous, _}], + ?assertMatch([{emqttd_auth_anonymous_test_mod, _, 0}, {emqttd_auth_anonymous, _, 0}], emqttd_access_control:lookup_mods(auth)), emqttd_access_control:unregister_mod(auth, emqttd_auth_anonymous_test_mod), timer:sleep(5), - ?assertMatch([{emqttd_auth_anonymous, _}], emqttd_access_control:lookup_mods(auth)) + ?assertMatch([{emqttd_auth_anonymous, _, 0}], emqttd_access_control:lookup_mods(auth)) end). check_acl_test() -> @@ -83,7 +86,7 @@ check_acl_test() -> with_acl(Fun) -> process_flag(trap_exit, true), - AclOpts = [ + AclOpts = [ {auth, [ %% Authentication with username, password %{username, []}, diff --git a/test/emqttd_access_rule_tests.erl b/test/emqttd_access_rule_tests.erl index 142beeaeb..f46f23ce4 100644 --- a/test/emqttd_access_rule_tests.erl +++ b/test/emqttd_access_rule_tests.erl @@ -35,6 +35,14 @@ -include_lib("eunit/include/eunit.hrl"). compile_test() -> + + ?assertMatch({allow, {'and', [{ipaddr, {"127.0.0.1", _I, _I}}, + {user, <<"user">>}]}, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]}, + compile({allow, {'and', [{ipaddr, "127.0.0.1"}, {user, <<"user">>}]}, subscribe, ["$SYS/#", "#"]})), + ?assertMatch({allow, {'or', [{ipaddr, {"127.0.0.1", _I, _I}}, + {user, <<"user">>}]}, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]}, + compile({allow, {'or', [{ipaddr, "127.0.0.1"}, {user, <<"user">>}]}, subscribe, ["$SYS/#", "#"]})), + ?assertMatch({allow, {ipaddr, {"127.0.0.1", _I, _I}}, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]}, compile({allow, {ipaddr, "127.0.0.1"}, subscribe, ["$SYS/#", "#"]})), ?assertMatch({allow, {user, <<"testuser">>}, subscribe, [ [<<"a">>, <<"b">>, <<"c">>], [<<"d">>, <<"e">>, <<"f">>, '#'] ]}, @@ -69,10 +77,15 @@ match_test() -> ?assertMatch({matched, allow}, match(User, <<"clients/testClient">>, compile({allow, all, pubsub, ["clients/$c"]}))), ?assertMatch({matched, allow}, match(#mqtt_client{username = <<"user2">>}, <<"users/user2/abc/def">>, - compile({allow, all, subscribe, ["users/$u/#"]}))), - ?assertMatch({matched, deny}, - match(User, <<"d/e/f">>, - compile({deny, all, subscribe, ["$SYS/#", "#"]}))). + compile({allow, all, subscribe, ["users/$u/#"]}))), + ?assertMatch({matched, deny}, match(User, <<"d/e/f">>, + compile({deny, all, subscribe, ["$SYS/#", "#"]}))), + Rule = compile({allow, {'and', [{ipaddr, "127.0.0.1"}, {user, <<"WrongUser">>}]}, publish, <<"Topic">>}), + ?assertMatch(nomatch, match(User, <<"Topic">>, Rule)), + AndRule = compile({allow, {'and', [{ipaddr, "127.0.0.1"}, {user, <<"TestUser">>}]}, publish, <<"Topic">>}), + ?assertMatch({matched, allow}, match(User, <<"Topic">>, AndRule)), + OrRule = compile({allow, {'or', [{ipaddr, "127.0.0.1"}, {user, <<"WrongUser">>}]}, publish, ["Topic"]}), + ?assertMatch({matched, allow}, match(User, <<"Topic">>, OrRule)). -endif. diff --git a/test/emqttd_auth_dashboard.erl b/test/emqttd_auth_dashboard.erl new file mode 100644 index 000000000..ea9aca7e1 --- /dev/null +++ b/test/emqttd_auth_dashboard.erl @@ -0,0 +1,14 @@ + +-module(emqttd_auth_dashboard). + +%% Auth callbacks +-export([init/1, check/3, description/0]). + +init(Opts) -> + {ok, Opts}. + +check(_Client, _Password, _Opts) -> + allow. + +description() -> + "Test emqttd_auth_dashboard Mod".