Merge remote-tracking branch 'origin/develop'

This commit is contained in:
zhanghongtong 2019-03-19 09:51:37 +08:00
commit 5fb4f23504
26 changed files with 335 additions and 417 deletions

View File

@ -5,12 +5,12 @@ PROJECT_DESCRIPTION = EMQ X Broker
DEPS = jsx gproc gen_rpc ekka esockd cowboy replayq
dep_jsx = hex-emqx 2.9.0
dep_gproc = hex-emqx 0.8.0
dep_jsx = git-emqx https://github.com/talentdeficit/jsx 2.9.0
dep_gproc = git-emqx https://github.com/uwiger/gproc 0.8.0
dep_gen_rpc = git-emqx https://github.com/emqx/gen_rpc 2.3.1
dep_esockd = git-emqx https://github.com/emqx/esockd v5.4.4
dep_ekka = git-emqx https://github.com/emqx/ekka v0.5.3
dep_cowboy = hex-emqx 2.4.0
dep_cowboy = git-emqx https://github.com/ninenines/cowboy 2.4.0
dep_replayq = git-emqx https://github.com/emqx/replayq v0.1.1
NO_AUTOPATCH = cuttlefish
@ -31,14 +31,14 @@ EUNIT_OPTS = verbose
## emqx_trie emqx_router emqx_frame emqx_mqtt_compat
CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \
emqx_access emqx_broker emqx_cm emqx_frame emqx_guid emqx_inflight emqx_json \
emqx_broker emqx_cm emqx_frame emqx_guid emqx_inflight emqx_json \
emqx_keepalive emqx_lib emqx_metrics emqx_mod emqx_mod_sup emqx_mqtt_caps \
emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \
emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_mountpoint \
emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge \
emqx_hooks emqx_batch emqx_sequence emqx_pmon emqx_pd emqx_gc emqx_ws_connection \
emqx_packet emqx_connection emqx_tracer emqx_sys_mon emqx_message emqx_os_mon \
emqx_vm_mon emqx_alarm_handler
emqx_vm_mon emqx_alarm_handler emqx_rpc
CT_NODE_NAME = emqxct@127.0.0.1
CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME)

View File

@ -1672,7 +1672,7 @@ end}.
(Opt, Val, Opts) ->
case IsSsl(Opt) of
true ->
SslOpts = Parse(Opt, Val) ++ [proplists:get_value(ssl_opts, Opts, [])],
SslOpts = Parse(Opt, Val) ++ proplists:get_value(ssl_opts, Opts, []),
lists:ukeymerge(1, [{ssl_opts, SslOpts}], lists:usort(Opts));
false ->
[{Opt, Val}|Opts]

View File

@ -29,7 +29,7 @@
-export([topics/0, subscriptions/1, subscribers/1, subscribed/2]).
%% Hooks API
-export([hook/2, hook/3, hook/4, unhook/2, run_hooks/2, run_hooks/3]).
-export([hook/2, hook/3, hook/4, unhook/2, run_hook/2, run_fold_hook/3]).
%% Shutdown and reboot
-export([shutdown/0, shutdown/1, reboot/0]).
@ -142,13 +142,13 @@ hook(HookPoint, Action, Filter, Priority) ->
unhook(HookPoint, Action) ->
emqx_hooks:del(HookPoint, Action).
-spec(run_hooks(emqx_hooks:hookpoint(), list(any())) -> ok | stop).
run_hooks(HookPoint, Args) ->
-spec(run_hook(emqx_hooks:hookpoint(), list(any())) -> ok | stop).
run_hook(HookPoint, Args) ->
emqx_hooks:run(HookPoint, Args).
-spec(run_hooks(emqx_hooks:hookpoint(), list(any()), any()) -> {ok | stop, any()}).
run_hooks(HookPoint, Args, Acc) ->
emqx_hooks:run(HookPoint, Args, Acc).
-spec(run_fold_hook(emqx_hooks:hookpoint(), list(any()), any()) -> any()).
run_fold_hook(HookPoint, Args, Acc) ->
emqx_hooks:run_fold(HookPoint, Args, Acc).
%%------------------------------------------------------------------------------
%% Shutdown and reboot

View File

