diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 2769559c7..b706baf99 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -193,8 +193,8 @@ stats(#channel{session = Session})-> emqx_session:stats(Session). -spec(caps(channel()) -> emqx_types:caps()). -caps(#channel{clientinfo = #{zone := Zone, listener := Listener}}) -> - emqx_mqtt_caps:get_caps(Zone, Listener). +caps(#channel{clientinfo = #{zone := Zone}}) -> + emqx_mqtt_caps:get_caps(Zone). %%-------------------------------------------------------------------- @@ -1206,8 +1206,8 @@ run_conn_hooks(ConnPkt, Channel = #channel{conninfo = ConnInfo}) -> %%-------------------------------------------------------------------- %% Check Connect Packet -check_connect(ConnPkt, #channel{clientinfo = #{zone := Zone, listener := Listener}}) -> - emqx_packet:check(ConnPkt, emqx_mqtt_caps:get_caps(Zone, Listener)). +check_connect(ConnPkt, #channel{clientinfo = #{zone := Zone}}) -> + emqx_packet:check(ConnPkt, emqx_mqtt_caps:get_caps(Zone)). %%-------------------------------------------------------------------- %% Enrich Client Info @@ -1432,8 +1432,8 @@ check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS, retain = Retain}, variable = #mqtt_packet_publish{topic_name = Topic} }, - #channel{clientinfo = #{zone := Zone, listener := Listener}}) -> - emqx_mqtt_caps:check_pub(Zone, Listener, #{qos => QoS, retain => Retain, topic => Topic}). + #channel{clientinfo = #{zone := Zone}}) -> + emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => Retain, topic => Topic}). %%-------------------------------------------------------------------- %% Check Sub ACL @@ -1461,9 +1461,8 @@ check_sub_acl(TopicFilter, #channel{clientinfo = ClientInfo}) -> %%-------------------------------------------------------------------- %% Check Sub Caps -check_sub_caps(TopicFilter, SubOpts, #channel{clientinfo = #{zone := Zone, - listener := Listener}}) -> - emqx_mqtt_caps:check_sub(Zone, Listener, TopicFilter, SubOpts). +check_sub_caps(TopicFilter, SubOpts, #channel{clientinfo = #{zone := Zone}}) -> + emqx_mqtt_caps:check_sub(Zone, TopicFilter, SubOpts). %%-------------------------------------------------------------------- %% Enrich SubId @@ -1485,14 +1484,14 @@ enrich_subopts(SubOpts, #channel{clientinfo = #{zone := Zone, is_bridge := IsBri %% Enrich ConnAck Caps enrich_connack_caps(AckProps, ?IS_MQTT_V5 = #channel{clientinfo = #{ - zone := Zone, listener := Listener}}) -> + zone := Zone}}) -> #{max_packet_size := MaxPktSize, max_qos_allowed := MaxQoS, retain_available := Retain, max_topic_alias := MaxAlias, shared_subscription := Shared, wildcard_subscription := Wildcard - } = emqx_mqtt_caps:get_caps(Zone, Listener), + } = emqx_mqtt_caps:get_caps(Zone), NAckProps = AckProps#{'Retain-Available' => flag(Retain), 'Maximum-Packet-Size' => MaxPktSize, 'Topic-Alias-Maximum' => MaxAlias, @@ -1561,9 +1560,9 @@ ensure_connected(Channel = #channel{conninfo = ConnInfo, init_alias_maximum(#mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V5, properties = Properties}, - #{zone := Zone, listener := Listener} = _ClientInfo) -> + #{zone := Zone} = _ClientInfo) -> #{outbound => emqx_mqtt_props:get('Topic-Alias-Maximum', Properties, 0), - inbound => maps:get(max_topic_alias, emqx_mqtt_caps:get_caps(Zone, Listener)) + inbound => maps:get(max_topic_alias, emqx_mqtt_caps:get_caps(Zone)) }; init_alias_maximum(_ConnPkt, _ClientInfo) -> undefined. diff --git a/apps/emqx/src/emqx_flapping.erl b/apps/emqx/src/emqx_flapping.erl index b4ac7b228..0b852d88d 100644 --- a/apps/emqx/src/emqx_flapping.erl +++ b/apps/emqx/src/emqx_flapping.erl @@ -69,8 +69,8 @@ stop() -> gen_server:stop(?MODULE). %% @doc Detect flapping when a MQTT client disconnected. -spec(detect(emqx_types:clientinfo()) -> boolean()). -detect(#{clientid := ClientId, peerhost := PeerHost, zone := Zone, listener := Listener}) -> - Policy = #{max_count := Threshold} = get_policy(Zone, Listener), +detect(#{clientid := ClientId, peerhost := PeerHost, zone := Zone}) -> + Policy = #{max_count := Threshold} = get_policy(Zone), %% The initial flapping record sets the detect_cnt to 0. InitVal = #flapping{ clientid = ClientId, @@ -89,7 +89,7 @@ detect(#{clientid := ClientId, peerhost := PeerHost, zone := Zone, listener := L end end. -get_policy(Zone, Listener) -> +get_policy(Zone) -> emqx_config:get_zone_conf(Zone, [flapping_detect]). now_diff(TS) -> erlang:system_time(millisecond) - TS. @@ -137,12 +137,12 @@ handle_cast(Msg, State) -> ?LOG(error, "Unexpected cast: ~p", [Msg]), {noreply, State}. -handle_info({timeout, _TRef, {garbage_collect, Zone, Listener}}, State) -> +handle_info({timeout, _TRef, {garbage_collect, Zone}}, State) -> Timestamp = erlang:system_time(millisecond) - - maps:get(window_time, get_policy(Zone, Listener)), + - maps:get(window_time, get_policy(Zone)), MatchSpec = [{{'_', '_', '_', '$1', '_'},[{'<', '$1', Timestamp}], [true]}], ets:select_delete(?FLAPPING_TAB, MatchSpec), - start_timer(Zone, Listener), + start_timer(Zone), {noreply, State, hibernate}; handle_info(Info, State) -> @@ -155,13 +155,11 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -start_timer(Zone, Listener) -> - WindTime = maps:get(window_time, get_policy(Zone, Listener)), - emqx_misc:start_timer(WindTime, {garbage_collect, Zone, Listener}). +start_timer(Zone) -> + WindTime = maps:get(window_time, get_policy(Zone)), + emqx_misc:start_timer(WindTime, {garbage_collect, Zone}). start_timers() -> - lists:foreach(fun({Zone, ZoneConf}) -> - lists:foreach(fun({Listener, _}) -> - start_timer(Zone, Listener) - end, maps:to_list(maps:get(listeners, ZoneConf, #{}))) + lists:foreach(fun({Zone, _ZoneConf}) -> + start_timer(Zone) end, maps:to_list(emqx_config:get([zones], #{}))). \ No newline at end of file diff --git a/apps/emqx/src/emqx_mqtt_caps.erl b/apps/emqx/src/emqx_mqtt_caps.erl index 978d80471..add86ef99 100644 --- a/apps/emqx/src/emqx_mqtt_caps.erl +++ b/apps/emqx/src/emqx_mqtt_caps.erl @@ -20,11 +20,11 @@ -include("emqx_mqtt.hrl"). -include("types.hrl"). --export([ check_pub/3 - , check_sub/4 +-export([ check_pub/2 + , check_sub/3 ]). --export([ get_caps/2 +-export([ get_caps/1 ]). -export_type([caps/0]). @@ -64,18 +64,18 @@ shared_subscription => true }). --spec(check_pub(emqx_types:zone(), atom(), +-spec(check_pub(emqx_types:zone(), #{qos := emqx_types:qos(), retain := boolean(), topic := emqx_topic:topic()}) -> ok_or_error(emqx_types:reason_code())). -check_pub(Zone, Listener, Flags) when is_map(Flags) -> +check_pub(Zone, Flags) when is_map(Flags) -> do_check_pub(case maps:take(topic, Flags) of {Topic, Flags1} -> Flags1#{topic_levels => emqx_topic:levels(Topic)}; error -> Flags - end, maps:with(?PUBCAP_KEYS, get_caps(Zone, Listener))). + end, maps:with(?PUBCAP_KEYS, get_caps(Zone))). do_check_pub(#{topic_levels := Levels}, #{max_topic_levels := Limit}) when Limit > 0, Levels > Limit -> @@ -87,12 +87,12 @@ do_check_pub(#{retain := true}, #{retain_available := false}) -> {error, ?RC_RETAIN_NOT_SUPPORTED}; do_check_pub(_Flags, _Caps) -> ok. --spec(check_sub(emqx_types:zone(), atom(), +-spec(check_sub(emqx_types:zone(), emqx_types:topic(), emqx_types:subopts()) -> ok_or_error(emqx_types:reason_code())). -check_sub(Zone, Listener, Topic, SubOpts) -> - Caps = maps:with(?SUBCAP_KEYS, get_caps(Zone, Listener)), +check_sub(Zone, Topic, SubOpts) -> + Caps = maps:with(?SUBCAP_KEYS, get_caps(Zone)), Flags = lists:foldl( fun(max_topic_levels, Map) -> Map#{topic_levels => emqx_topic:levels(Topic)}; @@ -113,7 +113,7 @@ do_check_sub(#{is_shared := true}, #{shared_subscription := false}) -> {error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}; do_check_sub(_Flags, _Caps) -> ok. -get_caps(Zone, Listener) -> +get_caps(Zone) -> lists:foldl(fun({K, V}, Acc) -> Acc#{K => emqx_config:get_zone_conf(Zone, [mqtt, K], V)} end, #{}, maps:to_list(?DEFAULT_CAPS)). diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 1edd5bd88..c9a58edd8 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -157,27 +157,27 @@ %%-------------------------------------------------------------------- -spec(init(emqx_types:clientinfo(), emqx_types:conninfo()) -> session()). -init(#{zone := Zone, listener := Listener}, #{receive_maximum := MaxInflight}) -> - #session{max_subscriptions = get_conf(Zone, Listener, max_subscriptions), +init(#{zone := Zone}, #{receive_maximum := MaxInflight}) -> + #session{max_subscriptions = get_conf(Zone, max_subscriptions), subscriptions = #{}, - upgrade_qos = get_conf(Zone, Listener, upgrade_qos), + upgrade_qos = get_conf(Zone, upgrade_qos), inflight = emqx_inflight:new(MaxInflight), - mqueue = init_mqueue(Zone, Listener), + mqueue = init_mqueue(Zone), next_pkt_id = 1, - retry_interval = timer:seconds(get_conf(Zone, Listener, retry_interval)), + retry_interval = timer:seconds(get_conf(Zone, retry_interval)), awaiting_rel = #{}, - max_awaiting_rel = get_conf(Zone, Listener, max_awaiting_rel), - await_rel_timeout = timer:seconds(get_conf(Zone, Listener, await_rel_timeout)), + max_awaiting_rel = get_conf(Zone, max_awaiting_rel), + await_rel_timeout = timer:seconds(get_conf(Zone, await_rel_timeout)), created_at = erlang:system_time(millisecond) }. %% @private init mq -init_mqueue(Zone, Listener) -> +init_mqueue(Zone) -> emqx_mqueue:init(#{ - max_len => get_conf(Zone, Listener, max_mqueue_len), - store_qos0 => get_conf(Zone, Listener, mqueue_store_qos0), - priorities => get_conf(Zone, Listener, mqueue_priorities), - default_priority => get_conf(Zone, Listener, mqueue_default_priority) + max_len => get_conf(Zone, max_mqueue_len), + store_qos0 => get_conf(Zone, mqueue_store_qos0), + priorities => get_conf(Zone, mqueue_priorities), + default_priority => get_conf(Zone, mqueue_default_priority) }). %%-------------------------------------------------------------------- @@ -696,5 +696,5 @@ set_field(Name, Value, Session) -> Pos = emqx_misc:index_of(Name, record_info(fields, session)), setelement(Pos+1, Session, Value). -get_conf(Zone, Listener, Key) -> +get_conf(Zone, Key) -> emqx_config:get_zone_conf(Zone, [mqtt, Key]). diff --git a/apps/emqx/test/emqx_channel_SUITE.erl b/apps/emqx/test/emqx_channel_SUITE.erl index 8461ae8c9..3cedf3957 100644 --- a/apps/emqx/test/emqx_channel_SUITE.erl +++ b/apps/emqx/test/emqx_channel_SUITE.erl @@ -880,7 +880,7 @@ t_check_sub_acls(_) -> t_enrich_connack_caps(_) -> ok = meck:new(emqx_mqtt_caps, [passthrough, no_history]), ok = meck:expect(emqx_mqtt_caps, get_caps, - fun(_Zone, _Listener) -> + fun(_Zone) -> #{max_packet_size => 1024, max_qos_allowed => ?QOS_2, retain_available => true, diff --git a/apps/emqx/test/emqx_mqtt_caps_SUITE.erl b/apps/emqx/test/emqx_mqtt_caps_SUITE.erl index 7cdba6011..05696fc69 100644 --- a/apps/emqx/test/emqx_mqtt_caps_SUITE.erl +++ b/apps/emqx/test/emqx_mqtt_caps_SUITE.erl @@ -29,13 +29,13 @@ t_check_pub(_) -> emqx_config:put_zone_conf(default, [mqtt, max_qos_allowed], ?QOS_1), emqx_config:put_zone_conf(default, [mqtt, retain_available], false), timer:sleep(50), - ok = emqx_mqtt_caps:check_pub(default, mqtt_tcp, #{qos => ?QOS_1, retain => false}), + ok = emqx_mqtt_caps:check_pub(default, #{qos => ?QOS_1, retain => false}), PubFlags1 = #{qos => ?QOS_2, retain => false}, ?assertEqual({error, ?RC_QOS_NOT_SUPPORTED}, - emqx_mqtt_caps:check_pub(default, mqtt_tcp, PubFlags1)), + emqx_mqtt_caps:check_pub(default, PubFlags1)), PubFlags2 = #{qos => ?QOS_1, retain => true}, ?assertEqual({error, ?RC_RETAIN_NOT_SUPPORTED}, - emqx_mqtt_caps:check_pub(default, mqtt_tcp, PubFlags2)), + emqx_mqtt_caps:check_pub(default, PubFlags2)), emqx_config:put(OldConf). t_check_sub(_) -> @@ -50,11 +50,11 @@ t_check_sub(_) -> emqx_config:put_zone_conf(default, [mqtt, shared_subscription], false), emqx_config:put_zone_conf(default, [mqtt, wildcard_subscription], false), timer:sleep(50), - ok = emqx_mqtt_caps:check_sub(default, mqtt_tcp, <<"topic">>, SubOpts), + ok = emqx_mqtt_caps:check_sub(default, <<"topic">>, SubOpts), ?assertEqual({error, ?RC_TOPIC_FILTER_INVALID}, - emqx_mqtt_caps:check_sub(default, mqtt_tcp, <<"a/b/c/d">>, SubOpts)), + emqx_mqtt_caps:check_sub(default, <<"a/b/c/d">>, SubOpts)), ?assertEqual({error, ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED}, - emqx_mqtt_caps:check_sub(default, mqtt_tcp, <<"+/#">>, SubOpts)), + emqx_mqtt_caps:check_sub(default, <<"+/#">>, SubOpts)), ?assertEqual({error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}, - emqx_mqtt_caps:check_sub(default, mqtt_tcp, <<"topic">>, SubOpts#{share => true})), + emqx_mqtt_caps:check_sub(default, <<"topic">>, SubOpts#{share => true})), emqx_config:put(OldConf). diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl index 87bba1939..575add29b 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl @@ -585,7 +585,7 @@ handle_call(discard, Channel) -> handle_call(list_acl_cache, Channel) -> %% This won't work - {reply, emqx_acl_cache:list_acl_cache(default, mqtt_tcp), Channel}; + {reply, emqx_acl_cache:list_acl_cache(default), Channel}; %% XXX: No Quota Now % handle_call({quota, Policy}, Channel) ->