From bfd027cf8b5cb46ad4500c151c07e237eba053f7 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Wed, 7 Aug 2019 14:01:50 +0800 Subject: [PATCH 1/3] Rewrite the mqtt_caps module --- src/emqx_mqtt_caps.erl | 202 +++++++++++++++++++---------------------- 1 file changed, 95 insertions(+), 107 deletions(-) diff --git a/src/emqx_mqtt_caps.erl b/src/emqx_mqtt_caps.erl index 5e0d2a3fc..077e24154 100644 --- a/src/emqx_mqtt_caps.erl +++ b/src/emqx_mqtt_caps.erl @@ -17,147 +17,135 @@ %% @doc MQTTv5 Capabilities -module(emqx_mqtt_caps). --include("emqx.hrl"). -include("emqx_mqtt.hrl"). +-include("types.hrl"). -export([ check_pub/2 - , check_sub/2 + , check_sub/3 , get_caps/1 , get_caps/2 ]). --export([default_caps/0]). +-export([default/0]). -export_type([caps/0]). --type(caps() :: #{max_packet_size => integer(), +-type(caps() :: #{max_packet_size => integer(), max_clientid_len => integer(), - max_topic_alias => integer(), + max_topic_alias => integer(), max_topic_levels => integer(), - max_qos_allowed => emqx_types:qos(), - mqtt_retain_available => boolean(), - mqtt_shared_subscription => boolean(), - mqtt_wildcard_subscription => boolean() + max_qos_allowed => emqx_types:qos(), + retain_available => boolean(), + wildcard_subscription => boolean(), + subscription_identifiers => boolean(), + shared_subscription => boolean() }). -define(UNLIMITED, 0). --define(DEFAULT_CAPS, [{max_packet_size, ?MAX_PACKET_SIZE}, - {max_clientid_len, ?MAX_CLIENTID_LEN}, - {max_topic_alias, ?UNLIMITED}, - {max_topic_levels, ?UNLIMITED}, - {max_qos_allowed, ?QOS_2}, - {mqtt_retain_available, true}, - {mqtt_shared_subscription, true}, - {mqtt_wildcard_subscription, true} - ]). - --define(PUBCAP_KEYS, [max_qos_allowed, - mqtt_retain_available, - max_topic_alias +-define(PUBCAP_KEYS, [max_topic_alias, + max_qos_allowed, + retain_available ]). --define(SUBCAP_KEYS, [max_qos_allowed, - max_topic_levels, - mqtt_shared_subscription, - mqtt_wildcard_subscription +-define(SUBCAP_KEYS, [max_topic_levels, + wildcard_subscription, + shared_subscription ]). --spec(check_pub(emqx_types:zone(), map()) -> ok | {error, emqx_types:reason_code()}). -check_pub(Zone, Props) when is_map(Props) -> - do_check_pub(Props, maps:to_list(get_caps(Zone, publish))). +-define(DEFAULT_CAPS, #{max_packet_size => ?MAX_PACKET_SIZE, + max_clientid_len => ?MAX_CLIENTID_LEN, + max_topic_alias => ?UNLIMITED, + max_topic_levels => ?UNLIMITED, + max_qos_allowed => ?QOS_2, + retain_available => true, + wildcard_subscription => true, + subscription_identifiers => true, + shared_subscription => true + }). -do_check_pub(_Props, []) -> - ok; -do_check_pub(Props = #{qos := QoS}, [{max_qos_allowed, MaxQoS}|Caps]) -> - case QoS > MaxQoS of - true -> {error, ?RC_QOS_NOT_SUPPORTED}; - false -> do_check_pub(Props, Caps) - end; -do_check_pub(Props = #{ topic_alias := TopicAlias}, [{max_topic_alias, MaxTopicAlias}| Caps]) -> - case TopicAlias =< MaxTopicAlias andalso TopicAlias > 0 of - false -> {error, ?RC_TOPIC_ALIAS_INVALID}; - true -> do_check_pub(Props, Caps) - end; -do_check_pub(#{retain := true}, [{mqtt_retain_available, false}|_Caps]) -> +-spec(check_pub(emqx_types:zone(), + #{qos => emqx_types:qos(), + retain => boolean()}) + -> ok_or_error(emqx_types:reason_code())). +check_pub(Zone, Flags) when is_map(Flags) -> + do_check_pub(Flags, get_caps(Zone, publish)). + +do_check_pub(#{qos := QoS}, #{max_qos_allowed := MaxQoS}) + when QoS > MaxQoS -> + {error, ?RC_QOS_NOT_SUPPORTED}; +do_check_pub(#{retain := true}, #{retain_available := false}) -> {error, ?RC_RETAIN_NOT_SUPPORTED}; -do_check_pub(Props, [{max_topic_alias, _} | Caps]) -> - do_check_pub(Props, Caps); -do_check_pub(Props, [{mqtt_retain_available, _}|Caps]) -> - do_check_pub(Props, Caps). +do_check_pub(#{topic_alias := TopicAlias}, + #{max_topic_alias := MaxTopicAlias}) + when 0 == TopicAlias; TopicAlias >= MaxTopicAlias -> + {error, ?RC_TOPIC_ALIAS_INVALID}; +do_check_pub(_Flags, _Caps) -> ok. --spec(check_sub(emqx_types:zone(), emqx_types:topic_filters()) - -> {ok | error, emqx_types:topic_filters()}). -check_sub(Zone, TopicFilters) -> - Caps = maps:to_list(get_caps(Zone, subscribe)), - lists:foldr(fun({Topic, Opts}, {Ok, Result}) -> - case check_sub(Topic, Opts, Caps) of - {ok, Opts1} -> - {Ok, [{Topic, Opts1}|Result]}; - {error, Opts1} -> - {error, [{Topic, Opts1}|Result]} - end - end, {ok, []}, TopicFilters). +-spec(check_sub(emqx_types:zone(), + emqx_types:topic(), + emqx_types:subopts()) + -> ok_or_error(emqx_types:reason_code())). +check_sub(Zone, Topic, SubOpts) -> + Caps = get_caps(Zone, subscribe), + Flags = lists:foldl( + fun(max_topic_levels, Map) -> + Map#{topic_levels => emqx_topic:levels(Topic)}; + (wildcard_subscription, Map) -> + Map#{is_wildcard => emqx_topic:wildcard(Topic)}; + (shared_subscription, Map) -> + Map#{is_shared => maps:is_key(share, SubOpts)} + end, #{}, maps:keys(Caps)), + do_check_sub(Flags, Caps). -check_sub(_Topic, Opts, []) -> - {ok, Opts}; -check_sub(Topic, Opts = #{qos := QoS}, [{max_qos_allowed, MaxQoS}|Caps]) -> - check_sub(Topic, Opts#{qos := min(QoS, MaxQoS)}, Caps); -check_sub(Topic, Opts, [{mqtt_shared_subscription, true}|Caps]) -> - check_sub(Topic, Opts, Caps); -check_sub(Topic, Opts, [{mqtt_shared_subscription, false}|Caps]) -> - case maps:is_key(share, Opts) of - true -> - {error, Opts#{rc := ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}}; - false -> check_sub(Topic, Opts, Caps) - end; -check_sub(Topic, Opts, [{mqtt_wildcard_subscription, true}|Caps]) -> - check_sub(Topic, Opts, Caps); -check_sub(Topic, Opts, [{mqtt_wildcard_subscription, false}|Caps]) -> - case emqx_topic:wildcard(Topic) of - true -> - {error, Opts#{rc := ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED}}; - false -> check_sub(Topic, Opts, Caps) - end; -check_sub(Topic, Opts, [{max_topic_levels, ?UNLIMITED}|Caps]) -> - check_sub(Topic, Opts, Caps); -check_sub(Topic, Opts, [{max_topic_levels, Limit}|Caps]) -> - case emqx_topic:levels(Topic) of - Levels when Levels > Limit -> - {error, Opts#{rc := ?RC_TOPIC_FILTER_INVALID}}; - _ -> check_sub(Topic, Opts, Caps) - end. +do_check_sub(#{topic_levels := Levels}, #{max_topic_levels := Limit}) + when Levels >= Limit -> + {error, ?RC_TOPIC_FILTER_INVALID}; +do_check_sub(#{is_wildcard := true}, #{wildcard_subscription := false}) -> + {error, ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED}; +do_check_sub(#{is_shared := true}, #{shared_subscription := false}) -> + {error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}; +do_check_sub(_Flags, _Caps) -> ok. -default_caps() -> - ?DEFAULT_CAPS. +-spec(get_caps(emqx_zone:zone()) -> caps()). +get_caps(Zone) -> + with_env(Zone, '$mqtt_caps', fun all_caps/1). +-spec(get_caps(emqx_zone:zone(), publish|subscribe) -> caps()). get_caps(Zone, publish) -> - with_env(Zone, '$mqtt_pub_caps', - fun() -> - filter_caps(?PUBCAP_KEYS, get_caps(Zone)) - end); + with_env(Zone, '$mqtt_pub_caps', fun pub_caps/1); get_caps(Zone, subscribe) -> - with_env(Zone, '$mqtt_sub_caps', - fun() -> - filter_caps(?SUBCAP_KEYS, get_caps(Zone)) - end). + with_env(Zone, '$mqtt_sub_caps', fun sub_caps/1). -get_caps(Zone) -> - with_env(Zone, '$mqtt_caps', - fun() -> - maps:from_list([{Cap, emqx_zone:get_env(Zone, Cap, Def)} - || {Cap, Def} <- ?DEFAULT_CAPS]) - end). +pub_caps(Zone) -> + filter_caps(?PUBCAP_KEYS, get_caps(Zone)). + +sub_caps(Zone) -> + filter_caps(?SUBCAP_KEYS, get_caps(Zone)). + +all_caps(Zone) -> + maps:map(fun(Cap, Def) -> + emqx_zone:get_env(Zone, Cap, Def) + end, ?DEFAULT_CAPS). filter_caps(Keys, Caps) -> - maps:filter(fun(Key, _Val) -> lists:member(Key, Keys) end, Caps). + maps:filter(fun(Key, Val) -> + lists:member(Key, Keys) andalso cap_limited(Key, Val) + end, Caps). + +cap_limited(Key, Val) -> + Val =/= maps:get(Key, ?DEFAULT_CAPS). with_env(Zone, Key, InitFun) -> case emqx_zone:get_env(Zone, Key) of - undefined -> Caps = InitFun(), - ok = emqx_zone:set_env(Zone, Key, Caps), - Caps; - ZoneCaps -> ZoneCaps + undefined -> + Caps = InitFun(Zone), + ok = emqx_zone:set_env(Zone, Key, Caps), + Caps; + Caps -> Caps end. +-spec(default() -> caps()). +default() -> ?DEFAULT_CAPS. + From bd061415c942132db87b34d0c8b271e6e6fa56cc Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 8 Aug 2019 14:11:26 +0800 Subject: [PATCH 2/3] Rewrite the emqx_mqtt_caps module and add test cases - Rename `mqtt_retain_available` to `retain_available` - Rename `mqtt_wildcard_subscription` to `wildcard_subscription` - Rename `mqtt_shared_subscription` to `shared_subscription` - Add `emqx_zone:unset_env/2` API for unit test --- priv/emqx.schema | 12 ++--- src/emqx_mqtt_caps.erl | 19 ++++---- src/emqx_protocol.erl | 10 ++-- src/emqx_zone.erl | 15 +++--- test/emqx_mqtt_caps_SUITE.erl | 90 +++++++++++++++++++++++++++++++++++ 5 files changed, 118 insertions(+), 28 deletions(-) create mode 100644 test/emqx_mqtt_caps_SUITE.erl diff --git a/priv/emqx.schema b/priv/emqx.schema index 1d308476d..210248ab3 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -634,19 +634,19 @@ end}. ]}. %% @doc Whether the server supports MQTT retained messages. -{mapping, "mqtt.retain_available", "emqx.mqtt_retain_available", [ +{mapping, "mqtt.retain_available", "emqx.retain_available", [ {default, true}, {datatype, {enum, [true, false]}} ]}. %% @doc Whether the Server supports MQTT Wildcard Subscriptions. -{mapping, "mqtt.wildcard_subscription", "emqx.mqtt_wildcard_subscription", [ +{mapping, "mqtt.wildcard_subscription", "emqx.wildcard_subscription", [ {default, true}, {datatype, {enum, [true, false]}} ]}. %% @doc Whether the Server supports MQTT Shared Subscriptions. -{mapping, "mqtt.shared_subscription", "emqx.mqtt_shared_subscription", [ +{mapping, "mqtt.shared_subscription", "emqx.shared_subscription", [ {default, true}, {datatype, {enum, [true, false]}} ]}. @@ -876,7 +876,7 @@ end}. {translation, "emqx.zones", fun(Conf) -> Mapping = fun("retain_available", Val) -> - {mqtt_retain_available, Val}; + {retain_available, Val}; ("flapping_threshold", Val) -> [Limit, Duration] = string:tokens(Val, ", "), FlappingThreshold = case cuttlefish_duration:parse(Duration, s) of @@ -887,9 +887,9 @@ end}. end, {flapping_threshold, FlappingThreshold}; ("wildcard_subscription", Val) -> - {mqtt_wildcard_subscription, Val}; + {wildcard_subscription, Val}; ("shared_subscription", Val) -> - {mqtt_shared_subscription, Val}; + {shared_subscription, Val}; ("publish_limit", Val) -> [Limit, Duration] = string:tokens(Val, ", "), PubLimit = case cuttlefish_duration:parse(Duration, s) of diff --git a/src/emqx_mqtt_caps.erl b/src/emqx_mqtt_caps.erl index 077e24154..c8da4e05d 100644 --- a/src/emqx_mqtt_caps.erl +++ b/src/emqx_mqtt_caps.erl @@ -22,7 +22,9 @@ -export([ check_pub/2 , check_sub/3 - , get_caps/1 + ]). + +-export([ get_caps/1 , get_caps/2 ]). @@ -49,6 +51,7 @@ ]). -define(SUBCAP_KEYS, [max_topic_levels, + max_qos_allowed, wildcard_subscription, shared_subscription ]). @@ -94,12 +97,13 @@ check_sub(Zone, Topic, SubOpts) -> (wildcard_subscription, Map) -> Map#{is_wildcard => emqx_topic:wildcard(Topic)}; (shared_subscription, Map) -> - Map#{is_shared => maps:is_key(share, SubOpts)} + Map#{is_shared => maps:is_key(share, SubOpts)}; + (_Key, Map) -> Map %% Ignore end, #{}, maps:keys(Caps)), do_check_sub(Flags, Caps). do_check_sub(#{topic_levels := Levels}, #{max_topic_levels := Limit}) - when Levels >= Limit -> + when Levels > Limit -> {error, ?RC_TOPIC_FILTER_INVALID}; do_check_sub(#{is_wildcard := true}, #{wildcard_subscription := false}) -> {error, ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED}; @@ -119,7 +123,7 @@ get_caps(Zone, subscribe) -> with_env(Zone, '$mqtt_sub_caps', fun sub_caps/1). pub_caps(Zone) -> - filter_caps(?PUBCAP_KEYS, get_caps(Zone)). + filter_caps(?PUBCAP_KEYS, get_caps(Zone)). sub_caps(Zone) -> filter_caps(?SUBCAP_KEYS, get_caps(Zone)). @@ -130,12 +134,7 @@ all_caps(Zone) -> end, ?DEFAULT_CAPS). filter_caps(Keys, Caps) -> - maps:filter(fun(Key, Val) -> - lists:member(Key, Keys) andalso cap_limited(Key, Val) - end, Caps). - -cap_limited(Key, Val) -> - Val =/= maps:get(Key, ?DEFAULT_CAPS). + maps:filter(fun(Key, _Val) -> lists:member(Key, Keys) end, Caps). with_env(Zone, Key, InitFun) -> case emqx_zone:get_env(Zone, Key) of diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 218cf0fdc..15c3bf37c 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -763,7 +763,7 @@ process_subscribe([{TopicFilter, SubOpts}|More], Acc, PState) -> do_subscribe(TopicFilter, SubOpts = #{qos := QoS}, PState = #protocol{client = Client, session = Session}) -> - case check_subscribe(TopicFilter, PState) of + case check_subscribe(TopicFilter, SubOpts, PState) of ok -> TopicFilter1 = mount(Client, TopicFilter), SubOpts1 = enrich_subopts(maps:merge(?DEFAULT_SUBOPTS, SubOpts), PState), case emqx_session:subscribe(Client, TopicFilter1, SubOpts1, Session) of @@ -787,9 +787,9 @@ enrich_subopts(SubOpts, #protocol{client = #{zone := Zone, is_bridge := IsBridge SubOpts#{rap => Rap, nl => Nl}. %% Check Sub -check_subscribe(TopicFilter, PState) -> +check_subscribe(TopicFilter, SubOpts, PState) -> case check_sub_acl(TopicFilter, PState) of - allow -> ok; %%TODO: check_sub_caps(TopicFilter, PState); + allow -> check_sub_caps(TopicFilter, SubOpts, PState); deny -> {error, ?RC_NOT_AUTHORIZED} end. @@ -802,8 +802,8 @@ check_sub_acl(TopicFilter, #protocol{client = Client}) -> end. %% Check Sub Caps -check_sub_caps(TopicFilter, #protocol{client = #{zone := Zone}}) -> - emqx_mqtt_caps:check_sub(Zone, TopicFilter). +check_sub_caps(TopicFilter, SubOpts, #protocol{client = #{zone := Zone}}) -> + emqx_mqtt_caps:check_sub(Zone, TopicFilter, SubOpts). %%-------------------------------------------------------------------- %% Process unsubscribe request diff --git a/src/emqx_zone.erl b/src/emqx_zone.erl index c4721485b..bac32b849 100644 --- a/src/emqx_zone.erl +++ b/src/emqx_zone.erl @@ -30,6 +30,7 @@ -export([ get_env/2 , get_env/3 , set_env/3 + , unset_env/2 , force_reload/0 ]). @@ -81,7 +82,11 @@ get_env(Zone, Key, Def) -> -spec(set_env(zone(), atom(), term()) -> ok). set_env(Zone, Key, Val) -> - gen_server:cast(?SERVER, {set_env, Zone, Key, Val}). + persistent_term:put(?KEY(Zone, Key), Val). + +-spec(unset_env(zone(), atom()) -> boolean()). +unset_env(Zone, Key) -> + persistent_term:erase(?KEY(Zone, Key)). -spec(force_reload() -> ok). force_reload() -> @@ -107,10 +112,6 @@ handle_call(Req, _From, State) -> ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. -handle_cast({set_env, Zone, Key, Val}, State) -> - ok = persistent_term:put(?KEY(Zone, Key), Val), - {noreply, State}; - handle_cast(Msg, State) -> ?LOG(error, "Unexpected cast: ~p", [Msg]), {noreply, State}. @@ -130,6 +131,6 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- do_reload() -> - [ persistent_term:put(?KEY(Zone, Key), Val) - || {Zone, Opts} <- emqx_config:get_env(zones, []), {Key, Val} <- Opts ]. + [persistent_term:put(?KEY(Zone, Key), Val) + || {Zone, Opts} <- emqx_config:get_env(zones, []), {Key, Val} <- Opts]. diff --git a/test/emqx_mqtt_caps_SUITE.erl b/test/emqx_mqtt_caps_SUITE.erl new file mode 100644 index 000000000..14b1e955f --- /dev/null +++ b/test/emqx_mqtt_caps_SUITE.erl @@ -0,0 +1,90 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_mqtt_caps_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include("emqx_mqtt.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +all() -> emqx_ct:all(?MODULE). + +t_check_pub(_) -> + PubCaps = #{max_qos_allowed => ?QOS_1, + retain_available => false, + max_topic_alias => 4 + }, + ok = emqx_zone:set_env(zone, '$mqtt_pub_caps', PubCaps), + ok = emqx_mqtt_caps:check_pub(zone, #{qos => ?QOS_1, + retain => false, + topic_alias => 1 + }), + PubFlags1 = #{qos => ?QOS_2, retain => false}, + ?assertEqual({error, ?RC_QOS_NOT_SUPPORTED}, + emqx_mqtt_caps:check_pub(zone, PubFlags1)), + PubFlags2 = #{qos => ?QOS_1, retain => true}, + ?assertEqual({error, ?RC_RETAIN_NOT_SUPPORTED}, + emqx_mqtt_caps:check_pub(zone, PubFlags2)), + PubFlags3 = #{qos => ?QOS_1, retain => false, topic_alias => 5}, + ?assertEqual({error, ?RC_TOPIC_ALIAS_INVALID}, + emqx_mqtt_caps:check_pub(zone, PubFlags3)), + true = emqx_zone:unset_env(zone, '$mqtt_pub_caps'). + +t_check_sub(_) -> + SubOpts = #{rh => 0, + rap => 0, + nl => 0, + qos => ?QOS_2 + }, + SubCaps = #{max_topic_levels => 2, + max_qos_allowed => ?QOS_2, + shared_subscription => false, + wildcard_subscription => false + }, + ok = emqx_zone:set_env(zone, '$mqtt_sub_caps', SubCaps), + ok = emqx_mqtt_caps:check_sub(zone, <<"topic">>, SubOpts), + ?assertEqual({error, ?RC_TOPIC_FILTER_INVALID}, + emqx_mqtt_caps:check_sub(zone, <<"a/b/c/d">>, SubOpts)), + ?assertEqual({error, ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED}, + emqx_mqtt_caps:check_sub(zone, <<"+/#">>, SubOpts)), + ?assertEqual({error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}, + emqx_mqtt_caps:check_sub(zone, <<"topic">>, SubOpts#{share => true})), + true = emqx_zone:unset_env(zone, '$mqtt_sub_caps'). + +t_get_set_caps(_) -> + Caps = emqx_mqtt_caps:default(), + ?assertEqual(Caps, emqx_mqtt_caps:get_caps(zone)), + PubCaps = #{max_qos_allowed => ?QOS_2, + retain_available => true, + max_topic_alias => 0 + }, + ?assertEqual(PubCaps, emqx_mqtt_caps:get_caps(zone, publish)), + NewPubCaps = PubCaps#{max_qos_allowed => ?QOS_1, + retain_available => true + }, + emqx_zone:set_env(zone, '$mqtt_pub_caps', NewPubCaps), + ?assertEqual(NewPubCaps, emqx_mqtt_caps:get_caps(zone, publish)), + + SubCaps = #{max_topic_levels => 0, + max_qos_allowed => ?QOS_2, + shared_subscription => true, + wildcard_subscription => true + }, + ?assertEqual(SubCaps, emqx_mqtt_caps:get_caps(zone, subscribe)), + true = emqx_zone:unset_env(zone, '$mqtt_pub_caps'). + From f70da696f0e9b5b0f436950305f6c7a6eeacf99a Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 8 Aug 2019 22:33:40 +0800 Subject: [PATCH 3/3] Fix the badmatch error --- src/emqx_protocol.erl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 15c3bf37c..c9bf3690b 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -314,10 +314,10 @@ handle_out({connack, ?RC_SUCCESS, SP}, ok = emqx_hooks:run('client.connected', [Client, ?RC_SUCCESS, attrs(PState)]), #{max_packet_size := MaxPktSize, max_qos_allowed := MaxQoS, - mqtt_retain_available := Retain, + retain_available := Retain, max_topic_alias := MaxAlias, - mqtt_shared_subscription := Shared, - mqtt_wildcard_subscription := Wildcard + shared_subscription := Shared, + wildcard_subscription := Wildcard } = caps(PState), %% Response-Information is so far not set by broker. %% i.e. It's a Client-to-Client contract for the request-response topic naming scheme.