Optimize caps
This commit is contained in:
parent
8bf06b9c63
commit
4b59db62fa
|
@ -906,7 +906,7 @@ listener.tcp.external.access.1 = allow all
|
|||
## Enable the option for X.509 certificate based authentication.
|
||||
## EMQX will use the common name of certificate as MQTT username.
|
||||
##
|
||||
## Value: cn | dn
|
||||
## Value: cn | dn | crt
|
||||
## listener.tcp.external.peer_cert_as_username = cn
|
||||
|
||||
## The TCP backlog defines the maximum length that the queue of pending
|
||||
|
@ -1251,10 +1251,10 @@ listener.ssl.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-G
|
|||
## Value: on | off
|
||||
## listener.ssl.external.honor_cipher_order = on
|
||||
|
||||
## Use the CN, EN or CRT field from the client certificate as a username.
|
||||
## Use the CN, DN or CRT field from the client certificate as a username.
|
||||
## Notice that 'verify' should be set as 'verify_peer'.
|
||||
##
|
||||
## Value: cn | en | crt
|
||||
## Value: cn | dn | crt
|
||||
## listener.ssl.external.peer_cert_as_username = cn
|
||||
|
||||
## TCP backlog for the SSL connection.
|
||||
|
|
|
@ -118,40 +118,43 @@ do_check_sub(_Flags, _Caps) -> ok.
|
|||
|
||||
-spec(get_caps(emqx_zone:zone()) -> caps()).
|
||||
get_caps(Zone) ->
|
||||
with_env(Zone, '$mqtt_caps', fun all_caps/1).
|
||||
% with_env(Zone, '$mqtt_caps', fun all_caps/1).
|
||||
maps:from_list([{Cap, emqx_zone:get_env(Zone, Cap, Def)} || {Cap, Def} <- maps:to_list(?DEFAULT_CAPS)]).
|
||||
|
||||
-spec(get_caps(emqx_zone:zone(), publish|subscribe) -> caps()).
|
||||
get_caps(Zone, publish) ->
|
||||
with_env(Zone, '$mqtt_pub_caps', fun pub_caps/1);
|
||||
% with_env(Zone, '$mqtt_pub_caps', fun pub_caps/1);
|
||||
filter_caps(?PUBCAP_KEYS, get_caps(Zone));
|
||||
get_caps(Zone, subscribe) ->
|
||||
with_env(Zone, '$mqtt_sub_caps', fun sub_caps/1).
|
||||
% with_env(Zone, '$mqtt_sub_caps', fun sub_caps/1).
|
||||
filter_caps(?SUBCAP_KEYS, get_caps(Zone)).
|
||||
|
||||
-spec(get_caps(emqx_zone:zone(), atom(), term()) -> term()).
|
||||
get_caps(Zone, Cap, Def) ->
|
||||
emqx_zone:get_env(Zone, Cap, Def).
|
||||
|
||||
pub_caps(Zone) ->
|
||||
filter_caps(?PUBCAP_KEYS, get_caps(Zone)).
|
||||
% pub_caps(Zone) ->
|
||||
% filter_caps(?PUBCAP_KEYS, get_caps(Zone)).
|
||||
|
||||
sub_caps(Zone) ->
|
||||
filter_caps(?SUBCAP_KEYS, get_caps(Zone)).
|
||||
% sub_caps(Zone) ->
|
||||
% filter_caps(?SUBCAP_KEYS, get_caps(Zone)).
|
||||
|
||||
all_caps(Zone) ->
|
||||
maps:map(fun(Cap, Def) ->
|
||||
emqx_zone:get_env(Zone, Cap, Def)
|
||||
end, ?DEFAULT_CAPS).
|
||||
% all_caps(Zone) ->
|
||||
% maps:map(fun(Cap, Def) ->
|
||||
% emqx_zone:get_env(Zone, Cap, Def)
|
||||
% end, ?DEFAULT_CAPS).
|
||||
|
||||
filter_caps(Keys, Caps) ->
|
||||
maps:filter(fun(Key, _Val) -> lists:member(Key, Keys) end, Caps).
|
||||
|
||||
with_env(Zone, Key, InitFun) ->
|
||||
case emqx_zone:get_env(Zone, Key) of
|
||||
undefined ->
|
||||
Caps = InitFun(Zone),
|
||||
ok = emqx_zone:set_env(Zone, Key, Caps),
|
||||
Caps;
|
||||
Caps -> Caps
|
||||
end.
|
||||
% with_env(Zone, Key, InitFun) ->
|
||||
% case emqx_zone:get_env(Zone, Key) of
|
||||
% undefined ->
|
||||
% Caps = InitFun(Zone),
|
||||
% ok = emqx_zone:set_env(Zone, Key, Caps),
|
||||
% Caps;
|
||||
% Caps -> Caps
|
||||
% end.
|
||||
|
||||
-spec(default() -> caps()).
|
||||
default() -> ?DEFAULT_CAPS.
|
||||
|
|
|
@ -34,7 +34,9 @@ t_check_pub(_) ->
|
|||
PubCaps = #{max_qos_allowed => ?QOS_1,
|
||||
retain_available => false
|
||||
},
|
||||
ok = emqx_zone:set_env(zone, '$mqtt_pub_caps', PubCaps),
|
||||
lists:foreach(fun({Key, Val}) ->
|
||||
ok = emqx_zone:set_env(zone, Key, Val)
|
||||
end, maps:to_list(PubCaps)),
|
||||
ok = emqx_mqtt_caps:check_pub(zone, #{qos => ?QOS_1,
|
||||
retain => false}),
|
||||
PubFlags1 = #{qos => ?QOS_2, retain => false},
|
||||
|
@ -43,7 +45,9 @@ t_check_pub(_) ->
|
|||
PubFlags2 = #{qos => ?QOS_1, retain => true},
|
||||
?assertEqual({error, ?RC_RETAIN_NOT_SUPPORTED},
|
||||
emqx_mqtt_caps:check_pub(zone, PubFlags2)),
|
||||
true = emqx_zone:unset_env(zone, '$mqtt_pub_caps').
|
||||
lists:foreach(fun({Key, _Val}) ->
|
||||
true = emqx_zone:unset_env(zone, Key)
|
||||
end, maps:to_list(PubCaps)).
|
||||
|
||||
t_check_sub(_) ->
|
||||
SubOpts = #{rh => 0,
|
||||
|
@ -56,7 +60,9 @@ t_check_sub(_) ->
|
|||
shared_subscription => false,
|
||||
wildcard_subscription => false
|
||||
},
|
||||
ok = emqx_zone:set_env(zone, '$mqtt_sub_caps', SubCaps),
|
||||
lists:foreach(fun({Key, Val}) ->
|
||||
ok = emqx_zone:set_env(zone, Key, Val)
|
||||
end, maps:to_list(SubCaps)),
|
||||
ok = emqx_mqtt_caps:check_sub(zone, <<"topic">>, SubOpts),
|
||||
?assertEqual({error, ?RC_TOPIC_FILTER_INVALID},
|
||||
emqx_mqtt_caps:check_sub(zone, <<"a/b/c/d">>, SubOpts)),
|
||||
|
@ -64,4 +70,6 @@ t_check_sub(_) ->
|
|||
emqx_mqtt_caps:check_sub(zone, <<"+/#">>, SubOpts)),
|
||||
?assertEqual({error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED},
|
||||
emqx_mqtt_caps:check_sub(zone, <<"topic">>, SubOpts#{share => true})),
|
||||
true = emqx_zone:unset_env(zone, '$mqtt_sub_caps').
|
||||
lists:foreach(fun({Key, _Val}) ->
|
||||
true = emqx_zone:unset_env(zone, Key)
|
||||
end, maps:to_list(SubCaps)).
|
||||
|
|
|
@ -24,7 +24,7 @@
|
|||
all() -> emqx_ct:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
emqx_ct_helpers:boot_modules(all),
|
||||
emqx_ct_helpers:boot_modules(all),
|
||||
emqx_ct_helpers:start_apps([]),
|
||||
Config.
|
||||
|
||||
|
@ -32,63 +32,75 @@ end_per_suite(_Config) ->
|
|||
emqx_ct_helpers:stop_apps([]).
|
||||
|
||||
t_message_expiry_interval_1(_) ->
|
||||
ClientA = message_expiry_interval_init(),
|
||||
[message_expiry_interval_exipred(ClientA, QoS) || QoS <- [0,1,2]],
|
||||
emqtt:stop(ClientA).
|
||||
ClientA = message_expiry_interval_init(),
|
||||
[message_expiry_interval_exipred(ClientA, QoS) || QoS <- [0,1,2]],
|
||||
emqtt:stop(ClientA).
|
||||
|
||||
t_message_expiry_interval_2(_) ->
|
||||
ClientA = message_expiry_interval_init(),
|
||||
[message_expiry_interval_not_exipred(ClientA, QoS) || QoS <- [0,1,2]],
|
||||
emqtt:stop(ClientA).
|
||||
ClientA = message_expiry_interval_init(),
|
||||
[message_expiry_interval_not_exipred(ClientA, QoS) || QoS <- [0,1,2]],
|
||||
emqtt:stop(ClientA).
|
||||
|
||||
message_expiry_interval_init() ->
|
||||
{ok, ClientA} = emqtt:start_link([{proto_ver,v5}, {clientid, <<"client-a">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]),
|
||||
{ok, ClientB} = emqtt:start_link([{proto_ver,v5}, {clientid, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]),
|
||||
{ok, _} = emqtt:connect(ClientA),
|
||||
{ok, _} = emqtt:connect(ClientB),
|
||||
%% subscribe and disconnect client-b
|
||||
emqtt:subscribe(ClientB, <<"t/a">>, 1),
|
||||
emqtt:stop(ClientB),
|
||||
ClientA.
|
||||
{ok, ClientA} = emqtt:start_link([{proto_ver,v5},
|
||||
{clientid, <<"client-a">>},
|
||||
{clean_start, false},
|
||||
{properties, #{'Session-Expiry-Interval' => 360}}]),
|
||||
{ok, ClientB} = emqtt:start_link([{proto_ver,v5},
|
||||
{clientid, <<"client-b">>},
|
||||
{clean_start, false},
|
||||
{properties, #{'Session-Expiry-Interval' => 360}}]),
|
||||
{ok, _} = emqtt:connect(ClientA),
|
||||
{ok, _} = emqtt:connect(ClientB),
|
||||
%% subscribe and disconnect client-b
|
||||
emqtt:subscribe(ClientB, <<"t/a">>, 1),
|
||||
emqtt:stop(ClientB),
|
||||
ClientA.
|
||||
|
||||
message_expiry_interval_exipred(ClientA, QoS) ->
|
||||
ct:pal("~p ~p", [?FUNCTION_NAME, QoS]),
|
||||
%% publish to t/a and waiting for the message expired
|
||||
emqtt:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 1}, <<"this will be purged in 1s">>, [{qos, QoS}]),
|
||||
ct:sleep(1500),
|
||||
ct:pal("~p ~p", [?FUNCTION_NAME, QoS]),
|
||||
%% publish to t/a and waiting for the message expired
|
||||
emqtt:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 1}, <<"this will be purged in 1s">>, [{qos, QoS}]),
|
||||
ct:sleep(1500),
|
||||
|
||||
%% resume the session for client-b
|
||||
{ok, ClientB1} = emqtt:start_link([{proto_ver,v5}, {clientid, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]),
|
||||
{ok, _} = emqtt:connect(ClientB1),
|
||||
%% resume the session for client-b
|
||||
{ok, ClientB1} = emqtt:start_link([{proto_ver,v5},
|
||||
{clientid, <<"client-b">>},
|
||||
{clean_start, false},
|
||||
{properties, #{'Session-Expiry-Interval' => 360}}]),
|
||||
{ok, _} = emqtt:connect(ClientB1),
|
||||
|
||||
%% verify client-b could not receive the publish message
|
||||
receive
|
||||
{publish,#{client_pid := ClientB1, topic := <<"t/a">>}} ->
|
||||
ct:fail(should_have_expired)
|
||||
after 300 ->
|
||||
ok
|
||||
end,
|
||||
emqtt:stop(ClientB1).
|
||||
%% verify client-b could not receive the publish message
|
||||
receive
|
||||
{publish,#{client_pid := ClientB1, topic := <<"t/a">>}} ->
|
||||
ct:fail(should_have_expired)
|
||||
after 300 ->
|
||||
ok
|
||||
end,
|
||||
emqtt:stop(ClientB1).
|
||||
|
||||
message_expiry_interval_not_exipred(ClientA, QoS) ->
|
||||
ct:pal("~p ~p", [?FUNCTION_NAME, QoS]),
|
||||
%% publish to t/a
|
||||
emqtt:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 20}, <<"this will be purged in 1s">>, [{qos, QoS}]),
|
||||
ct:pal("~p ~p", [?FUNCTION_NAME, QoS]),
|
||||
%% publish to t/a
|
||||
emqtt:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 20}, <<"this will be purged in 1s">>, [{qos, QoS}]),
|
||||
|
||||
%% wait for 1s and then resume the session for client-b, the message should not expires
|
||||
%% as Message-Expiry-Interval = 20s
|
||||
ct:sleep(1000),
|
||||
{ok, ClientB1} = emqtt:start_link([{proto_ver,v5}, {clientid, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]),
|
||||
{ok, _} = emqtt:connect(ClientB1),
|
||||
%% wait for 1s and then resume the session for client-b, the message should not expires
|
||||
%% as Message-Expiry-Interval = 20s
|
||||
ct:sleep(1000),
|
||||
{ok, ClientB1} = emqtt:start_link([{proto_ver,v5},
|
||||
{clientid, <<"client-b">>},
|
||||
{clean_start, false},
|
||||
{properties, #{'Session-Expiry-Interval' => 360}}]),
|
||||
{ok, _} = emqtt:connect(ClientB1),
|
||||
|
||||
%% verify client-b could receive the publish message and the Message-Expiry-Interval is set
|
||||
receive
|
||||
{publish,#{client_pid := ClientB1, topic := <<"t/a">>,
|
||||
properties := #{'Message-Expiry-Interval' := MsgExpItvl}}}
|
||||
when MsgExpItvl < 20 -> ok;
|
||||
{publish, _} = Msg ->
|
||||
ct:fail({incorrect_publish, Msg})
|
||||
after 300 ->
|
||||
ct:fail(no_publish_received)
|
||||
end,
|
||||
emqtt:stop(ClientB1).
|
||||
%% verify client-b could receive the publish message and the Message-Expiry-Interval is set
|
||||
receive
|
||||
{publish,#{client_pid := ClientB1, topic := <<"t/a">>,
|
||||
properties := #{'Message-Expiry-Interval' := MsgExpItvl}}}
|
||||
when MsgExpItvl < 20 -> ok;
|
||||
{publish, _} = Msg ->
|
||||
ct:fail({incorrect_publish, Msg})
|
||||
after 300 ->
|
||||
ct:fail(no_publish_received)
|
||||
end,
|
||||
emqtt:stop(ClientB1).
|
||||
|
|
Loading…
Reference in New Issue