@ -14,198 +14,55 @@
-module(emqx_access_control).
-behaviour(gen_server).
-include("emqx.hrl").
-include("logger.hrl").
-export([start_link/0]).
-export([authenticate/2]).
-export([authenticate/1]).
-export([check_acl/3, reload_acl/0]).
-export([register_mod/3, register_mod/4, unregister_mod/2]).
-export([lookup_mods/1]).
-export([stop/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
-define(TAB, ?MODULE).
-define(SERVER, ?MODULE).
%%------------------------------------------------------------------------------
%% API
%% APIs
%%------------------------------------------------------------------------------
%% @doc Start access control server.
-spec(start_link() -> {ok, pid()} | {error, term()}).
start_link() ->
start_with(fun register_default_acl/0).
start_with(Fun) ->
case gen_server:start_link({local, ?SERVER}, ?MODULE, [], []) of
{ok, Pid} ->
Fun(), {ok, Pid};
{error, Reason} ->
{error, Reason}
end.
register_default_acl() ->
case emqx_config:get_env(acl_file) of
undefined -> ok;
File -> register_mod(acl, emqx_acl_internal, [File])
end.
-spec(authenticate(emqx_types:credentials(), emqx_types:password())
-> ok | {ok, map()} | {continue, map()} | {error, term()}).
authenticate(Credentials, Password) ->
authenticate(Credentials, Password, lookup_mods(auth)).
authenticate(Credentials, _Password, []) ->
Zone = maps:get(zone, Credentials, undefined),
case emqx_zone:get_env(Zone, allow_anonymous, false) of
true -> ok;
false -> {error, auth_modules_not_found}
end;
authenticate(Credentials, Password, [{Mod, State, _Seq} | Mods]) ->
try Mod:check(Credentials, Password, State) of
ok -> ok;
{ok, IsSuper} when is_boolean(IsSuper) ->
{ok, #{is_superuser => IsSuper}};
{ok, Result} when is_map(Result) ->
{ok, Result};
{continue, Result} when is_map(Result) ->
{continue, Result};
ignore ->
authenticate(Credentials, Password, Mods);
{error, Reason} ->
{error, Reason}
catch
error:Reason:StackTrace ->
?LOG(error, "Authenticate failed. StackTrace: ~p", [StackTrace]),
{error, Reason}
-spec(authenticate(emqx_types:credentials())
-> {ok, emqx_types:credentials()} | {error, term()}).
authenticate(Credentials) ->
case emqx_hooks:run_fold('client.authenticate', [], Credentials#{result => init_result(Credentials)}) of
#{result := success} = NewCredentials ->
{ok, NewCredentials};
NewCredentials ->
{error, maps:get(result, NewCredentials, unknown_error)}
end.
%% @doc Check ACL
-spec(check_acl(emqx_types:credentials(), emqx_types:pubsub(), emqx_types:topic()) -> allow | deny).
check_acl(Credentials, PubSub, Topic) when PubSub =:= publish; PubSub =:= subscribe ->
check_acl(Credentials, PubSub, Topic, lookup_mods(acl), emqx_acl_cache:is_enabled()).
check_acl(Credentials, PubSub, Topic, AclMods, false) ->
do_check_acl(Credentials, PubSub, Topic, AclMods);
check_acl(Credentials, PubSub, Topic, AclMods, true) ->
case emqx_acl_cache:get_acl_cache(PubSub, Topic) of
not_found ->
AclResult = do_check_acl(Credentials, PubSub, Topic, AclMods),
emqx_acl_cache:put_acl_cache(PubSub, Topic, AclResult),
AclResult;
AclResult ->
AclResult
check_acl(Credentials, PubSub, Topic) ->
case emqx_acl_cache:is_enabled() of
false ->
do_check_acl(Credentials, PubSub, Topic);
true ->
case emqx_acl_cache:get_acl_cache(PubSub, Topic) of
not_found ->
AclResult = do_check_acl(Credentials, PubSub, Topic),
emqx_acl_cache:put_acl_cache(PubSub, Topic, AclResult),
AclResult;
AclResult ->
AclResult
end
end.
do_check_acl(#{zone := Zone}, _PubSub, _Topic, []) ->
emqx_zone:get_env(Zone, acl_nomatch, deny);
do_check_acl(Credentials, PubSub, Topic, [{Mod, State, _Seq}|AclMods]) ->
case Mod:check_acl({Credentials, PubSub, Topic}, State) of
allow -> allow;
deny -> deny;
ignore -> do_check_acl(Credentials, PubSub, Topic, AclMods)
do_check_acl(#{zone := Zone} = Credentials, PubSub, Topic) ->
case emqx_hooks:run_fold('client.check_acl', [Credentials, PubSub, Topic],
emqx_zone:get_env(Zone, acl_nomatch, deny)) of
allow -> allow;
_ -> deny
end.
-spec(reload_acl() -> list(ok | {error, term()})).
reload_acl() ->
[Mod:reload_acl(State) || {Mod, State, _Seq} <- lookup_mods(acl)].
emqx_mod_acl_internal:reload_acl().
%% @doc Register an Auth/ACL module.
-spec(register_mod(auth | acl, module(), list()) -> ok | {error, term()}).
register_mod(Type, Mod, Opts) when Type =:= auth; Type =:= acl ->
register_mod(Type, Mod, Opts, 0).
-spec(register_mod(auth | acl, module(), list(), non_neg_integer())
-> ok | {error, term()}).
register_mod(Type, Mod, Opts, Seq) when Type =:= auth; Type =:= acl->
gen_server:call(?SERVER, {register_mod, Type, Mod, Opts, Seq}).
%% @doc Unregister an Auth/ACL module.
-spec(unregister_mod(auth | acl, module()) -> ok | {error, not_found | term()}).
unregister_mod(Type, Mod) when Type =:= auth; Type =:= acl ->
gen_server:call(?SERVER, {unregister_mod, Type, Mod}).
%% @doc Lookup all Auth/ACL modules.
-spec(lookup_mods(auth | acl) -> list()).
lookup_mods(Type) ->
case ets:lookup(?TAB, tab_key(Type)) of
[] -> [];
[{_, Mods}] -> Mods
end.
tab_key(auth) -> auth_modules;
tab_key(acl) -> acl_modules.
stop() ->
gen_server:stop(?SERVER, normal, infinity).
%%-----------------------------------------------------------------------------
%% gen_server callbacks
%%-----------------------------------------------------------------------------
init([]) ->
ok = emqx_tables:new(?TAB, [set, protected, {read_concurrency, true}]),
{ok, #{}}.
handle_call({register_mod, Type, Mod, Opts, Seq}, _From, State) ->
Mods = lookup_mods(Type),
reply(case lists:keymember(Mod, 1, Mods) of
true -> {error, already_exists};
false ->
try Mod:init(Opts) of
{ok, ModState} ->
NewMods = lists:sort(fun({_, _, Seq1}, {_, _, Seq2}) ->
Seq1 >= Seq2
end, [{Mod, ModState, Seq} | Mods]),
ets:insert(?TAB, {tab_key(Type), NewMods}),
ok
catch
_:Error ->
emqx_logger:error("[AccessControl] Failed to init ~s: ~p", [Mod, Error]),
{error, Error}
end
end, State);
handle_call({unregister_mod, Type, Mod}, _From, State) ->
Mods = lookup_mods(Type),
reply(case lists:keyfind(Mod, 1, Mods) of
false ->
{error, not_found};
{Mod, _ModState, _Seq} ->
ets:insert(?TAB, {tab_key(Type), lists:keydelete(Mod, 1, Mods)}), ok
end, State);
handle_call(stop, _From, State) ->
{stop, normal, ok, State};
handle_call(Req, _From, State) ->
emqx_logger:error("[AccessControl] unexpected request: ~p", [Req]),
{reply, ignored, State}.
handle_cast(Msg, State) ->
emqx_logger:error("[AccessControl] unexpected msg: ~p", [Msg]),
{noreply, State}.
handle_info(Info, State) ->
emqx_logger:error("[AccessControl] unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
reply(Reply, State) ->
{reply, Reply, State}.
init_result(Credentials) ->
case emqx_zone:get_env(maps:get(zone, Credentials, undefined), allow_anonymous, false) of
true -> success;
false -> not_authorized
end.

View File

@ -16,6 +16,8 @@
-include("emqx.hrl").
-type(acl_result() :: allow | deny).
-type(who() :: all | binary() |
{client, binary()} |
{user, binary()} |
@ -23,10 +25,8 @@
-type(access() :: subscribe | publish | pubsub).
-type(rule() :: {allow, all} |
{allow, who(), access(), list(emqx_topic:topic())} |
{deny, all} |
{deny, who(), access(), list(emqx_topic:topic())}).
-type(rule() :: {acl_result(), all} |
{acl_result(), who(), access(), list(emqx_topic:topic())}).
-export_type([rule/0]).

View File

@ -345,10 +345,10 @@ connected(internal, maybe_send, State) ->
{next_state, connecting, disconnect(NewState)}
end;
connected(info, {disconnected, ConnRef, Reason},
#{conn_ref := ConnRefCurrent, connection := Conn} = State) ->
#{conn_ref := ConnRefCurrent} = State) ->
case ConnRefCurrent =:= ConnRef of
true ->
?LOG(info, "Bridge ~p diconnected~nreason=~p", [name(), Conn, Reason]),
?LOG(info, "Bridge ~p diconnected~nreason=~p", [name(), Reason]),
{next_state, connecting,
State#{conn_ref := undefined, connection := undefined}};
false ->

View File

@ -167,13 +167,14 @@ do_unsubscribe(Group, Topic, SubPid, _SubOpts) ->
-spec(publish(emqx_types:message()) -> emqx_types:deliver_results()).
publish(Msg) when is_record(Msg, message) ->
_ = emqx_tracer:trace(publish, Msg),
case emqx_hooks:run('message.publish', [], Msg) of
{ok, Msg1 = #message{topic = Topic}} ->
Headers = Msg#message.headers,
case emqx_hooks:run_fold('message.publish', [], Msg#message{headers = Headers#{allow_publish => true}}) of
#message{headers = #{allow_publish := false}} ->
?WARN("Publishing interrupted: ~s", [emqx_message:format(Msg)]),
[];
#message{topic = Topic} = Msg1 ->
Delivery = route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1)),
Delivery#delivery.results;
{stop, _} ->
?WARN("Stop publishing: ~s", [emqx_message:format(Msg)]),
[]
Delivery#delivery.results
end.
%% Called internally
@ -443,5 +444,4 @@ code_change(_OldVsn, State, _Extra) ->
%%------------------------------------------------------------------------------
%% Internal functions
%%------------------------------------------------------------------------------
%%------------------------------------------------------------------------------

View File

@ -141,7 +141,15 @@ init({Transport, RawSocket, Options}) ->
ActiveN = proplists:get_value(active_n, Options, ?ACTIVE_N),
EnableStats = emqx_zone:get_env(Zone, enable_stats, true),
IdleTimout = emqx_zone:get_env(Zone, idle_timeout, 30000),
SendFun = fun(Data) -> Transport:async_send(Socket, Data) end,
SendFun = fun(Packet, SeriaOpts) ->
Data = emqx_frame:serialize(Packet, SeriaOpts),
case Transport:async_send(Socket, Data) of
ok ->
{ok, Data};
{error, Reason} ->
{error, Reason}
end
end,
ProtoState = emqx_protocol:init(#{peername => Peername,
sockname => Sockname,
peercert => Peercert,
@ -484,4 +492,3 @@ shutdown(Reason, State) ->
stop(Reason, State) ->
{stop, Reason, State}.

View File

@ -22,7 +22,7 @@
-export([start_link/0, stop/0]).
%% Hooks API
-export([add/2, add/3, add/4, del/2, run/2, run/3, lookup/1]).
-export([add/2, add/3, add/4, del/2, run/2, run_fold/3, lookup/1]).
%% gen_server Function Exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
@ -90,39 +90,44 @@ del(HookPoint, Action) ->
gen_server:cast(?SERVER, {del, HookPoint, Action}).
%% @doc Run hooks.
-spec(run(atom(), list(Arg :: any())) -> ok | stop).
-spec(run(atom(), list(Arg::term())) -> ok).
run(HookPoint, Args) ->
run_(lookup(HookPoint), Args).
do_run(lookup(HookPoint), Args).
%% @doc Run hooks with Accumulator.
-spec(run(atom(), list(Arg::any()), Acc::any()) -> {ok, Acc::any()} | {stop, Acc::any()}).
run(HookPoint, Args, Acc) ->
run_(lookup(HookPoint), Args, Acc).
-spec(run_fold(atom(), list(Arg::term()), Acc::term()) -> Acc::term()).
run_fold(HookPoint, Args, Acc) ->
do_run_fold(lookup(HookPoint), Args, Acc).
%% @private
run_([#callback{action = Action, filter = Filter} | Callbacks], Args) ->
do_run([#callback{action = Action, filter = Filter} | Callbacks], Args) ->
case filter_passed(Filter, Args) andalso execute(Action, Args) of
false -> run_(Callbacks, Args);
ok -> run_(Callbacks, Args);
stop -> stop;
_Any -> run_(Callbacks, Args)
%% stop the hook chain and return
stop -> ok;
%% continue the hook chain, in following cases:
%% - the filter validation failed with 'false'
%% - the callback returns any term other than 'stop'
_ -> do_run(Callbacks, Args)
end;
run_([], _Args) ->
do_run([], _Args) ->
ok.
%% @private
run_([#callback{action = Action, filter = Filter} | Callbacks], Args, Acc) ->
do_run_fold([#callback{action = Action, filter = Filter} | Callbacks], Args, Acc) ->
Args1 = Args ++ [Acc],
case filter_passed(Filter, Args1) andalso execute(Action, Args1) of
false -> run_(Callbacks, Args, Acc);
ok -> run_(Callbacks, Args, Acc);
{ok, NewAcc} -> run_(Callbacks, Args, NewAcc);
stop -> {stop, Acc};
{stop, NewAcc} -> {stop, NewAcc};
_Any -> run_(Callbacks, Args, Acc)
%% stop the hook chain
stop -> Acc;
%% stop the hook chain with NewAcc
{stop, NewAcc} -> NewAcc;
%% continue the hook chain with NewAcc
{ok, NewAcc} -> do_run_fold(Callbacks, Args, NewAcc);
%% continue the hook chain, in following cases:
%% - the filter validation failed with 'false'
%% - the callback returns any term other than 'stop' or {'stop', NewAcc}
_ -> do_run_fold(Callbacks, Args, Acc)
end;
run_([], _Args, Acc) ->
{ok, Acc}.
do_run_fold([], _Args, Acc) ->
Acc.
-spec(filter_passed(filter(), Args::term()) -> true | false).
filter_passed(undefined, _Args) -> true;

View File

@ -81,11 +81,12 @@ unset_flag(Flag, Msg = #message{flags = Flags}) ->
false -> Msg
end.
-spec(set_headers(map(), emqx_types:message()) -> emqx_types:message()).
-spec(set_headers(undefined | map(), emqx_types:message()) -> emqx_types:message()).
set_headers(Headers, Msg = #message{headers = undefined}) when is_map(Headers) ->
Msg#message{headers = Headers};
set_headers(New, Msg = #message{headers = Old}) when is_map(New) ->
Msg#message{headers = maps:merge(Old, New)}.
Msg#message{headers = maps:merge(Old, New)};
set_headers(undefined, Msg) -> Msg.
-spec(get_header(term(), emqx_types:message()) -> term()).
get_header(Hdr, Msg) ->

View File

@ -12,57 +12,56 @@
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_acl_internal).
-module(emqx_mod_acl_internal).
-behaviour(emqx_acl_mod).
-behaviour(emqx_gen_mod).
-include("emqx.hrl").
-include("logger.hrl").
-export([load/1, unload/1]).
-export([all_rules/0]).
%% ACL mod callbacks
-export([init/1, check_acl/2, reload_acl/1, description/0]).
-export([check_acl/5, reload_acl/0]).
-define(ACL_RULE_TAB, emqx_acl_rule).
-type(state() :: #{acl_file := string()}).
-define(FUNC(M, F, A), {M, F, A}).
-type(acl_rules() :: #{publish => [emqx_access_rule:rule()],
subscribe => [emqx_access_rule:rule()]}).
%%------------------------------------------------------------------------------
%% API
%%------------------------------------------------------------------------------
load(_Env) ->
Rules = load_rules_from_file(acl_file()),
emqx_hooks:add('client.check_acl', ?FUNC(?MODULE, check_acl, [Rules]), -1).
unload(_Env) ->
Rules = load_rules_from_file(acl_file()),
emqx_hooks:del('client.check_acl', ?FUNC(?MODULE, check_acl, [Rules])).
%% @doc Read all rules
-spec(all_rules() -> list(emqx_access_rule:rule())).
all_rules() ->
case ets:lookup(?ACL_RULE_TAB, all_rules) of
[] -> [];
[{_, Rules}] -> Rules
end.
load_rules_from_file(acl_file()).
%%------------------------------------------------------------------------------
%% ACL callbacks
%%------------------------------------------------------------------------------
-spec(init([File :: string()]) -> {ok, #{}}).
init([File]) ->
_ = emqx_tables:new(?ACL_RULE_TAB, [set, public, {read_concurrency, true}]),
ok = load_rules_from_file(File),
{ok, #{acl_file => File}}.
load_rules_from_file(AclFile) ->
case file:consult(AclFile) of
{ok, Terms} ->
Rules = [emqx_access_rule:compile(Term) || Term <- Terms],
lists:foreach(fun(PubSub) ->
ets:insert(?ACL_RULE_TAB, {PubSub,
lists:filter(fun(Rule) -> filter(PubSub, Rule) end, Rules)})
end, [publish, subscribe]),
ets:insert(?ACL_RULE_TAB, {all_rules, Terms}),
ok;
#{publish => lists:filter(fun(Rule) -> filter(publish, Rule) end, Rules),
subscribe => lists:filter(fun(Rule) -> filter(subscribe, Rule) end, Rules)};
{error, Reason} ->
emqx_logger:error("[ACL_INTERNAL] Failed to read ~s: ~p", [AclFile, Reason]),
{error, Reason}
?LOG(error, "[ACL_INTERNAL] Failed to read ~s: ~p", [AclFile, Reason]),
#{}
end.
filter(_PubSub, {allow, all}) ->
@ -79,20 +78,18 @@ filter(_PubSub, {_AllowDeny, _Who, _, _Topics}) ->
false.
%% @doc Check ACL
-spec(check_acl({emqx_types:credentials(), emqx_types:pubsub(), emqx_topic:topic()}, #{})
-> allow | deny | ignore).
check_acl({Credentials, PubSub, Topic}, _State) ->
case match(Credentials, Topic, lookup(PubSub)) of
{matched, allow} -> allow;
{matched, deny} -> deny;
nomatch -> ignore
-spec(check_acl(emqx_types:credentials(), emqx_types:pubsub(), emqx_topic:topic(),
emqx_access_rule:acl_result(), acl_rules())
-> {ok, allow} | {ok, deny} | ok).
check_acl(Credentials, PubSub, Topic, _AclResult, Rules) ->
case match(Credentials, Topic, lookup(PubSub, Rules)) of
{matched, allow} -> {ok, allow};
{matched, deny} -> {ok, deny};
nomatch -> ok
end.
lookup(PubSub) ->
case ets:lookup(?ACL_RULE_TAB, PubSub) of
[] -> [];
[{PubSub, Rules}] -> Rules
end.
lookup(PubSub, Rules) ->
maps:get(PubSub, Rules, []).
match(_Credentials, _Topic, []) ->
nomatch;
@ -104,11 +101,11 @@ match(Credentials, Topic, [Rule|Rules]) ->
{matched, AllowDeny}
end.
-spec(reload_acl(state()) -> ok | {error, term()}).
reload_acl(#{acl_file := AclFile}) ->
try load_rules_from_file(AclFile) of
-spec(reload_acl() -> ok | {error, term()}).
reload_acl() ->
try load_rules_from_file(acl_file()) of
ok ->
emqx_logger:info("Reload acl_file ~s successfully", [AclFile]),
emqx_logger:info("Reload acl_file ~s successfully", [acl_file()]),
ok;
{error, Error} ->
{error, Error}
@ -118,6 +115,5 @@ reload_acl(#{acl_file := AclFile}) ->
{error, Reason}
end.
-spec(description() -> string()).
description() ->
"Internal ACL with etc/acl.conf".
acl_file() ->
emqx_config:get_env(acl_file).

View File

@ -18,6 +18,7 @@
-spec(load() -> ok).
load() ->
ok = emqx_mod_acl_internal:load([]),
lists:foreach(
fun({Mod, Env}) ->
ok = Mod:load(Env),
@ -26,6 +27,7 @@ load() ->
-spec(unload() -> ok).
unload() ->
ok = emqx_mod_acl_internal:unload([]),
lists:foreach(
fun({Mod, Env}) ->
Mod:unload(Env) end,

View File

@ -57,7 +57,6 @@
will_msg,
keepalive,
mountpoint,
is_super,
is_bridge,
enable_ban,
enable_acl,
@ -68,7 +67,8 @@
connected_at,
ignore_loop,
topic_alias_maximum,
conn_mod
conn_mod,
credentials
}).
-opaque(state() :: #pstate{}).
@ -97,7 +97,6 @@ init(SocketOpts = #{peername := Peername, peercert := Peercert, sendfun := SendF
is_assigned = false,
conn_pid = self(),
username = init_username(Peercert, Options),
is_super = false,
clean_start = false,
topic_aliases = #{},
packet_size = emqx_zone:get_env(Zone, max_packet_size),
@ -111,7 +110,8 @@ init(SocketOpts = #{peername := Peername, peercert := Peercert, sendfun := SendF
connected = false,
ignore_loop = emqx_config:get_env(mqtt_ignore_loop_deliver, false),
topic_alias_maximum = #{to_client => 0, from_client => 0},
conn_mod = maps:get(conn_mod, SocketOpts, undefined)}.
conn_mod = maps:get(conn_mod, SocketOpts, undefined),
credentials = #{}}.
init_username(Peercert, Options) ->
case proplists:get_value(peer_cert_as_username, Options) of
@ -153,10 +153,10 @@ attrs(#pstate{zone = Zone,
proto_name = ProtoName,
keepalive = Keepalive,
mountpoint = Mountpoint,
is_super = IsSuper,
is_bridge = IsBridge,
connected_at = ConnectedAt,
conn_mod = ConnMod}) ->
conn_mod = ConnMod,
credentials = Credentials}) ->
[{zone, Zone},
{client_id, ClientId},
{username, Username},
@ -167,10 +167,11 @@ attrs(#pstate{zone = Zone,
{clean_start, CleanStart},
{keepalive, Keepalive},
{mountpoint, Mountpoint},
{is_super, IsSuper},
{is_bridge, IsBridge},
{connected_at, ConnectedAt},
{conn_mod, ConnMod}].
{conn_mod, ConnMod},
{credentials, Credentials}
].
attr(max_inflight, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}) ->
get_property('Receive-Maximum', ConnProps, 65535);
@ -200,6 +201,8 @@ caps(#pstate{zone = Zone}) ->
client_id(#pstate{client_id = ClientId}) ->
ClientId.
credentials(#pstate{credentials = Credentials}) when map_size(Credentials) =/= 0 ->
Credentials;
credentials(#pstate{zone = Zone,
client_id = ClientId,
username = Username,
@ -362,8 +365,7 @@ process(?CONNECT_PACKET(
%% TODO: Mountpoint...
%% Msg -> emqx_mountpoint:mount(MountPoint, Msg)
PState1 = set_username(Username,
PState0 = set_username(Username,
PState#pstate{client_id = NewClientId,
proto_ver = ProtoVer,
proto_name = ProtoName,
@ -372,20 +374,21 @@ process(?CONNECT_PACKET(
conn_props = ConnProps,
is_bridge = IsBridge,
connected_at = os:timestamp()}),
Credentials = credentials(PState0),
PState1 = PState0#pstate{credentials = Credentials},
connack(
case check_connect(ConnPkt, PState1) of
{ok, PState2} ->
case authenticate(credentials(PState2), Password) of
{ok, IsSuper} ->
%% Maybe assign a clientId
PState3 = maybe_assign_client_id(PState2#pstate{is_super = IsSuper}),
ok ->
case emqx_access_control:authenticate(Credentials#{password => Password}) of
{ok, Credentials0} ->
PState3 = maybe_assign_client_id(PState1),
emqx_logger:set_metadata_client_id(PState3#pstate.client_id),
%% Open session
SessAttrs = #{will_msg => make_will_msg(ConnPkt)},
case try_open_session(SessAttrs, PState3) of
{ok, SPid, SP} ->
PState4 = PState3#pstate{session = SPid, connected = true},
PState4 = PState3#pstate{session = SPid, connected = true,
credentials = maps:remove(password, Credentials0)},
ok = emqx_cm:register_connection(client_id(PState4)),
true = emqx_cm:set_conn_attrs(client_id(PState4), attrs(PState4)),
%% Start keepalive
@ -394,11 +397,11 @@ process(?CONNECT_PACKET(
{?RC_SUCCESS, SP, PState4};
{error, Error} ->
?LOG(error, "Failed to open session: ~p", [Error]),
{?RC_UNSPECIFIED_ERROR, PState1}
{?RC_UNSPECIFIED_ERROR, PState1#pstate{credentials = Credentials0}}
end;
{error, Reason} ->
?LOG(error, "Username '~s' login failed for ~p", [Username, Reason]),
{?RC_NOT_AUTHORIZED, PState1}
?LOG(error, "Client ~s (Username: '~s') login failed for ~p", [NewClientId, Username, Reason]),
{emqx_reason_codes:connack_error(Reason), PState1#pstate{credentials = Credentials}}
end;
{error, ReasonCode} ->
{ReasonCode, PState1}
@ -406,8 +409,8 @@ process(?CONNECT_PACKET(
process(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), PState) ->
case check_publish(Packet, PState) of
{ok, PState1} ->
do_publish(Packet, PState1);
ok ->
do_publish(Packet, PState);
{error, ReasonCode} ->
?LOG(warning, "Cannot publish qos0 message to ~s for ~s",
[Topic, emqx_reason_codes:text(ReasonCode)]),
@ -416,8 +419,8 @@ process(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), PState) ->
process(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload), PState) ->
case check_publish(Packet, PState) of
{ok, PState1} ->
do_publish(Packet, PState1);
ok ->
do_publish(Packet, PState);
{error, ReasonCode} ->
?LOG(warning, "Cannot publish qos1 message to ~s for ~s",
[Topic, emqx_reason_codes:text(ReasonCode)]),
@ -430,8 +433,8 @@ process(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload), PState) ->
process(Packet = ?PUBLISH_PACKET(?QOS_2, Topic, PacketId, _Payload), PState) ->
case check_publish(Packet, PState) of
{ok, PState1} ->
do_publish(Packet, PState1);
ok ->
do_publish(Packet, PState);
{error, ReasonCode} ->
?LOG(warning, "Cannot publish qos2 message to ~s for ~s",
[Topic, emqx_reason_codes:text(ReasonCode)]),
@ -480,16 +483,10 @@ process(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
case check_subscribe(
parse_topic_filters(?SUBSCRIBE, RawTopicFilters1), PState) of
{ok, TopicFilters} ->
case emqx_hooks:run('client.subscribe', [credentials(PState)], TopicFilters) of
{ok, TopicFilters1} ->
ok = emqx_session:subscribe(SPid, PacketId, Properties,
emqx_mountpoint:mount(Mountpoint, TopicFilters1)),
{ok, PState};
{stop, _} ->
ReasonCodes = lists:duplicate(length(TopicFilters),
?RC_IMPLEMENTATION_SPECIFIC_ERROR),
deliver({suback, PacketId, ReasonCodes}, PState)
end;
TopicFilters0 = emqx_hooks:run_fold('client.subscribe', [credentials(PState)], TopicFilters),
ok = emqx_session:subscribe(SPid, PacketId, Properties,
emqx_mountpoint:mount(Mountpoint, TopicFilters0)),
{ok, PState};
{error, TopicFilters} ->
{SubTopics, ReasonCodes} =
lists:foldr(fun({Topic, #{rc := ?RC_SUCCESS}}, {Topics, Codes}) ->
@ -509,17 +506,11 @@ process(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
process(?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
PState = #pstate{session = SPid, mountpoint = MountPoint}) ->
case emqx_hooks:run('client.unsubscribe', [credentials(PState)],
parse_topic_filters(?UNSUBSCRIBE, RawTopicFilters)) of
{ok, TopicFilters} ->
ok = emqx_session:unsubscribe(SPid, PacketId, Properties,
emqx_mountpoint:mount(MountPoint, TopicFilters)),
{ok, PState};
{stop, _Acc} ->
ReasonCodes = lists:duplicate(length(RawTopicFilters),
?RC_IMPLEMENTATION_SPECIFIC_ERROR),
deliver({unsuback, PacketId, ReasonCodes}, PState)
end;
TopicFilters = emqx_hooks:run_fold('client.unsubscribe', [credentials(PState)],
parse_topic_filters(?UNSUBSCRIBE, RawTopicFilters)),
ok = emqx_session:unsubscribe(SPid, PacketId, Properties,
emqx_mountpoint:mount(MountPoint, TopicFilters)),
{ok, PState};
process(?PACKET(?PINGREQ), PState) ->
send(?PACKET(?PINGRESP), PState);
@ -547,11 +538,11 @@ process(?DISCONNECT_PACKET(_), PState) ->
%%------------------------------------------------------------------------------
connack({?RC_SUCCESS, SP, PState}) ->
emqx_hooks:run('client.connected', [credentials(PState), ?RC_SUCCESS, attrs(PState)]),
ok = emqx_hooks:run('client.connected', [credentials(PState), ?RC_SUCCESS, attrs(PState)]),
deliver({connack, ?RC_SUCCESS, sp(SP)}, update_mountpoint(PState));
connack({ReasonCode, PState = #pstate{proto_ver = ProtoVer}}) ->
emqx_hooks:run('client.connected', [credentials(PState), ReasonCode, attrs(PState)]),
ok = emqx_hooks:run('client.connected', [credentials(PState), ReasonCode, attrs(PState)]),
[ReasonCode1] = reason_codes_compat(connack, [ReasonCode], ProtoVer),
_ = deliver({connack, ReasonCode1}, PState),
{error, emqx_reason_codes:name(ReasonCode1, ProtoVer), PState}.
@ -660,8 +651,8 @@ deliver({connack, ReasonCode, SP}, PState) ->
send(?CONNACK_PACKET(ReasonCode, SP), PState);
deliver({publish, PacketId, Msg}, PState = #pstate{mountpoint = MountPoint}) ->
_ = emqx_hooks:run('message.delivered', [credentials(PState)], Msg),
Msg1 = emqx_message:update_expiry(Msg),
Msg0 = emqx_hooks:run_fold('message.deliver', [credentials(PState)], Msg),
Msg1 = emqx_message:update_expiry(Msg0),
Msg2 = emqx_mountpoint:unmount(MountPoint, Msg1),
send(emqx_packet:from_message(PacketId, emqx_message:remove_topic_alias(Msg2)), PState);
@ -692,9 +683,8 @@ deliver({disconnect, _ReasonCode}, PState) ->
-spec(send(emqx_mqtt_types:packet(), state()) -> {ok, state()} | {error, term()}).
send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, sendfun = Send}) ->
Data = emqx_frame:serialize(Packet, #{version => Ver}),
case Send(Data) of
ok ->
case Send(Packet, #{version => Ver}) of
{ok, Data} ->
trace(send, Packet),
emqx_metrics:sent(Packet),
emqx_metrics:trans(inc, 'bytes/sent', iolist_size(Data)),
@ -744,17 +734,6 @@ try_open_session(SessAttrs, PState = #pstate{zone = Zone,
Other -> Other
end.
authenticate(Credentials, Password) ->
case emqx_access_control:authenticate(Credentials, Password) of
ok -> {ok, false};
{ok, IsSuper} when is_boolean(IsSuper) ->
{ok, IsSuper};
{ok, Result} when is_map(Result) ->
{ok, maps:get(is_superuser, Result, false)};
{error, Error} ->
{error, Error}
end.
set_property(Name, Value, ?NO_PROPS) ->
#{Name => Value};
set_property(Name, Value, Props) ->
@ -855,25 +834,21 @@ check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS, retain = Ret
#pstate{zone = Zone}) ->
emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => Retain}).
check_pub_acl(_Packet, #pstate{is_super = IsSuper, enable_acl = EnableAcl})
when IsSuper orelse (not EnableAcl) ->
check_pub_acl(_Packet, #pstate{credentials = #{is_superuser := IsSuper}, enable_acl = EnableAcl})
when IsSuper orelse (not EnableAcl) ->
ok;
check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}}, PState) ->
case emqx_access_control:check_acl(credentials(PState), publish, Topic) of
allow -> ok;
deny ->
{error, ?RC_NOT_AUTHORIZED}
deny -> {error, ?RC_NOT_AUTHORIZED}
end.
run_check_steps([], _Packet, PState) ->
{ok, PState};
run_check_steps([], _Packet, _PState) ->
ok;
run_check_steps([Check|Steps], Packet, PState) ->
case Check(Packet, PState) of
ok ->
run_check_steps(Steps, Packet, PState);
{ok, PState1} ->
run_check_steps(Steps, Packet, PState1);
Error = {error, _RC} ->
Error
end.
@ -886,15 +861,13 @@ check_subscribe(TopicFilters, PState = #pstate{zone = Zone}) ->
{error, TopicFilter1}
end.
check_sub_acl(TopicFilters, #pstate{is_super = IsSuper, enable_acl = EnableAcl})
when IsSuper orelse (not EnableAcl) ->
check_sub_acl(TopicFilters, #pstate{credentials = #{is_superuser := IsSuper}, enable_acl = EnableAcl})
when IsSuper orelse (not EnableAcl) ->
{ok, TopicFilters};
check_sub_acl(TopicFilters, PState) ->
Credentials = credentials(PState),
lists:foldr(
fun({Topic, SubOpts}, {Ok, Acc}) ->
case emqx_access_control:check_acl(Credentials, subscribe, Topic) of
case emqx_access_control:check_acl(credentials(PState), publish, Topic) of
allow -> {Ok, [{Topic, SubOpts}|Acc]};
deny ->
{error, [{Topic, SubOpts#{rc := ?RC_NOT_AUTHORIZED}}|Acc]}
@ -928,7 +901,7 @@ terminate(discard, _PState) ->
ok;
terminate(Reason, PState) ->
?LOG(info, "Shutdown for ~p", [Reason]),
emqx_hooks:run('client.disconnected', [credentials(PState), Reason]).
ok = emqx_hooks:run('client.disconnected', [credentials(PState), Reason]).
start_keepalive(0, _PState) ->
ignore;

View File

@ -25,10 +25,12 @@
-type psk_user_state() :: term().
-spec lookup(psk, psk_identity(), psk_user_state()) -> {ok, SharedSecret :: binary()} | error.
lookup(psk, ClientPSKID, UserState) ->
try emqx_hooks:run('tls_handshake.psk_lookup', [ClientPSKID], UserState) of
{ok, SharedSecret} -> {ok, SharedSecret};
{stop, SharedSecret} -> {ok, SharedSecret}
lookup(psk, ClientPSKID, _UserState) ->
try emqx_hooks:run_fold('tls_handshake.psk_lookup', [ClientPSKID], not_found) of
SharedSecret when is_binary(SharedSecret) -> {ok, SharedSecret};
Error ->
?LOG(error, "Look PSK for PSKID ~p error: ~p", [ClientPSKID, Error]),
error
catch
Except:Error:Stacktrace ->
?LOG(error, "Lookup PSK failed, ~p: ~p", [{Except,Error}, Stacktrace]),

View File

@ -17,7 +17,7 @@
-include("emqx_mqtt.hrl").
-export([name/2, text/1]).
-export([name/2, text/1, connack_error/1]).
-export([compat/2]).
name(I, Ver) when Ver >= ?MQTT_PROTO_V5 ->
@ -143,3 +143,14 @@ compat(suback, Code) when Code =< ?QOS_2 -> Code;
compat(suback, Code) when Code >= 16#80 -> 16#80;
compat(unsuback, _Code) -> undefined.
connack_error(client_identifier_not_valid) -> ?RC_CLIENT_IDENTIFIER_NOT_VALID;
connack_error(bad_username_or_password) -> ?RC_BAD_USER_NAME_OR_PASSWORD;
connack_error(username_or_password_undefined) -> ?RC_BAD_USER_NAME_OR_PASSWORD;
connack_error(password_error) -> ?RC_BAD_USER_NAME_OR_PASSWORD;
connack_error(not_authorized) -> ?RC_NOT_AUTHORIZED;
connack_error(server_unavailable) -> ?RC_SERVER_UNAVAILABLE;
connack_error(server_busy) -> ?RC_SERVER_BUSY;
connack_error(banned) -> ?RC_BANNED;
connack_error(bad_authentication_method) -> ?RC_BAD_AUTHENTICATION_METHOD;
connack_error(_) -> ?RC_NOT_AUTHORIZED.

View File

@ -21,10 +21,18 @@
-define(RPC, gen_rpc).
call(Node, Mod, Fun, Args) ->
?RPC:call(Node, Mod, Fun, Args).
filter_result(?RPC:call(Node, Mod, Fun, Args)).
multicall(Nodes, Mod, Fun, Args) ->
?RPC:multicall(Nodes, Mod, Fun, Args).
filter_result(?RPC:multicall(Nodes, Mod, Fun, Args)).
cast(Node, Mod, Fun, Args) ->
?RPC:cast(Node, Mod, Fun, Args).
filter_result(?RPC:cast(Node, Mod, Fun, Args)).
filter_result(Delivery) ->
case Delivery of
{badrpc, Reason} -> {badrpc, Reason};
{badtcp, Reason} -> {badrpc, Reason};
_ -> Delivery
end.

View File

@ -369,7 +369,7 @@ init([Parent, #{zone := Zone,
ok = emqx_sm:register_session(ClientId, self()),
true = emqx_sm:set_session_attrs(ClientId, attrs(State)),
true = emqx_sm:set_session_stats(ClientId, stats(State)),
emqx_hooks:run('session.created', [#{client_id => ClientId}, info(State)]),
ok = emqx_hooks:run('session.created', [#{client_id => ClientId}, info(State)]),
ok = emqx_misc:init_proc_mng_policy(Zone),
ok = proc_lib:init_ack(Parent, {ok, self()}),
gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}], State).
@ -466,22 +466,13 @@ handle_call(Req, _From, State) ->
handle_cast({subscribe, FromPid, {PacketId, _Properties, TopicFilters}},
State = #state{client_id = ClientId, username = Username, subscriptions = Subscriptions}) ->
{ReasonCodes, Subscriptions1} =
lists:foldr(fun({Topic, SubOpts = #{qos := QoS}}, {RcAcc, SubMap}) ->
{[QoS|RcAcc], case maps:find(Topic, SubMap) of
{ok, SubOpts} ->
emqx_hooks:run('session.subscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts#{first => false}]),
SubMap;
{ok, _SubOpts} ->
emqx_broker:set_subopts(Topic, SubOpts),
%% Why???
emqx_hooks:run('session.subscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts#{first => false}]),
maps:put(Topic, SubOpts, SubMap);
error ->
emqx_broker:subscribe(Topic, ClientId, SubOpts),
emqx_hooks:run('session.subscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts#{first => true}]),
maps:put(Topic, SubOpts, SubMap)
end}
end, {[], Subscriptions}, TopicFilters),
lists:foldr(
fun ({Topic, SubOpts = #{qos := QoS, rc := RC}}, {RcAcc, SubMap}) when
RC == ?QOS_0; RC == ?QOS_1; RC == ?QOS_2 ->
{[QoS|RcAcc], do_subscribe(ClientId, Username, Topic, SubOpts, SubMap)};
({_Topic, #{rc := RC}}, {RcAcc, SubMap}) ->
{[RC|RcAcc], SubMap}
end, {[], Subscriptions}, TopicFilters),
suback(FromPid, PacketId, ReasonCodes),
noreply(ensure_stats_timer(State#state{subscriptions = Subscriptions1}));
@ -493,7 +484,7 @@ handle_cast({unsubscribe, From, {PacketId, _Properties, TopicFilters}},
case maps:find(Topic, SubMap) of
{ok, SubOpts} ->
ok = emqx_broker:unsubscribe(Topic),
emqx_hooks:run('session.unsubscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts]),
ok = emqx_hooks:run('session.unsubscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts]),
{[?RC_SUCCESS|Acc], maps:remove(Topic, SubMap)};
error ->
{[?RC_NO_SUBSCRIPTION_EXISTED|Acc], SubMap}
@ -568,7 +559,7 @@ handle_cast({resume, #{conn_pid := ConnPid,
%% Clean Session: true -> false???
CleanStart andalso emqx_sm:set_session_attrs(ClientId, attrs(State1)),
emqx_hooks:run('session.resumed', [#{client_id => ClientId}, attrs(State)]),
ok = emqx_hooks:run('session.resumed', [#{client_id => ClientId}, attrs(State)]),
%% Replay delivery and Dequeue pending messages
noreply(ensure_stats_timer(dequeue(retry_delivery(true, State1))));
@ -668,7 +659,7 @@ terminate(Reason, #state{will_msg = WillMsg,
old_conn_pid = OldConnPid}) ->
send_willmsg(WillMsg),
[maybe_shutdown(Pid, Reason) || Pid <- [ConnPid, OldConnPid]],
emqx_hooks:run('session.terminated', [#{client_id => ClientId, username => Username}, Reason]).
ok = emqx_hooks:run('session.terminated', [#{client_id => ClientId, username => Username}, Reason]).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@ -941,7 +932,7 @@ enqueue_msg(Msg, State = #state{mqueue = Q, client_id = ClientId, username = Use
if
Dropped =/= undefined ->
SessProps = #{client_id => ClientId, username => Username},
emqx_hooks:run('message.dropped', [SessProps, Msg]);
ok = emqx_hooks:run('message.dropped', [SessProps, Msg]);
true -> ok
end,
State#state{mqueue = NewQ}.
@ -980,7 +971,7 @@ await(PacketId, Msg, State = #state{inflight = Inflight}) ->
acked(puback, PacketId, State = #state{client_id = ClientId, username = Username, inflight = Inflight}) ->
case emqx_inflight:lookup(PacketId, Inflight) of
{value, {publish, {_, Msg}, _Ts}} ->
emqx_hooks:run('message.acked', [#{client_id => ClientId, username => Username}], Msg),
ok = emqx_hooks:run('message.acked', [#{client_id => ClientId, username => Username}, Msg]),
State#state{inflight = emqx_inflight:delete(PacketId, Inflight)};
none ->
?LOG(warning, "Duplicated PUBACK PacketId ~w", [PacketId]),
@ -990,7 +981,7 @@ acked(puback, PacketId, State = #state{client_id = ClientId, username = Username
acked(pubrec, PacketId, State = #state{client_id = ClientId, username = Username, inflight = Inflight}) ->
case emqx_inflight:lookup(PacketId, Inflight) of
{value, {publish, {_, Msg}, _Ts}} ->
emqx_hooks:run('message.acked', [#{client_id => ClientId, username => Username}], Msg),
ok = emqx_hooks:run('message.acked', [#{client_id => ClientId, username => Username}, Msg]),
State#state{inflight = emqx_inflight:update(PacketId, {pubrel, PacketId, os:timestamp()}, Inflight)};
{value, {pubrel, PacketId, _Ts}} ->
?LOG(warning, "Duplicated PUBREC PacketId ~w", [PacketId]),
@ -1118,3 +1109,18 @@ noreply(State) ->
shutdown(Reason, State) ->
{stop, {shutdown, Reason}, State}.
do_subscribe(ClientId, Username, Topic, SubOpts, SubMap) ->
case maps:find(Topic, SubMap) of
{ok, SubOpts} ->
ok = emqx_hooks:run('session.subscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts#{first => false}]),
SubMap;
{ok, _SubOpts} ->
emqx_broker:set_subopts(Topic, SubOpts),
%% Why???
ok = emqx_hooks:run('session.subscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts#{first => false}]),
maps:put(Topic, SubOpts, SubMap);
error ->
emqx_broker:subscribe(Topic, ClientId, SubOpts),
ok = emqx_hooks:run('session.subscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts#{first => true}]),
maps:put(Topic, SubOpts, SubMap)
end.

View File

@ -62,8 +62,6 @@ init([]) ->
%% Broker Sup
BrokerSup = supervisor_spec(emqx_broker_sup),
BridgeSup = supervisor_spec(emqx_bridge_sup),
%% AccessControl
AccessControl = worker_spec(emqx_access_control),
%% Session Manager
SMSup = supervisor_spec(emqx_sm_sup),
%% Connection Manager
@ -75,7 +73,6 @@ init([]) ->
RouterSup,
BrokerSup,
BridgeSup,
AccessControl,
SMSup,
CMSup,
SysSup]}}.

View File

@ -217,6 +217,8 @@ parse(Topic = <<?SHARE, "/", Topic1/binary>>, Options) ->
_ -> error({invalid_topic, Topic})
end
end;
parse(Topic, Options = #{qos := QoS}) ->
{Topic, Options#{rc => QoS}};
parse(Topic, Options) ->
{Topic, Options}.

View File

@ -143,12 +143,13 @@ websocket_init(#state{request = Req, options = Options}) ->
idle_timeout = IdleTimout}}.
send_fun(WsPid) ->
fun(Data) ->
fun(Packet, Options) ->
Data = emqx_frame:serialize(Packet, Options),
BinSize = iolist_size(Data),
emqx_pd:update_counter(send_cnt, 1),
emqx_pd:update_counter(send_oct, BinSize),
WsPid ! {binary, iolist_to_binary(Data)},
ok
{ok, Data}
end.
stat_fun() ->
@ -305,4 +306,3 @@ shutdown(Reason, State) ->
wsock_stats() ->
[{Key, emqx_pd:get_counter(Key)} || Key <- ?SOCK_STATS].

View File

@ -39,6 +39,16 @@ end_per_suite(_Config) ->
local_path(RelativePath) ->
filename:join([get_base_dir(), RelativePath]).
deps_path(App, RelativePath) ->
%% Note: not lib_dir because etc dir is not sym-link-ed to _build dir
%% but priv dir is
Path0 = code:priv_dir(App),
Path = case file:read_link(Path0) of
{ok, Resolved} -> Resolved;
{error, _} -> Path0
end,
filename:join([Path, "..", RelativePath]).
get_base_dir() ->
{file, Here} = code:is_loaded(?MODULE),
filename:dirname(filename:dirname(Here)).
@ -56,6 +66,9 @@ read_schema_configs(App, {SchemaFile, ConfigFile}) ->
Vals = proplists:get_value(App, NewConfig, []),
[application:set_env(App, Par, Value) || {Par, Value} <- Vals].
set_special_configs(emqx) ->
application:set_env(emqx, acl_file, deps_path(emqx, "test/emqx_access_SUITE_data/acl_deny_action.conf"));
set_special_configs(_App) ->
ok.

View File

@ -72,7 +72,7 @@ publish(_) ->
ok = emqx:subscribe(<<"test/+">>),
timer:sleep(10),
emqx:publish(Msg),
?assert(receive {dispatch, <<"test/+">>, Msg} -> true after 5 -> false end).
?assert(receive {dispatch, <<"test/+">>, #message{payload = <<"hello">>}} -> true after 100 -> false end).
dispatch_with_no_sub(_) ->
Msg = emqx_message:make(ct, <<"no_subscribers">>, <<"hello">>),
@ -98,7 +98,7 @@ pubsub(_) ->
true;
P ->
ct:log("Receive Message: ~p~n",[P])
after 2 ->
after 100 ->
false
end),
spawn(fun() ->
@ -124,14 +124,14 @@ t_shared_subscribe(_) ->
emqx:subscribe(<<"a/#">>),
timer:sleep(10),
emqx:publish(emqx_message:make(ct, <<"a/b/c">>, <<"hello">>)),
?assert(receive {dispatch, <<"a/#">>, _} -> true after 2 -> false end),
?assert(receive {dispatch, <<"a/#">>, _} -> true after 100 -> false end),
emqx:unsubscribe(<<"a/#">>).
'pubsub+'(_) ->
emqx:subscribe(<<"a/+/+">>),
timer:sleep(10),
emqx:publish(emqx_message:make(ct, <<"a/b/c">>, <<"hello">>)),
?assert(receive {dispatch, <<"a/+/+">>, _} -> true after 1 -> false end),
?assert(receive {dispatch, <<"a/+/+">>, _} -> true after 100 -> false end),
emqx:unsubscribe(<<"a/+/+">>).
%%--------------------------------------------------------------------

View File

@ -63,7 +63,7 @@ receive_messages(Count, Msgs) ->
receive_messages(Count-1, [Msg|Msgs]);
_Other ->
receive_messages(Count, Msgs)
after 10 ->
after 100 ->
Msgs
end.

View File

@ -21,7 +21,7 @@
-include_lib("common_test/include/ct.hrl").
all() ->
[add_delete_hook, run_hooks].
[add_delete_hook, run_hook].
add_delete_hook(_) ->
{ok, _} = emqx_hooks:start_link(),
@ -54,57 +54,55 @@ add_delete_hook(_) ->
?assertEqual([], emqx_hooks:lookup(emqx_hook)),
ok = emqx_hooks:stop().
run_hooks(_) ->
run_hook(_) ->
{ok, _} = emqx_hooks:start_link(),
ok = emqx:hook(foldl_hook, fun ?MODULE:hook_fun3/4, [init]),
ok = emqx:hook(foldl_hook, {?MODULE, hook_fun3, [init]}),
ok = emqx:hook(foldl_hook, fun ?MODULE:hook_fun4/4, [init]),
ok = emqx:hook(foldl_hook, fun ?MODULE:hook_fun5/4, [init]),
{stop, [r3, r2]} = emqx:run_hooks(foldl_hook, [arg1, arg2], []),
{ok, []} = emqx:run_hooks(unknown_hook, [], []),
[r5,r4] = emqx:run_fold_hook(foldl_hook, [arg1, arg2], []),
[] = emqx:run_fold_hook(unknown_hook, [], []),
ok = emqx:hook(foldl_hook2, fun ?MODULE:hook_fun9/2),
ok = emqx:hook(foldl_hook2, {?MODULE, hook_fun10, []}),
[r9] = emqx:run_fold_hook(foldl_hook2, [arg], []),
ok = emqx:hook(foreach_hook, fun ?MODULE:hook_fun6/2, [initArg]),
{error, already_exists} = emqx:hook(foreach_hook, fun ?MODULE:hook_fun6/2, [initArg]),
ok = emqx:hook(foreach_hook, fun ?MODULE:hook_fun7/2, [initArg]),
ok = emqx:hook(foreach_hook, fun ?MODULE:hook_fun8/2, [initArg]),
stop = emqx:run_hooks(foreach_hook, [arg]),
ok = emqx:run_hook(foreach_hook, [arg]),
ok = emqx:hook(foldl_hook2, fun ?MODULE:hook_fun9/2),
ok = emqx:hook(foldl_hook2, {?MODULE, hook_fun10, []}),
{stop, []} = emqx:run_hooks(foldl_hook2, [arg], []),
%% foreach hook always returns 'ok' or 'stop'
ok = emqx:hook(foreach_filter1_hook, {?MODULE, hook_fun1, []}, {?MODULE, hook_filter1, []}, 0),
?assertEqual(ok, emqx:run_hooks(foreach_filter1_hook, [arg])), %% filter passed
?assertEqual(ok, emqx:run_hooks(foreach_filter1_hook, [arg1])), %% filter failed
?assertEqual(ok, emqx:run_hook(foreach_filter1_hook, [arg])), %% filter passed
?assertEqual(ok, emqx:run_hook(foreach_filter1_hook, [arg1])), %% filter failed
%% foldl hook always returns {'ok', Acc} or {'stop', Acc}
ok = emqx:hook(foldl_filter2_hook, {?MODULE, hook_fun2, []}, {?MODULE, hook_filter2, [init_arg]}),
ok = emqx:hook(foldl_filter2_hook, {?MODULE, hook_fun2_1, []}, {?MODULE, hook_filter2_1, [init_arg]}),
?assertEqual({ok, 3}, emqx:run_hooks(foldl_filter2_hook, [arg], 1)),
?assertEqual({ok, 2}, emqx:run_hooks(foldl_filter2_hook, [arg1], 1)),
?assertEqual(3, emqx:run_fold_hook(foldl_filter2_hook, [arg], 1)),
?assertEqual(2, emqx:run_fold_hook(foldl_filter2_hook, [arg1], 1)),
ok = emqx_hooks:stop().
hook_fun1(arg) -> ok;
hook_fun1(_) -> stop.
hook_fun1(_) -> error.
hook_fun2(arg) -> ok;
hook_fun2(_) -> stop.
hook_fun2(_) -> error.
hook_fun2(_, Acc) -> {ok, Acc + 1}.
hook_fun2_1(_, Acc) -> {ok, Acc + 1}.
hook_fun3(arg1, arg2, _Acc, init) -> ok.
hook_fun4(arg1, arg2, Acc, init) -> {ok, [r2 | Acc]}.
hook_fun5(arg1, arg2, Acc, init) -> {stop, [r3 | Acc]}.
hook_fun4(arg1, arg2, Acc, init) -> {ok, [r4 | Acc]}.
hook_fun5(arg1, arg2, Acc, init) -> {ok, [r5 | Acc]}.
hook_fun6(arg, initArg) -> ok.
hook_fun7(arg, initArg) -> any.
hook_fun8(arg, initArg) -> stop.
hook_fun7(arg, initArg) -> ok.
hook_fun8(arg, initArg) -> ok.
hook_fun9(arg, _Acc) -> any.
hook_fun10(arg, _Acc) -> stop.
hook_fun9(arg, Acc) -> {stop, [r9 | Acc]}.
hook_fun10(arg, Acc) -> {stop, [r10 | Acc]}.
hook_filter1(arg) -> true;
hook_filter1(_) -> false.

View File

@ -546,6 +546,8 @@ acl_deny_do_disconnect(publish, QoS, Topic) ->
{ok, _} = emqx_client:connect(Client),
emqx_client:publish(Client, Topic, <<"test">>, QoS),
receive
{disconnected, shutdown, tcp_closed} ->
ct:pal(info, "[OK] after publish, client got disconnected: tcp_closed", []);
{'EXIT', Client, {shutdown,tcp_closed}} ->
ct:pal(info, "[OK] after publish, received exit: {shutdown,tcp_closed}"),
false = is_process_alive(Client);
@ -560,6 +562,8 @@ acl_deny_do_disconnect(subscribe, QoS, Topic) ->
{ok, _} = emqx_client:connect(Client),
{ok, _, [128]} = emqx_client:subscribe(Client, Topic, QoS),
receive
{disconnected, shutdown, tcp_closed} ->
ct:pal(info, "[OK] after subscribe, client got disconnected: tcp_closed", []);
{'EXIT', Client, {shutdown,tcp_closed}} ->
ct:pal(info, "[OK] after subscribe, received exit: {shutdown,tcp_closed}"),
false = is_process_alive(Client);

36
test/emqx_rpc_SUITE.erl Normal file
View File

@ -0,0 +1,36 @@
%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_rpc_SUITE).
-include("emqx.hrl").
-include_lib("eunit/include/eunit.hrl").
-compile(export_all).
-compile(nowarn_export_all).
-define(MASTER, 'emqxct@127.0.0.1').
all() -> [t_rpc].
init_per_suite(Config) ->
emqx_ct_broker_helpers:run_setup_steps(),
Config.
end_per_suite(_Config) ->
emqx_ct_broker_helpers:run_teardown_steps().
t_rpc(_) ->
60000 = emqx_rpc:call(?MASTER, timer, seconds, [60]),
{badrpc, _} = emqx_rpc:call(?MASTER, os, test, []),
{_, []} = emqx_rpc:multicall([?MASTER, ?MASTER], os, timestamp, []).