Merge remote-tracking branch 'origin/develop'

This commit is contained in:
zhanghongtong 2019-12-21 08:57:15 +00:00
commit e4bd841a10
5 changed files with 65 additions and 67 deletions

View File

@ -35,8 +35,6 @@
-spec(authenticate(emqx_types:clientinfo()) -> {ok, result()} | {error, term()}). -spec(authenticate(emqx_types:clientinfo()) -> {ok, result()} | {error, term()}).
authenticate(ClientInfo = #{zone := Zone}) -> authenticate(ClientInfo = #{zone := Zone}) ->
case emqx_hooks:run_fold('client.authenticate', [ClientInfo], default_auth_result(Zone)) of case emqx_hooks:run_fold('client.authenticate', [ClientInfo], default_auth_result(Zone)) of
Result = #{auth_result := success, anonymous := true} ->
{ok, Result};
Result = #{auth_result := success} -> Result = #{auth_result := success} ->
{ok, Result}; {ok, Result};
Result -> Result ->

View File

@ -158,7 +158,7 @@
%% Client Lifecircle metrics %% Client Lifecircle metrics
-define(CLIENT_METRICS, -define(CLIENT_METRICS,
[{counter, 'client.connected'}, [{counter, 'client.connected'},
{cpunter, 'client.authenticate'}, {counter, 'client.authenticate'},
{counter, 'client.auth.anonymous'}, {counter, 'client.auth.anonymous'},
{counter, 'client.check_acl'}, {counter, 'client.check_acl'},
{counter, 'client.subscribe'}, {counter, 'client.subscribe'},

View File

@ -118,41 +118,21 @@ do_check_sub(_Flags, _Caps) -> ok.
-spec(get_caps(emqx_zone:zone()) -> caps()). -spec(get_caps(emqx_zone:zone()) -> caps()).
get_caps(Zone) -> get_caps(Zone) ->
with_env(Zone, '$mqtt_caps', fun all_caps/1). maps:map(fun(Cap, Def) -> emqx_zone:get_env(Zone, Cap, Def) end, ?DEFAULT_CAPS).
-spec(get_caps(emqx_zone:zone(), publish|subscribe) -> caps()). -spec(get_caps(emqx_zone:zone(), publish|subscribe) -> caps()).
get_caps(Zone, publish) -> get_caps(Zone, publish) ->
with_env(Zone, '$mqtt_pub_caps', fun pub_caps/1); filter_caps(?PUBCAP_KEYS, get_caps(Zone));
get_caps(Zone, subscribe) -> get_caps(Zone, subscribe) ->
with_env(Zone, '$mqtt_sub_caps', fun sub_caps/1). filter_caps(?SUBCAP_KEYS, get_caps(Zone)).
-spec(get_caps(emqx_zone:zone(), atom(), term()) -> term()). -spec(get_caps(emqx_zone:zone(), atom(), term()) -> term()).
get_caps(Zone, Cap, Def) -> get_caps(Zone, Cap, Def) ->
emqx_zone:get_env(Zone, Cap, Def). emqx_zone:get_env(Zone, Cap, Def).
pub_caps(Zone) ->
filter_caps(?PUBCAP_KEYS, get_caps(Zone)).
sub_caps(Zone) ->
filter_caps(?SUBCAP_KEYS, get_caps(Zone)).
all_caps(Zone) ->
maps:map(fun(Cap, Def) ->
emqx_zone:get_env(Zone, Cap, Def)
end, ?DEFAULT_CAPS).
filter_caps(Keys, Caps) -> filter_caps(Keys, Caps) ->
maps:filter(fun(Key, _Val) -> lists:member(Key, Keys) end, Caps). maps:filter(fun(Key, _Val) -> lists:member(Key, Keys) end, Caps).
with_env(Zone, Key, InitFun) ->
case emqx_zone:get_env(Zone, Key) of
undefined ->
Caps = InitFun(Zone),
ok = emqx_zone:set_env(Zone, Key, Caps),
Caps;
Caps -> Caps
end.
-spec(default() -> caps()). -spec(default() -> caps()).
default() -> ?DEFAULT_CAPS. default() -> ?DEFAULT_CAPS.

View File

@ -49,54 +49,66 @@ t_tcp_sock_passive(_) ->
with_client(fun(CPid) -> CPid ! {tcp_passive, sock} end, []). with_client(fun(CPid) -> CPid ! {tcp_passive, sock} end, []).
t_message_expiry_interval_1(_) -> t_message_expiry_interval_1(_) ->
ClientA = message_expiry_interval_init(), ClientA = message_expiry_interval_init(),
[message_expiry_interval_exipred(ClientA, QoS) || QoS <- [0,1,2]], [message_expiry_interval_exipred(ClientA, QoS) || QoS <- [0,1,2]],
emqtt:stop(ClientA). emqtt:stop(ClientA).
t_message_expiry_interval_2(_) -> t_message_expiry_interval_2(_) ->
ClientA = message_expiry_interval_init(), ClientA = message_expiry_interval_init(),
[message_expiry_interval_not_exipred(ClientA, QoS) || QoS <- [0,1,2]], [message_expiry_interval_not_exipred(ClientA, QoS) || QoS <- [0,1,2]],
emqtt:stop(ClientA). emqtt:stop(ClientA).
message_expiry_interval_init() -> message_expiry_interval_init() ->
{ok, ClientA} = emqtt:start_link([{proto_ver,v5}, {clientid, <<"client-a">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), {ok, ClientA} = emqtt:start_link([{proto_ver,v5},
{ok, ClientB} = emqtt:start_link([{proto_ver,v5}, {clientid, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), {clientid, <<"client-a">>},
{ok, _} = emqtt:connect(ClientA), {clean_start, false},
{ok, _} = emqtt:connect(ClientB), {properties, #{'Session-Expiry-Interval' => 360}}]),
%% subscribe and disconnect client-b {ok, ClientB} = emqtt:start_link([{proto_ver,v5},
emqtt:subscribe(ClientB, <<"t/a">>, 1), {clientid, <<"client-b">>},
emqtt:stop(ClientB), {clean_start, false},
ClientA. {properties, #{'Session-Expiry-Interval' => 360}}]),
{ok, _} = emqtt:connect(ClientA),
{ok, _} = emqtt:connect(ClientB),
%% subscribe and disconnect client-b
emqtt:subscribe(ClientB, <<"t/a">>, 1),
emqtt:stop(ClientB),
ClientA.
message_expiry_interval_exipred(ClientA, QoS) -> message_expiry_interval_exipred(ClientA, QoS) ->
ct:pal("~p ~p", [?FUNCTION_NAME, QoS]), ct:pal("~p ~p", [?FUNCTION_NAME, QoS]),
%% publish to t/a and waiting for the message expired %% publish to t/a and waiting for the message expired
emqtt:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 1}, <<"this will be purged in 1s">>, [{qos, QoS}]), emqtt:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 1}, <<"this will be purged in 1s">>, [{qos, QoS}]),
ct:sleep(1500), ct:sleep(1500),
%% resume the session for client-b %% resume the session for client-b
{ok, ClientB1} = emqtt:start_link([{proto_ver,v5}, {clientid, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), {ok, ClientB1} = emqtt:start_link([{proto_ver,v5},
{ok, _} = emqtt:connect(ClientB1), {clientid, <<"client-b">>},
{clean_start, false},
{properties, #{'Session-Expiry-Interval' => 360}}]),
{ok, _} = emqtt:connect(ClientB1),
%% verify client-b could not receive the publish message %% verify client-b could not receive the publish message
receive receive
{publish,#{client_pid := ClientB1, topic := <<"t/a">>}} -> {publish,#{client_pid := ClientB1, topic := <<"t/a">>}} ->
ct:fail(should_have_expired) ct:fail(should_have_expired)
after 300 -> after 300 ->
ok ok
end, end,
emqtt:stop(ClientB1). emqtt:stop(ClientB1).
message_expiry_interval_not_exipred(ClientA, QoS) -> message_expiry_interval_not_exipred(ClientA, QoS) ->
ct:pal("~p ~p", [?FUNCTION_NAME, QoS]), ct:pal("~p ~p", [?FUNCTION_NAME, QoS]),
%% publish to t/a %% publish to t/a
emqtt:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 20}, <<"this will be purged in 1s">>, [{qos, QoS}]), emqtt:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 20}, <<"this will be purged in 1s">>, [{qos, QoS}]),
%% wait for 1s and then resume the session for client-b, the message should not expires %% wait for 1s and then resume the session for client-b, the message should not expires
%% as Message-Expiry-Interval = 20s %% as Message-Expiry-Interval = 20s
ct:sleep(1000), ct:sleep(1000),
{ok, ClientB1} = emqtt:start_link([{proto_ver,v5}, {clientid, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), {ok, ClientB1} = emqtt:start_link([{proto_ver,v5},
{ok, _} = emqtt:connect(ClientB1), {clientid, <<"client-b">>},
{clean_start, false},
{properties, #{'Session-Expiry-Interval' => 360}}]),
{ok, _} = emqtt:connect(ClientB1),
%% verify client-b could receive the publish message and the Message-Expiry-Interval is set %% verify client-b could receive the publish message and the Message-Expiry-Interval is set
receive receive

View File

@ -28,7 +28,9 @@ t_check_pub(_) ->
PubCaps = #{max_qos_allowed => ?QOS_1, PubCaps = #{max_qos_allowed => ?QOS_1,
retain_available => false retain_available => false
}, },
ok = emqx_zone:set_env(zone, '$mqtt_pub_caps', PubCaps), lists:foreach(fun({Key, Val}) ->
ok = emqx_zone:set_env(zone, Key, Val)
end, maps:to_list(PubCaps)),
ok = emqx_mqtt_caps:check_pub(zone, #{qos => ?QOS_1, ok = emqx_mqtt_caps:check_pub(zone, #{qos => ?QOS_1,
retain => false}), retain => false}),
PubFlags1 = #{qos => ?QOS_2, retain => false}, PubFlags1 = #{qos => ?QOS_2, retain => false},
@ -37,7 +39,9 @@ t_check_pub(_) ->
PubFlags2 = #{qos => ?QOS_1, retain => true}, PubFlags2 = #{qos => ?QOS_1, retain => true},
?assertEqual({error, ?RC_RETAIN_NOT_SUPPORTED}, ?assertEqual({error, ?RC_RETAIN_NOT_SUPPORTED},
emqx_mqtt_caps:check_pub(zone, PubFlags2)), emqx_mqtt_caps:check_pub(zone, PubFlags2)),
true = emqx_zone:unset_env(zone, '$mqtt_pub_caps'). lists:foreach(fun({Key, _Val}) ->
true = emqx_zone:unset_env(zone, Key)
end, maps:to_list(PubCaps)).
t_check_sub(_) -> t_check_sub(_) ->
SubOpts = #{rh => 0, SubOpts = #{rh => 0,
@ -50,7 +54,9 @@ t_check_sub(_) ->
shared_subscription => false, shared_subscription => false,
wildcard_subscription => false wildcard_subscription => false
}, },
ok = emqx_zone:set_env(zone, '$mqtt_sub_caps', SubCaps), lists:foreach(fun({Key, Val}) ->
ok = emqx_zone:set_env(zone, Key, Val)
end, maps:to_list(SubCaps)),
ok = emqx_mqtt_caps:check_sub(zone, <<"topic">>, SubOpts), ok = emqx_mqtt_caps:check_sub(zone, <<"topic">>, SubOpts),
?assertEqual({error, ?RC_TOPIC_FILTER_INVALID}, ?assertEqual({error, ?RC_TOPIC_FILTER_INVALID},
emqx_mqtt_caps:check_sub(zone, <<"a/b/c/d">>, SubOpts)), emqx_mqtt_caps:check_sub(zone, <<"a/b/c/d">>, SubOpts)),
@ -58,4 +64,6 @@ t_check_sub(_) ->
emqx_mqtt_caps:check_sub(zone, <<"+/#">>, SubOpts)), emqx_mqtt_caps:check_sub(zone, <<"+/#">>, SubOpts)),
?assertEqual({error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}, ?assertEqual({error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED},
emqx_mqtt_caps:check_sub(zone, <<"topic">>, SubOpts#{share => true})), emqx_mqtt_caps:check_sub(zone, <<"topic">>, SubOpts#{share => true})),
true = emqx_zone:unset_env(zone, '$mqtt_sub_caps'). lists:foreach(fun({Key, _Val}) ->
true = emqx_zone:unset_env(zone, Key)
end, maps:to_list(SubCaps)).