refactor(configs): make mqtt related configs only in zone

This commit is contained in:
Shawn 2021-07-20 10:02:55 +08:00
parent af5470cb30
commit 98c7f9edb2
7 changed files with 55 additions and 58 deletions

View File

@ -193,8 +193,8 @@ stats(#channel{session = Session})->
emqx_session:stats(Session).
-spec(caps(channel()) -> emqx_types:caps()).
caps(#channel{clientinfo = #{zone := Zone, listener := Listener}}) ->
emqx_mqtt_caps:get_caps(Zone, Listener).
caps(#channel{clientinfo = #{zone := Zone}}) ->
emqx_mqtt_caps:get_caps(Zone).
%%--------------------------------------------------------------------
@ -1206,8 +1206,8 @@ run_conn_hooks(ConnPkt, Channel = #channel{conninfo = ConnInfo}) ->
%%--------------------------------------------------------------------
%% Check Connect Packet
check_connect(ConnPkt, #channel{clientinfo = #{zone := Zone, listener := Listener}}) ->
emqx_packet:check(ConnPkt, emqx_mqtt_caps:get_caps(Zone, Listener)).
check_connect(ConnPkt, #channel{clientinfo = #{zone := Zone}}) ->
emqx_packet:check(ConnPkt, emqx_mqtt_caps:get_caps(Zone)).
%%--------------------------------------------------------------------
%% Enrich Client Info
@ -1432,8 +1432,8 @@ check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS,
retain = Retain},
variable = #mqtt_packet_publish{topic_name = Topic}
},
#channel{clientinfo = #{zone := Zone, listener := Listener}}) ->
emqx_mqtt_caps:check_pub(Zone, Listener, #{qos => QoS, retain => Retain, topic => Topic}).
#channel{clientinfo = #{zone := Zone}}) ->
emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => Retain, topic => Topic}).
%%--------------------------------------------------------------------
%% Check Sub ACL
@ -1461,9 +1461,8 @@ check_sub_acl(TopicFilter, #channel{clientinfo = ClientInfo}) ->
%%--------------------------------------------------------------------
%% Check Sub Caps
check_sub_caps(TopicFilter, SubOpts, #channel{clientinfo = #{zone := Zone,
listener := Listener}}) ->
emqx_mqtt_caps:check_sub(Zone, Listener, TopicFilter, SubOpts).
check_sub_caps(TopicFilter, SubOpts, #channel{clientinfo = #{zone := Zone}}) ->
emqx_mqtt_caps:check_sub(Zone, TopicFilter, SubOpts).
%%--------------------------------------------------------------------
%% Enrich SubId
@ -1485,14 +1484,14 @@ enrich_subopts(SubOpts, #channel{clientinfo = #{zone := Zone, is_bridge := IsBri
%% Enrich ConnAck Caps
enrich_connack_caps(AckProps, ?IS_MQTT_V5 = #channel{clientinfo = #{
zone := Zone, listener := Listener}}) ->
zone := Zone}}) ->
#{max_packet_size := MaxPktSize,
max_qos_allowed := MaxQoS,
retain_available := Retain,
max_topic_alias := MaxAlias,
shared_subscription := Shared,
wildcard_subscription := Wildcard
} = emqx_mqtt_caps:get_caps(Zone, Listener),
} = emqx_mqtt_caps:get_caps(Zone),
NAckProps = AckProps#{'Retain-Available' => flag(Retain),
'Maximum-Packet-Size' => MaxPktSize,
'Topic-Alias-Maximum' => MaxAlias,
@ -1561,9 +1560,9 @@ ensure_connected(Channel = #channel{conninfo = ConnInfo,
init_alias_maximum(#mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V5,
properties = Properties},
#{zone := Zone, listener := Listener} = _ClientInfo) ->
#{zone := Zone} = _ClientInfo) ->
#{outbound => emqx_mqtt_props:get('Topic-Alias-Maximum', Properties, 0),
inbound => maps:get(max_topic_alias, emqx_mqtt_caps:get_caps(Zone, Listener))
inbound => maps:get(max_topic_alias, emqx_mqtt_caps:get_caps(Zone))
};
init_alias_maximum(_ConnPkt, _ClientInfo) -> undefined.

View File

