From 4b59db62fa71fa6c5451e80b2d59cda275f073eb Mon Sep 17 00:00:00 2001 From: turtled Date: Wed, 4 Dec 2019 13:52:01 +0800 Subject: [PATCH 1/5] Optimize caps --- etc/emqx.conf | 6 +- src/emqx_mqtt_caps.erl | 41 +++++---- test/emqx_mqtt_caps_SUITE.erl | 16 +++- test/emqx_msg_expiry_interval_SUITE.erl | 110 +++++++++++++----------- 4 files changed, 98 insertions(+), 75 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index 14414b220..170d7eab2 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -906,7 +906,7 @@ listener.tcp.external.access.1 = allow all ## Enable the option for X.509 certificate based authentication. ## EMQX will use the common name of certificate as MQTT username. ## -## Value: cn | dn +## Value: cn | dn | crt ## listener.tcp.external.peer_cert_as_username = cn ## The TCP backlog defines the maximum length that the queue of pending @@ -1251,10 +1251,10 @@ listener.ssl.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-G ## Value: on | off ## listener.ssl.external.honor_cipher_order = on -## Use the CN, EN or CRT field from the client certificate as a username. +## Use the CN, DN or CRT field from the client certificate as a username. ## Notice that 'verify' should be set as 'verify_peer'. ## -## Value: cn | en | crt +## Value: cn | dn | crt ## listener.ssl.external.peer_cert_as_username = cn ## TCP backlog for the SSL connection. diff --git a/src/emqx_mqtt_caps.erl b/src/emqx_mqtt_caps.erl index a1bdefcb0..baac352fa 100644 --- a/src/emqx_mqtt_caps.erl +++ b/src/emqx_mqtt_caps.erl @@ -118,40 +118,43 @@ do_check_sub(_Flags, _Caps) -> ok. -spec(get_caps(emqx_zone:zone()) -> caps()). get_caps(Zone) -> - with_env(Zone, '$mqtt_caps', fun all_caps/1). + % with_env(Zone, '$mqtt_caps', fun all_caps/1). + maps:from_list([{Cap, emqx_zone:get_env(Zone, Cap, Def)} || {Cap, Def} <- maps:to_list(?DEFAULT_CAPS)]). -spec(get_caps(emqx_zone:zone(), publish|subscribe) -> caps()). get_caps(Zone, publish) -> - with_env(Zone, '$mqtt_pub_caps', fun pub_caps/1); + % with_env(Zone, '$mqtt_pub_caps', fun pub_caps/1); + filter_caps(?PUBCAP_KEYS, get_caps(Zone)); get_caps(Zone, subscribe) -> - with_env(Zone, '$mqtt_sub_caps', fun sub_caps/1). + % with_env(Zone, '$mqtt_sub_caps', fun sub_caps/1). + filter_caps(?SUBCAP_KEYS, get_caps(Zone)). -spec(get_caps(emqx_zone:zone(), atom(), term()) -> term()). get_caps(Zone, Cap, Def) -> emqx_zone:get_env(Zone, Cap, Def). -pub_caps(Zone) -> - filter_caps(?PUBCAP_KEYS, get_caps(Zone)). +% pub_caps(Zone) -> +% filter_caps(?PUBCAP_KEYS, get_caps(Zone)). -sub_caps(Zone) -> - filter_caps(?SUBCAP_KEYS, get_caps(Zone)). +% sub_caps(Zone) -> +% filter_caps(?SUBCAP_KEYS, get_caps(Zone)). -all_caps(Zone) -> - maps:map(fun(Cap, Def) -> - emqx_zone:get_env(Zone, Cap, Def) - end, ?DEFAULT_CAPS). +% all_caps(Zone) -> +% maps:map(fun(Cap, Def) -> +% emqx_zone:get_env(Zone, Cap, Def) +% end, ?DEFAULT_CAPS). filter_caps(Keys, Caps) -> maps:filter(fun(Key, _Val) -> lists:member(Key, Keys) end, Caps). -with_env(Zone, Key, InitFun) -> - case emqx_zone:get_env(Zone, Key) of - undefined -> - Caps = InitFun(Zone), - ok = emqx_zone:set_env(Zone, Key, Caps), - Caps; - Caps -> Caps - end. +% with_env(Zone, Key, InitFun) -> +% case emqx_zone:get_env(Zone, Key) of +% undefined -> +% Caps = InitFun(Zone), +% ok = emqx_zone:set_env(Zone, Key, Caps), +% Caps; +% Caps -> Caps +% end. -spec(default() -> caps()). default() -> ?DEFAULT_CAPS. diff --git a/test/emqx_mqtt_caps_SUITE.erl b/test/emqx_mqtt_caps_SUITE.erl index 40f5d44d9..fae106a1e 100644 --- a/test/emqx_mqtt_caps_SUITE.erl +++ b/test/emqx_mqtt_caps_SUITE.erl @@ -34,7 +34,9 @@ t_check_pub(_) -> PubCaps = #{max_qos_allowed => ?QOS_1, retain_available => false }, - ok = emqx_zone:set_env(zone, '$mqtt_pub_caps', PubCaps), + lists:foreach(fun({Key, Val}) -> + ok = emqx_zone:set_env(zone, Key, Val) + end, maps:to_list(PubCaps)), ok = emqx_mqtt_caps:check_pub(zone, #{qos => ?QOS_1, retain => false}), PubFlags1 = #{qos => ?QOS_2, retain => false}, @@ -43,7 +45,9 @@ t_check_pub(_) -> PubFlags2 = #{qos => ?QOS_1, retain => true}, ?assertEqual({error, ?RC_RETAIN_NOT_SUPPORTED}, emqx_mqtt_caps:check_pub(zone, PubFlags2)), - true = emqx_zone:unset_env(zone, '$mqtt_pub_caps'). + lists:foreach(fun({Key, _Val}) -> + true = emqx_zone:unset_env(zone, Key) + end, maps:to_list(PubCaps)). t_check_sub(_) -> SubOpts = #{rh => 0, @@ -56,7 +60,9 @@ t_check_sub(_) -> shared_subscription => false, wildcard_subscription => false }, - ok = emqx_zone:set_env(zone, '$mqtt_sub_caps', SubCaps), + lists:foreach(fun({Key, Val}) -> + ok = emqx_zone:set_env(zone, Key, Val) + end, maps:to_list(SubCaps)), ok = emqx_mqtt_caps:check_sub(zone, <<"topic">>, SubOpts), ?assertEqual({error, ?RC_TOPIC_FILTER_INVALID}, emqx_mqtt_caps:check_sub(zone, <<"a/b/c/d">>, SubOpts)), @@ -64,4 +70,6 @@ t_check_sub(_) -> emqx_mqtt_caps:check_sub(zone, <<"+/#">>, SubOpts)), ?assertEqual({error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}, emqx_mqtt_caps:check_sub(zone, <<"topic">>, SubOpts#{share => true})), - true = emqx_zone:unset_env(zone, '$mqtt_sub_caps'). + lists:foreach(fun({Key, _Val}) -> + true = emqx_zone:unset_env(zone, Key) + end, maps:to_list(SubCaps)). diff --git a/test/emqx_msg_expiry_interval_SUITE.erl b/test/emqx_msg_expiry_interval_SUITE.erl index 063cf19dc..4ededdf72 100644 --- a/test/emqx_msg_expiry_interval_SUITE.erl +++ b/test/emqx_msg_expiry_interval_SUITE.erl @@ -24,7 +24,7 @@ all() -> emqx_ct:all(?MODULE). init_per_suite(Config) -> - emqx_ct_helpers:boot_modules(all), + emqx_ct_helpers:boot_modules(all), emqx_ct_helpers:start_apps([]), Config. @@ -32,63 +32,75 @@ end_per_suite(_Config) -> emqx_ct_helpers:stop_apps([]). t_message_expiry_interval_1(_) -> - ClientA = message_expiry_interval_init(), - [message_expiry_interval_exipred(ClientA, QoS) || QoS <- [0,1,2]], - emqtt:stop(ClientA). + ClientA = message_expiry_interval_init(), + [message_expiry_interval_exipred(ClientA, QoS) || QoS <- [0,1,2]], + emqtt:stop(ClientA). t_message_expiry_interval_2(_) -> - ClientA = message_expiry_interval_init(), - [message_expiry_interval_not_exipred(ClientA, QoS) || QoS <- [0,1,2]], - emqtt:stop(ClientA). + ClientA = message_expiry_interval_init(), + [message_expiry_interval_not_exipred(ClientA, QoS) || QoS <- [0,1,2]], + emqtt:stop(ClientA). message_expiry_interval_init() -> - {ok, ClientA} = emqtt:start_link([{proto_ver,v5}, {clientid, <<"client-a">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), - {ok, ClientB} = emqtt:start_link([{proto_ver,v5}, {clientid, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), - {ok, _} = emqtt:connect(ClientA), - {ok, _} = emqtt:connect(ClientB), - %% subscribe and disconnect client-b - emqtt:subscribe(ClientB, <<"t/a">>, 1), - emqtt:stop(ClientB), - ClientA. + {ok, ClientA} = emqtt:start_link([{proto_ver,v5}, + {clientid, <<"client-a">>}, + {clean_start, false}, + {properties, #{'Session-Expiry-Interval' => 360}}]), + {ok, ClientB} = emqtt:start_link([{proto_ver,v5}, + {clientid, <<"client-b">>}, + {clean_start, false}, + {properties, #{'Session-Expiry-Interval' => 360}}]), + {ok, _} = emqtt:connect(ClientA), + {ok, _} = emqtt:connect(ClientB), + %% subscribe and disconnect client-b + emqtt:subscribe(ClientB, <<"t/a">>, 1), + emqtt:stop(ClientB), + ClientA. message_expiry_interval_exipred(ClientA, QoS) -> - ct:pal("~p ~p", [?FUNCTION_NAME, QoS]), - %% publish to t/a and waiting for the message expired - emqtt:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 1}, <<"this will be purged in 1s">>, [{qos, QoS}]), - ct:sleep(1500), + ct:pal("~p ~p", [?FUNCTION_NAME, QoS]), + %% publish to t/a and waiting for the message expired + emqtt:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 1}, <<"this will be purged in 1s">>, [{qos, QoS}]), + ct:sleep(1500), - %% resume the session for client-b - {ok, ClientB1} = emqtt:start_link([{proto_ver,v5}, {clientid, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), - {ok, _} = emqtt:connect(ClientB1), + %% resume the session for client-b + {ok, ClientB1} = emqtt:start_link([{proto_ver,v5}, + {clientid, <<"client-b">>}, + {clean_start, false}, + {properties, #{'Session-Expiry-Interval' => 360}}]), + {ok, _} = emqtt:connect(ClientB1), - %% verify client-b could not receive the publish message - receive - {publish,#{client_pid := ClientB1, topic := <<"t/a">>}} -> - ct:fail(should_have_expired) - after 300 -> - ok - end, - emqtt:stop(ClientB1). + %% verify client-b could not receive the publish message + receive + {publish,#{client_pid := ClientB1, topic := <<"t/a">>}} -> + ct:fail(should_have_expired) + after 300 -> + ok + end, + emqtt:stop(ClientB1). message_expiry_interval_not_exipred(ClientA, QoS) -> - ct:pal("~p ~p", [?FUNCTION_NAME, QoS]), - %% publish to t/a - emqtt:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 20}, <<"this will be purged in 1s">>, [{qos, QoS}]), + ct:pal("~p ~p", [?FUNCTION_NAME, QoS]), + %% publish to t/a + emqtt:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 20}, <<"this will be purged in 1s">>, [{qos, QoS}]), - %% wait for 1s and then resume the session for client-b, the message should not expires - %% as Message-Expiry-Interval = 20s - ct:sleep(1000), - {ok, ClientB1} = emqtt:start_link([{proto_ver,v5}, {clientid, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), - {ok, _} = emqtt:connect(ClientB1), + %% wait for 1s and then resume the session for client-b, the message should not expires + %% as Message-Expiry-Interval = 20s + ct:sleep(1000), + {ok, ClientB1} = emqtt:start_link([{proto_ver,v5}, + {clientid, <<"client-b">>}, + {clean_start, false}, + {properties, #{'Session-Expiry-Interval' => 360}}]), + {ok, _} = emqtt:connect(ClientB1), - %% verify client-b could receive the publish message and the Message-Expiry-Interval is set - receive - {publish,#{client_pid := ClientB1, topic := <<"t/a">>, - properties := #{'Message-Expiry-Interval' := MsgExpItvl}}} - when MsgExpItvl < 20 -> ok; - {publish, _} = Msg -> - ct:fail({incorrect_publish, Msg}) - after 300 -> - ct:fail(no_publish_received) - end, - emqtt:stop(ClientB1). + %% verify client-b could receive the publish message and the Message-Expiry-Interval is set + receive + {publish,#{client_pid := ClientB1, topic := <<"t/a">>, + properties := #{'Message-Expiry-Interval' := MsgExpItvl}}} + when MsgExpItvl < 20 -> ok; + {publish, _} = Msg -> + ct:fail({incorrect_publish, Msg}) + after 300 -> + ct:fail(no_publish_received) + end, + emqtt:stop(ClientB1). From dceb08703907f3270cd2722609572593462c28c8 Mon Sep 17 00:00:00 2001 From: turtled Date: Wed, 4 Dec 2019 13:54:59 +0800 Subject: [PATCH 2/5] Format code --- src/emqx_mqtt_caps.erl | 26 ++------------------------ 1 file changed, 2 insertions(+), 24 deletions(-) diff --git a/src/emqx_mqtt_caps.erl b/src/emqx_mqtt_caps.erl index baac352fa..2844e4fe7 100644 --- a/src/emqx_mqtt_caps.erl +++ b/src/emqx_mqtt_caps.erl @@ -118,44 +118,22 @@ do_check_sub(_Flags, _Caps) -> ok. -spec(get_caps(emqx_zone:zone()) -> caps()). get_caps(Zone) -> - % with_env(Zone, '$mqtt_caps', fun all_caps/1). - maps:from_list([{Cap, emqx_zone:get_env(Zone, Cap, Def)} || {Cap, Def} <- maps:to_list(?DEFAULT_CAPS)]). + maps:from_list([{Cap, emqx_zone:get_env(Zone, Cap, Def)} || + {Cap, Def} <- maps:to_list(?DEFAULT_CAPS)]). -spec(get_caps(emqx_zone:zone(), publish|subscribe) -> caps()). get_caps(Zone, publish) -> - % with_env(Zone, '$mqtt_pub_caps', fun pub_caps/1); filter_caps(?PUBCAP_KEYS, get_caps(Zone)); get_caps(Zone, subscribe) -> - % with_env(Zone, '$mqtt_sub_caps', fun sub_caps/1). filter_caps(?SUBCAP_KEYS, get_caps(Zone)). -spec(get_caps(emqx_zone:zone(), atom(), term()) -> term()). get_caps(Zone, Cap, Def) -> emqx_zone:get_env(Zone, Cap, Def). -% pub_caps(Zone) -> -% filter_caps(?PUBCAP_KEYS, get_caps(Zone)). - -% sub_caps(Zone) -> -% filter_caps(?SUBCAP_KEYS, get_caps(Zone)). - -% all_caps(Zone) -> -% maps:map(fun(Cap, Def) -> -% emqx_zone:get_env(Zone, Cap, Def) -% end, ?DEFAULT_CAPS). - filter_caps(Keys, Caps) -> maps:filter(fun(Key, _Val) -> lists:member(Key, Keys) end, Caps). -% with_env(Zone, Key, InitFun) -> -% case emqx_zone:get_env(Zone, Key) of -% undefined -> -% Caps = InitFun(Zone), -% ok = emqx_zone:set_env(Zone, Key, Caps), -% Caps; -% Caps -> Caps -% end. - -spec(default() -> caps()). default() -> ?DEFAULT_CAPS. From d58dbf2dd2c11cb5412cce2642c4d675be4a0206 Mon Sep 17 00:00:00 2001 From: turtled Date: Mon, 9 Dec 2019 12:02:12 +0800 Subject: [PATCH 3/5] revert emqx.conf --- etc/emqx.conf | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index 170d7eab2..14414b220 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -906,7 +906,7 @@ listener.tcp.external.access.1 = allow all ## Enable the option for X.509 certificate based authentication. ## EMQX will use the common name of certificate as MQTT username. ## -## Value: cn | dn | crt +## Value: cn | dn ## listener.tcp.external.peer_cert_as_username = cn ## The TCP backlog defines the maximum length that the queue of pending @@ -1251,10 +1251,10 @@ listener.ssl.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-G ## Value: on | off ## listener.ssl.external.honor_cipher_order = on -## Use the CN, DN or CRT field from the client certificate as a username. +## Use the CN, EN or CRT field from the client certificate as a username. ## Notice that 'verify' should be set as 'verify_peer'. ## -## Value: cn | dn | crt +## Value: cn | en | crt ## listener.ssl.external.peer_cert_as_username = cn ## TCP backlog for the SSL connection. From b3eed9123f0aaa6d8aec7ae86c548ac5de0fd746 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Sat, 21 Dec 2019 10:31:17 +0800 Subject: [PATCH 4/5] Fix typo --- src/emqx_access_control.erl | 2 -- src/emqx_metrics.erl | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/src/emqx_access_control.erl b/src/emqx_access_control.erl index 3f8dfdb8d..df21afa0e 100644 --- a/src/emqx_access_control.erl +++ b/src/emqx_access_control.erl @@ -35,8 +35,6 @@ -spec(authenticate(emqx_types:clientinfo()) -> {ok, result()} | {error, term()}). authenticate(ClientInfo = #{zone := Zone}) -> case emqx_hooks:run_fold('client.authenticate', [ClientInfo], default_auth_result(Zone)) of - Result = #{auth_result := success, anonymous := true} -> - {ok, Result}; Result = #{auth_result := success} -> {ok, Result}; Result -> diff --git a/src/emqx_metrics.erl b/src/emqx_metrics.erl index 76e14fab0..b6c06bcef 100644 --- a/src/emqx_metrics.erl +++ b/src/emqx_metrics.erl @@ -158,7 +158,7 @@ %% Client Lifecircle metrics -define(CLIENT_METRICS, [{counter, 'client.connected'}, - {cpunter, 'client.authenticate'}, + {counter, 'client.authenticate'}, {counter, 'client.auth.anonymous'}, {counter, 'client.check_acl'}, {counter, 'client.subscribe'}, From b612674ffa5ceb785b40ae3bc9037e8a6215563a Mon Sep 17 00:00:00 2001 From: turtleDeng Date: Sat, 21 Dec 2019 14:16:37 +0800 Subject: [PATCH 5/5] Update emqx_mqtt_caps.erl --- src/emqx_mqtt_caps.erl | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/emqx_mqtt_caps.erl b/src/emqx_mqtt_caps.erl index 2844e4fe7..f24d074d5 100644 --- a/src/emqx_mqtt_caps.erl +++ b/src/emqx_mqtt_caps.erl @@ -118,8 +118,7 @@ do_check_sub(_Flags, _Caps) -> ok. -spec(get_caps(emqx_zone:zone()) -> caps()). get_caps(Zone) -> - maps:from_list([{Cap, emqx_zone:get_env(Zone, Cap, Def)} || - {Cap, Def} <- maps:to_list(?DEFAULT_CAPS)]). + maps:map(fun(Cap, Def) -> emqx_zone:get_env(Zone, Cap, Def) end, ?DEFAULT_CAPS). -spec(get_caps(emqx_zone:zone(), publish|subscribe) -> caps()). get_caps(Zone, publish) ->