@ -69,8 +69,8 @@ stop() -> gen_server:stop(?MODULE).
%% @doc Detect flapping when a MQTT client disconnected.
-spec(detect(emqx_types:clientinfo()) -> boolean()).
detect(#{clientid := ClientId, peerhost := PeerHost, zone := Zone, listener := Listener}) ->
Policy = #{max_count := Threshold} = get_policy(Zone, Listener),
detect(#{clientid := ClientId, peerhost := PeerHost, zone := Zone}) ->
Policy = #{max_count := Threshold} = get_policy(Zone),
%% The initial flapping record sets the detect_cnt to 0.
InitVal = #flapping{
clientid = ClientId,
@ -89,7 +89,7 @@ detect(#{clientid := ClientId, peerhost := PeerHost, zone := Zone, listener := L
end
end.
get_policy(Zone, Listener) ->
get_policy(Zone) ->
emqx_config:get_zone_conf(Zone, [flapping_detect]).
now_diff(TS) -> erlang:system_time(millisecond) - TS.
@ -137,12 +137,12 @@ handle_cast(Msg, State) ->
?LOG(error, "Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info({timeout, _TRef, {garbage_collect, Zone, Listener}}, State) ->
handle_info({timeout, _TRef, {garbage_collect, Zone}}, State) ->
Timestamp = erlang:system_time(millisecond)
- maps:get(window_time, get_policy(Zone, Listener)),
- maps:get(window_time, get_policy(Zone)),
MatchSpec = [{{'_', '_', '_', '$1', '_'},[{'<', '$1', Timestamp}], [true]}],
ets:select_delete(?FLAPPING_TAB, MatchSpec),
start_timer(Zone, Listener),
start_timer(Zone),
{noreply, State, hibernate};
handle_info(Info, State) ->
@ -155,13 +155,11 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
start_timer(Zone, Listener) ->
WindTime = maps:get(window_time, get_policy(Zone, Listener)),
emqx_misc:start_timer(WindTime, {garbage_collect, Zone, Listener}).
start_timer(Zone) ->
WindTime = maps:get(window_time, get_policy(Zone)),
emqx_misc:start_timer(WindTime, {garbage_collect, Zone}).
start_timers() ->
lists:foreach(fun({Zone, ZoneConf}) ->
lists:foreach(fun({Listener, _}) ->
start_timer(Zone, Listener)
end, maps:to_list(maps:get(listeners, ZoneConf, #{})))
lists:foreach(fun({Zone, _ZoneConf}) ->
start_timer(Zone)
end, maps:to_list(emqx_config:get([zones], #{}))).

View File

@ -20,11 +20,11 @@
-include("emqx_mqtt.hrl").
-include("types.hrl").
-export([ check_pub/3
, check_sub/4
-export([ check_pub/2
, check_sub/3
]).
-export([ get_caps/2
-export([ get_caps/1
]).
-export_type([caps/0]).
@ -64,18 +64,18 @@
shared_subscription => true
}).
-spec(check_pub(emqx_types:zone(), atom(),
-spec(check_pub(emqx_types:zone(),
#{qos := emqx_types:qos(),
retain := boolean(),
topic := emqx_topic:topic()})
-> ok_or_error(emqx_types:reason_code())).
check_pub(Zone, Listener, Flags) when is_map(Flags) ->
check_pub(Zone, Flags) when is_map(Flags) ->
do_check_pub(case maps:take(topic, Flags) of
{Topic, Flags1} ->
Flags1#{topic_levels => emqx_topic:levels(Topic)};
error ->
Flags
end, maps:with(?PUBCAP_KEYS, get_caps(Zone, Listener))).
end, maps:with(?PUBCAP_KEYS, get_caps(Zone))).
do_check_pub(#{topic_levels := Levels}, #{max_topic_levels := Limit})
when Limit > 0, Levels > Limit ->
@ -87,12 +87,12 @@ do_check_pub(#{retain := true}, #{retain_available := false}) ->
{error, ?RC_RETAIN_NOT_SUPPORTED};
do_check_pub(_Flags, _Caps) -> ok.
-spec(check_sub(emqx_types:zone(), atom(),
-spec(check_sub(emqx_types:zone(),
emqx_types:topic(),
emqx_types:subopts())
-> ok_or_error(emqx_types:reason_code())).
check_sub(Zone, Listener, Topic, SubOpts) ->
Caps = maps:with(?SUBCAP_KEYS, get_caps(Zone, Listener)),
check_sub(Zone, Topic, SubOpts) ->
Caps = maps:with(?SUBCAP_KEYS, get_caps(Zone)),
Flags = lists:foldl(
fun(max_topic_levels, Map) ->
Map#{topic_levels => emqx_topic:levels(Topic)};
@ -113,7 +113,7 @@ do_check_sub(#{is_shared := true}, #{shared_subscription := false}) ->
{error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED};
do_check_sub(_Flags, _Caps) -> ok.
get_caps(Zone, Listener) ->
get_caps(Zone) ->
lists:foldl(fun({K, V}, Acc) ->
Acc#{K => emqx_config:get_zone_conf(Zone, [mqtt, K], V)}
end, #{}, maps:to_list(?DEFAULT_CAPS)).

View File

@ -157,27 +157,27 @@
%%--------------------------------------------------------------------
-spec(init(emqx_types:clientinfo(), emqx_types:conninfo()) -> session()).
init(#{zone := Zone, listener := Listener}, #{receive_maximum := MaxInflight}) ->
#session{max_subscriptions = get_conf(Zone, Listener, max_subscriptions),
init(#{zone := Zone}, #{receive_maximum := MaxInflight}) ->
#session{max_subscriptions = get_conf(Zone, max_subscriptions),
subscriptions = #{},
upgrade_qos = get_conf(Zone, Listener, upgrade_qos),
upgrade_qos = get_conf(Zone, upgrade_qos),
inflight = emqx_inflight:new(MaxInflight),
mqueue = init_mqueue(Zone, Listener),
mqueue = init_mqueue(Zone),
next_pkt_id = 1,
retry_interval = timer:seconds(get_conf(Zone, Listener, retry_interval)),
retry_interval = timer:seconds(get_conf(Zone, retry_interval)),
awaiting_rel = #{},
max_awaiting_rel = get_conf(Zone, Listener, max_awaiting_rel),
await_rel_timeout = timer:seconds(get_conf(Zone, Listener, await_rel_timeout)),
max_awaiting_rel = get_conf(Zone, max_awaiting_rel),
await_rel_timeout = timer:seconds(get_conf(Zone, await_rel_timeout)),
created_at = erlang:system_time(millisecond)
}.
%% @private init mq
init_mqueue(Zone, Listener) ->
init_mqueue(Zone) ->
emqx_mqueue:init(#{
max_len => get_conf(Zone, Listener, max_mqueue_len),
store_qos0 => get_conf(Zone, Listener, mqueue_store_qos0),
priorities => get_conf(Zone, Listener, mqueue_priorities),
default_priority => get_conf(Zone, Listener, mqueue_default_priority)
max_len => get_conf(Zone, max_mqueue_len),
store_qos0 => get_conf(Zone, mqueue_store_qos0),
priorities => get_conf(Zone, mqueue_priorities),
default_priority => get_conf(Zone, mqueue_default_priority)
}).
%%--------------------------------------------------------------------
@ -696,5 +696,5 @@ set_field(Name, Value, Session) ->
Pos = emqx_misc:index_of(Name, record_info(fields, session)),
setelement(Pos+1, Session, Value).
get_conf(Zone, Listener, Key) ->
get_conf(Zone, Key) ->
emqx_config:get_zone_conf(Zone, [mqtt, Key]).

View File

@ -880,7 +880,7 @@ t_check_sub_acls(_) ->
t_enrich_connack_caps(_) ->
ok = meck:new(emqx_mqtt_caps, [passthrough, no_history]),
ok = meck:expect(emqx_mqtt_caps, get_caps,
fun(_Zone, _Listener) ->
fun(_Zone) ->
#{max_packet_size => 1024,
max_qos_allowed => ?QOS_2,
retain_available => true,

View File

@ -29,13 +29,13 @@ t_check_pub(_) ->
emqx_config:put_zone_conf(default, [mqtt, max_qos_allowed], ?QOS_1),
emqx_config:put_zone_conf(default, [mqtt, retain_available], false),
timer:sleep(50),
ok = emqx_mqtt_caps:check_pub(default, mqtt_tcp, #{qos => ?QOS_1, retain => false}),
ok = emqx_mqtt_caps:check_pub(default, #{qos => ?QOS_1, retain => false}),
PubFlags1 = #{qos => ?QOS_2, retain => false},
?assertEqual({error, ?RC_QOS_NOT_SUPPORTED},
emqx_mqtt_caps:check_pub(default, mqtt_tcp, PubFlags1)),
emqx_mqtt_caps:check_pub(default, PubFlags1)),
PubFlags2 = #{qos => ?QOS_1, retain => true},
?assertEqual({error, ?RC_RETAIN_NOT_SUPPORTED},
emqx_mqtt_caps:check_pub(default, mqtt_tcp, PubFlags2)),
emqx_mqtt_caps:check_pub(default, PubFlags2)),
emqx_config:put(OldConf).
t_check_sub(_) ->
@ -50,11 +50,11 @@ t_check_sub(_) ->
emqx_config:put_zone_conf(default, [mqtt, shared_subscription], false),
emqx_config:put_zone_conf(default, [mqtt, wildcard_subscription], false),
timer:sleep(50),
ok = emqx_mqtt_caps:check_sub(default, mqtt_tcp, <<"topic">>, SubOpts),
ok = emqx_mqtt_caps:check_sub(default, <<"topic">>, SubOpts),
?assertEqual({error, ?RC_TOPIC_FILTER_INVALID},
emqx_mqtt_caps:check_sub(default, mqtt_tcp, <<"a/b/c/d">>, SubOpts)),
emqx_mqtt_caps:check_sub(default, <<"a/b/c/d">>, SubOpts)),
?assertEqual({error, ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED},
emqx_mqtt_caps:check_sub(default, mqtt_tcp, <<"+/#">>, SubOpts)),
emqx_mqtt_caps:check_sub(default, <<"+/#">>, SubOpts)),
?assertEqual({error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED},
emqx_mqtt_caps:check_sub(default, mqtt_tcp, <<"topic">>, SubOpts#{share => true})),
emqx_mqtt_caps:check_sub(default, <<"topic">>, SubOpts#{share => true})),
emqx_config:put(OldConf).

View File

@ -585,7 +585,7 @@ handle_call(discard, Channel) ->
handle_call(list_acl_cache, Channel) ->
%% This won't work
{reply, emqx_acl_cache:list_acl_cache(default, mqtt_tcp), Channel};
{reply, emqx_acl_cache:list_acl_cache(default), Channel};
%% XXX: No Quota Now
% handle_call({quota, Policy}, Channel) ->