diff --git a/src/emqx_access_control.erl b/src/emqx_access_control.erl index 06e8f5ed8..ff5ad9d09 100644 --- a/src/emqx_access_control.erl +++ b/src/emqx_access_control.erl @@ -28,7 +28,7 @@ %% APIs %%-------------------------------------------------------------------- --spec(authenticate(emqx_types:client()) +-spec(authenticate(emqx_types:client_info()) -> {ok, #{auth_result := emqx_types:auth_result(), anonymous := boolean}} | {error, term()}). authenticate(Client) -> diff --git a/src/emqx_access_rule.erl b/src/emqx_access_rule.erl index e48caa5c0..690b45998 100644 --- a/src/emqx_access_rule.erl +++ b/src/emqx_access_rule.erl @@ -86,23 +86,23 @@ bin(B) when is_binary(B) -> B. %% @doc Match access rule --spec(match(emqx_types:client(), emqx_types:topic(), rule()) +-spec(match(emqx_types:client_info(), emqx_types:topic(), rule()) -> {matched, allow} | {matched, deny} | nomatch). -match(_Client, _Topic, {AllowDeny, all}) when ?ALLOW_DENY(AllowDeny) -> +match(_ClientInfo, _Topic, {AllowDeny, all}) when ?ALLOW_DENY(AllowDeny) -> {matched, AllowDeny}; -match(Client, Topic, {AllowDeny, Who, _PubSub, TopicFilters}) +match(ClientInfo, Topic, {AllowDeny, Who, _PubSub, TopicFilters}) when ?ALLOW_DENY(AllowDeny) -> - case match_who(Client, Who) - andalso match_topics(Client, Topic, TopicFilters) of + case match_who(ClientInfo, Who) + andalso match_topics(ClientInfo, Topic, TopicFilters) of true -> {matched, AllowDeny}; false -> nomatch end. -match_who(_Client, all) -> +match_who(_ClientInfo, all) -> true; -match_who(_Client, {user, all}) -> +match_who(_ClientInfo, {user, all}) -> true; -match_who(_Client, {client, all}) -> +match_who(_ClientInfo, {client, all}) -> true; match_who(#{client_id := ClientId}, {client, ClientId}) -> true; @@ -112,44 +112,44 @@ match_who(#{peerhost := undefined}, {ipaddr, _Tup}) -> false; match_who(#{peerhost := IP}, {ipaddr, CIDR}) -> esockd_cidr:match(IP, CIDR); -match_who(Client, {'and', Conds}) when is_list(Conds) -> +match_who(ClientInfo, {'and', Conds}) when is_list(Conds) -> lists:foldl(fun(Who, Allow) -> - match_who(Client, Who) andalso Allow + match_who(ClientInfo, Who) andalso Allow end, true, Conds); -match_who(Client, {'or', Conds}) when is_list(Conds) -> +match_who(ClientInfo, {'or', Conds}) when is_list(Conds) -> lists:foldl(fun(Who, Allow) -> - match_who(Client, Who) orelse Allow + match_who(ClientInfo, Who) orelse Allow end, false, Conds); -match_who(_Client, _Who) -> +match_who(_ClientInfo, _Who) -> false. -match_topics(_Client, _Topic, []) -> +match_topics(_ClientInfo, _Topic, []) -> false; -match_topics(Client, Topic, [{pattern, PatternFilter}|Filters]) -> - TopicFilter = feed_var(Client, PatternFilter), +match_topics(ClientInfo, Topic, [{pattern, PatternFilter}|Filters]) -> + TopicFilter = feed_var(ClientInfo, PatternFilter), match_topic(emqx_topic:words(Topic), TopicFilter) - orelse match_topics(Client, Topic, Filters); -match_topics(Client, Topic, [TopicFilter|Filters]) -> + orelse match_topics(ClientInfo, Topic, Filters); +match_topics(ClientInfo, Topic, [TopicFilter|Filters]) -> match_topic(emqx_topic:words(Topic), TopicFilter) - orelse match_topics(Client, Topic, Filters). + orelse match_topics(ClientInfo, Topic, Filters). match_topic(Topic, {eq, TopicFilter}) -> Topic == TopicFilter; match_topic(Topic, TopicFilter) -> emqx_topic:match(Topic, TopicFilter). -feed_var(Client, Pattern) -> - feed_var(Client, Pattern, []). -feed_var(_Client, [], Acc) -> +feed_var(ClientInfo, Pattern) -> + feed_var(ClientInfo, Pattern, []). +feed_var(_ClientInfo, [], Acc) -> lists:reverse(Acc); -feed_var(Client = #{client_id := undefined}, [<<"%c">>|Words], Acc) -> - feed_var(Client, Words, [<<"%c">>|Acc]); -feed_var(Client = #{client_id := ClientId}, [<<"%c">>|Words], Acc) -> - feed_var(Client, Words, [ClientId |Acc]); -feed_var(Client = #{username := undefined}, [<<"%u">>|Words], Acc) -> - feed_var(Client, Words, [<<"%u">>|Acc]); -feed_var(Client = #{username := Username}, [<<"%u">>|Words], Acc) -> - feed_var(Client, Words, [Username|Acc]); -feed_var(Client, [W|Words], Acc) -> - feed_var(Client, Words, [W|Acc]). +feed_var(ClientInfo = #{client_id := undefined}, [<<"%c">>|Words], Acc) -> + feed_var(ClientInfo, Words, [<<"%c">>|Acc]); +feed_var(ClientInfo = #{client_id := ClientId}, [<<"%c">>|Words], Acc) -> + feed_var(ClientInfo, Words, [ClientId |Acc]); +feed_var(ClientInfo = #{username := undefined}, [<<"%u">>|Words], Acc) -> + feed_var(ClientInfo, Words, [<<"%u">>|Acc]); +feed_var(ClientInfo = #{username := Username}, [<<"%u">>|Words], Acc) -> + feed_var(ClientInfo, Words, [Username|Acc]); +feed_var(ClientInfo, [W|Words], Acc) -> + feed_var(ClientInfo, Words, [W|Acc]). diff --git a/src/emqx_banned.erl b/src/emqx_banned.erl index 7e3e959e3..d41f32ee0 100644 --- a/src/emqx_banned.erl +++ b/src/emqx_banned.erl @@ -73,7 +73,7 @@ start_link() -> -spec(stop() -> ok). stop() -> gen_server:stop(?MODULE). --spec(check(emqx_types:client()) -> boolean()). +-spec(check(emqx_types:client_info()) -> boolean()). check(#{client_id := ClientId, username := Username, peerhost := IPAddr}) -> diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index edd08baa6..5d76f8414 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -60,7 +60,7 @@ %% MQTT ConnInfo conninfo :: emqx_types:conninfo(), %% MQTT ClientInfo - client :: emqx_types:client(), + client_info :: emqx_types:client_info(), %% MQTT Session session :: emqx_session:session(), %% Keepalive @@ -102,7 +102,7 @@ will_timer => will_message }). --define(ATTR_KEYS, [conninfo, client, session, connected, connected_at, disconnected_at]). +-define(ATTR_KEYS, [conninfo, client_info, session, connected, connected_at, disconnected_at]). -define(INFO_KEYS, ?ATTR_KEYS ++ [keepalive, topic_aliases, alias_maximum, gc_state, disconnected_at]). %%-------------------------------------------------------------------- @@ -119,7 +119,7 @@ info(Keys, Channel) when is_list(Keys) -> [{Key, info(Key, Channel)} || Key <- Keys]; info(conninfo, #channel{conninfo = ConnInfo}) -> ConnInfo; -info(client, #channel{client = ClientInfo}) -> +info(client_info, #channel{client_info = ClientInfo}) -> ClientInfo; info(session, #channel{session = Session}) -> maybe_apply(fun emqx_session:info/1, Session); @@ -158,7 +158,7 @@ stats(#channel{session = Session}) -> emqx_session:stats(Session). -spec(caps(channel()) -> emqx_types:caps()). -caps(#channel{client = #{zone := Zone}}) -> +caps(#channel{client_info = #{zone := Zone}}) -> emqx_mqtt_caps:get_caps(Zone). %% For tests @@ -196,15 +196,15 @@ init(ConnInfo = #{peername := {PeerHost, _Port}, protocol := Protocol}, Options) true -> undefined; false -> disabled end, - #channel{conninfo = ConnInfo, - client = ClientInfo, - gc_state = init_gc_state(Zone), - oom_policy = init_oom_policy(Zone), - timers = #{stats_timer => StatsTimer}, - connected = undefined, - takeover = false, - resuming = false, - pendings = [] + #channel{conninfo = ConnInfo, + client_info = ClientInfo, + gc_state = init_gc_state(Zone), + oom_policy = init_oom_policy(Zone), + timers = #{stats_timer => StatsTimer}, + connected = undefined, + takeover = false, + resuming = false, + pendings = [] }. peer_cert_as_username(Options) -> @@ -252,7 +252,7 @@ handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) -> end; handle_in(?PUBACK_PACKET(PacketId, _ReasonCode), - Channel = #channel{client = ClientInfo, session = Session}) -> + Channel = #channel{client_info = ClientInfo, session = Session}) -> case emqx_session:puback(PacketId, Session) of {ok, Msg, Publishes, NSession} -> ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]), @@ -271,7 +271,7 @@ handle_in(?PUBACK_PACKET(PacketId, _ReasonCode), end; handle_in(?PUBREC_PACKET(PacketId, _ReasonCode), - Channel = #channel{client = ClientInfo, session = Session}) -> + Channel = #channel{client_info = ClientInfo, session = Session}) -> case emqx_session:pubrec(PacketId, Session) of {ok, Msg, NSession} -> ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]), @@ -310,7 +310,7 @@ handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = S end; handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), - Channel = #channel{client = ClientInfo}) -> + Channel = #channel{client_info = ClientInfo}) -> case emqx_packet:check(Packet) of ok -> TopicFilters1 = emqx_hooks:run_fold('client.subscribe', [ClientInfo, Properties], @@ -323,7 +323,7 @@ handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), end; handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), - Channel = #channel{client = ClientInfo}) -> + Channel = #channel{client_info = ClientInfo}) -> case emqx_packet:check(Packet) of ok -> TopicFilters1 = emqx_hooks:run_fold('client.unsubscribe', [ClientInfo, Properties], @@ -369,7 +369,7 @@ handle_in(Packet, Channel) -> %%-------------------------------------------------------------------- process_connect(ConnPkt = #mqtt_packet_connect{clean_start = CleanStart}, - Channel = #channel{conninfo = ConnInfo, client = ClientInfo}) -> + Channel = #channel{conninfo = ConnInfo, client_info = ClientInfo}) -> case emqx_cm:open_session(CleanStart, ClientInfo, ConnInfo) of {ok, #{session := Session, present := false}} -> NChannel = Channel#channel{session = Session}, @@ -440,7 +440,7 @@ process_publish(PacketId, Msg = #message{qos = ?QOS_2}, end. publish_to_msg(Packet, #channel{conninfo = #{proto_ver := ProtoVer}, - client = ClientInfo = #{mountpoint := MountPoint}}) -> + client_info = ClientInfo = #{mountpoint := MountPoint}}) -> Msg = emqx_packet:to_message(ClientInfo, Packet), Msg1 = emqx_message:set_flag(dup, false, Msg), Msg2 = emqx_message:set_header(proto_ver, ProtoVer, Msg1), @@ -461,7 +461,7 @@ process_subscribe([{TopicFilter, SubOpts}|More], Acc, Channel) -> process_subscribe(More, [RC|Acc], NChannel). do_subscribe(TopicFilter, SubOpts = #{qos := QoS}, Channel = - #channel{client = ClientInfo = #{mountpoint := MountPoint}, + #channel{client_info = ClientInfo = #{mountpoint := MountPoint}, session = Session}) -> case check_subscribe(TopicFilter, SubOpts, Channel) of ok -> @@ -491,7 +491,7 @@ process_unsubscribe([{TopicFilter, SubOpts}|More], Acc, Channel) -> process_unsubscribe(More, [RC|Acc], NChannel). do_unsubscribe(TopicFilter, _SubOpts, Channel = - #channel{client = ClientInfo = #{mountpoint := MountPoint}, + #channel{client_info = ClientInfo = #{mountpoint := MountPoint}, session = Session}) -> TopicFilter1 = emqx_mountpoint:mount(MountPoint, TopicFilter), case emqx_session:unsubscribe(ClientInfo, TopicFilter1, Session) of @@ -506,7 +506,7 @@ do_unsubscribe(TopicFilter, _SubOpts, Channel = %%TODO: RunFold or Pipeline handle_out({connack, ?RC_SUCCESS, SP, ConnPkt}, - Channel = #channel{conninfo = ConnInfo, client = ClientInfo}) -> + Channel = #channel{conninfo = ConnInfo, client_info = ClientInfo}) -> AckProps = run_fold([fun enrich_caps/2, fun enrich_server_keepalive/2, fun enrich_assigned_clientid/2 @@ -531,7 +531,7 @@ handle_out({connack, ?RC_SUCCESS, SP, ConnPkt}, end; handle_out({connack, ReasonCode, _ConnPkt}, Channel = #channel{conninfo = ConnInfo, - client = ClientInfo}) -> + client_info = ClientInfo}) -> ok = emqx_hooks:run('client.connected', [ClientInfo, ReasonCode, ConnInfo]), ReasonCode1 = case ProtoVer = maps:get(proto_ver, ConnInfo) of ?MQTT_PROTO_V5 -> ReasonCode; @@ -572,11 +572,11 @@ handle_out({publish, Publishes}, Channel) when is_list(Publishes) -> %% Ignore loop deliver handle_out({publish, _PacketId, #message{from = ClientId, flags = #{nl := true}}}, - Channel = #channel{client = #{client_id := ClientId}}) -> + Channel = #channel{client_info = #{client_id := ClientId}}) -> {ok, Channel}; handle_out({publish, PacketId, Msg}, Channel = - #channel{client = ClientInfo = #{mountpoint := MountPoint}}) -> + #channel{client_info = ClientInfo = #{mountpoint := MountPoint}}) -> Msg1 = emqx_message:update_expiry(Msg), Msg2 = emqx_hooks:run_fold('message.delivered', [ClientInfo], Msg1), Msg3 = emqx_mountpoint:unmount(MountPoint, Msg2), @@ -657,7 +657,7 @@ handle_call(Req, Channel) -> -spec(handle_cast(Msg :: term(), channel()) -> ok | {ok, channel()} | {stop, Reason :: term(), channel()}). -handle_cast({register, Attrs, Stats}, #channel{client = #{client_id := ClientId}}) -> +handle_cast({register, Attrs, Stats}, #channel{client_info = #{client_id := ClientId}}) -> ok = emqx_cm:register_channel(ClientId), emqx_cm:set_chan_attrs(ClientId, Attrs), emqx_cm:set_chan_stats(ClientId, Stats); @@ -672,14 +672,14 @@ handle_cast(Msg, Channel) -> -spec(handle_info(Info :: term(), channel()) -> {ok, channel()} | {stop, Reason :: term(), channel()}). -handle_info({subscribe, TopicFilters}, Channel = #channel{client = ClientInfo}) -> +handle_info({subscribe, TopicFilters}, Channel = #channel{client_info = ClientInfo}) -> TopicFilters1 = emqx_hooks:run_fold('client.subscribe', [ClientInfo, #{'Internal' => true}], parse_topic_filters(TopicFilters)), {_ReasonCodes, NChannel} = process_subscribe(TopicFilters1, Channel), {ok, NChannel}; -handle_info({unsubscribe, TopicFilters}, Channel = #channel{client = ClientInfo}) -> +handle_info({unsubscribe, TopicFilters}, Channel = #channel{client_info = ClientInfo}) -> TopicFilters1 = emqx_hooks:run_fold('client.unsubscribe', [ClientInfo, #{'Internal' => true}], parse_topic_filters(TopicFilters)), @@ -693,7 +693,7 @@ handle_info(disconnected, Channel = #channel{connected = false}) -> {ok, Channel}; handle_info(disconnected, Channel = #channel{conninfo = #{expiry_interval := ExpiryInterval}, - client = ClientInfo = #{zone := Zone}, + client_info = ClientInfo = #{zone := Zone}, will_msg = WillMsg}) -> emqx_zone:enable_flapping_detect(Zone) andalso emqx_flapping:detect(ClientInfo), Channel1 = ensure_disconnected(Channel), @@ -726,7 +726,7 @@ handle_info(Info, Channel) -> | {ok, Result :: term(), channel()} | {stop, Reason :: term(), channel()}). handle_timeout(TRef, {emit_stats, Stats}, - Channel = #channel{client = #{client_id := ClientId}, + Channel = #channel{client_info = #{client_id := ClientId}, timers = #{stats_timer := TRef}}) -> ok = emqx_cm:set_chan_stats(ClientId, Stats), {ok, clean_timer(stats_timer, Channel)}; @@ -810,7 +810,7 @@ reset_timer(Name, Time, Channel) -> clean_timer(Name, Channel = #channel{timers = Timers}) -> Channel#channel{timers = maps:remove(Name, Timers)}. -interval(stats_timer, #channel{client = #{zone := Zone}}) -> +interval(stats_timer, #channel{client_info = #{zone := Zone}}) -> emqx_zone:get_env(Zone, idle_timeout, 30000); interval(alive_timer, #channel{keepalive = KeepAlive}) -> emqx_keepalive:info(interval, KeepAlive); @@ -834,12 +834,12 @@ will_delay_interval(WillMsg) -> terminate(_, #channel{connected = undefined}) -> ok; -terminate(normal, #channel{conninfo = ConnInfo, client = ClientInfo}) -> +terminate(normal, #channel{conninfo = ConnInfo, client_info = ClientInfo}) -> ok = emqx_hooks:run('client.disconnected', [ClientInfo, normal, ConnInfo]); -terminate({shutdown, Reason}, #channel{conninfo = ConnInfo, client = ClientInfo}) +terminate({shutdown, Reason}, #channel{conninfo = ConnInfo, client_info = ClientInfo}) when Reason =:= kicked orelse Reason =:= discarded orelse Reason =:= takeovered -> ok = emqx_hooks:run('client.disconnected', [ClientInfo, Reason, ConnInfo]); -terminate(Reason, #channel{conninfo = ConnInfo, client = ClientInfo, will_msg = WillMsg}) -> +terminate(Reason, #channel{conninfo = ConnInfo, client_info = ClientInfo, will_msg = WillMsg}) -> publish_will_msg(WillMsg), ok = emqx_hooks:run('client.disconnected', [ClientInfo, Reason, ConnInfo]). @@ -866,7 +866,7 @@ enrich_conninfo(#mqtt_packet_connect{ properties = ConnProps, client_id = ClientId, username = Username}, Channel) -> - #channel{conninfo = ConnInfo, client = #{zone := Zone}} = Channel, + #channel{conninfo = ConnInfo, client_info = #{zone := Zone}} = Channel, MaxInflight = emqx_mqtt_props:get('Receive-Maximum', ConnProps, emqx_zone:max_inflight(Zone)), Interval = if ProtoVer == ?MQTT_PROTO_V5 -> @@ -889,18 +889,18 @@ enrich_conninfo(#mqtt_packet_connect{ {ok, Channel#channel{conninfo = NConnInfo}}. %% @doc Check connect packet. -check_connect(ConnPkt, #channel{client = #{zone := Zone}}) -> +check_connect(ConnPkt, #channel{client_info = #{zone := Zone}}) -> emqx_packet:check(ConnPkt, emqx_mqtt_caps:get_caps(Zone)). %% @doc Enrich client -enrich_client(ConnPkt, Channel = #channel{client = ClientInfo}) -> +enrich_client(ConnPkt, Channel = #channel{client_info = ClientInfo}) -> {ok, NConnPkt, NClientInfo} = pipeline([fun set_username/2, fun set_bridge_mode/2, fun maybe_username_as_clientid/2, fun maybe_assign_clientid/2, fun fix_mountpoint/2], ConnPkt, ClientInfo), - {ok, NConnPkt, Channel#channel{client = NClientInfo}}. + {ok, NConnPkt, Channel#channel{client_info = NClientInfo}}. set_username(#mqtt_packet_connect{username = Username}, ClientInfo = #{username := undefined}) -> @@ -931,20 +931,20 @@ fix_mountpoint(_ConnPkt, ClientInfo = #{mountpoint := Mountpoint}) -> {ok, ClientInfo#{mountpoint := emqx_mountpoint:replvar(Mountpoint, ClientInfo)}}. %% @doc Set logger metadata. -set_logger_meta(_ConnPkt, #channel{client = #{client_id := ClientId}}) -> +set_logger_meta(_ConnPkt, #channel{client_info = #{client_id := ClientId}}) -> emqx_logger:set_metadata_client_id(ClientId). %%-------------------------------------------------------------------- %% Check banned/flapping %%-------------------------------------------------------------------- -check_banned(_ConnPkt, #channel{client = ClientInfo = #{zone := Zone}}) -> +check_banned(_ConnPkt, #channel{client_info = ClientInfo = #{zone := Zone}}) -> case emqx_zone:enable_ban(Zone) andalso emqx_banned:check(ClientInfo) of true -> {error, ?RC_BANNED}; false -> ok end. -check_flapping(_ConnPkt, #channel{client = ClientInfo = #{zone := Zone}}) -> +check_flapping(_ConnPkt, #channel{client_info = ClientInfo = #{zone := Zone}}) -> case emqx_zone:enable_flapping_detect(Zone) andalso emqx_flapping:check(ClientInfo) of true -> {error, ?RC_CONNECTION_RATE_EXCEEDED}; @@ -958,10 +958,10 @@ check_flapping(_ConnPkt, #channel{client = ClientInfo = #{zone := Zone}}) -> auth_connect(#mqtt_packet_connect{client_id = ClientId, username = Username, password = Password}, - Channel = #channel{client = ClientInfo}) -> + Channel = #channel{client_info = ClientInfo}) -> case emqx_access_control:authenticate(ClientInfo#{password => Password}) of {ok, AuthResult} -> - {ok, Channel#channel{client = maps:merge(ClientInfo, AuthResult)}}; + {ok, Channel#channel{client_info = maps:merge(ClientInfo, AuthResult)}}; {error, Reason} -> ?LOG(warning, "Client ~s (Username: '~s') login failed for ~0p", [ClientId, Username, Reason]), @@ -1004,7 +1004,7 @@ save_alias(AliasId, Topic, Aliases) -> maps:put(AliasId, Topic, Aliases). %% Check Pub ACL check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}}, - #channel{client = ClientInfo}) -> + #channel{client_info = ClientInfo}) -> case is_acl_enabled(ClientInfo) andalso emqx_access_control:check_acl(ClientInfo, publish, Topic) of false -> ok; @@ -1033,7 +1033,7 @@ check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS, retain = Retain } }, - #channel{client = #{zone := Zone}}) -> + #channel{client_info = #{zone := Zone}}) -> emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => Retain}). %% Check Sub @@ -1044,7 +1044,7 @@ check_subscribe(TopicFilter, SubOpts, Channel) -> end. %% Check Sub ACL -check_sub_acl(TopicFilter, #channel{client = ClientInfo}) -> +check_sub_acl(TopicFilter, #channel{client_info = ClientInfo}) -> case is_acl_enabled(ClientInfo) andalso emqx_access_control:check_acl(ClientInfo, subscribe, TopicFilter) of false -> allow; @@ -1052,7 +1052,7 @@ check_sub_acl(TopicFilter, #channel{client = ClientInfo}) -> end. %% Check Sub Caps -check_sub_caps(TopicFilter, SubOpts, #channel{client = #{zone := Zone}}) -> +check_sub_caps(TopicFilter, SubOpts, #channel{client_info = #{zone := Zone}}) -> emqx_mqtt_caps:check_sub(Zone, TopicFilter, SubOpts). enrich_subid(#{'Subscription-Identifier' := SubId}, TopicFilters) -> @@ -1063,12 +1063,12 @@ enrich_subid(_Properties, TopicFilters) -> enrich_subopts(SubOpts, #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5}}) -> SubOpts; -enrich_subopts(SubOpts, #channel{client = #{zone := Zone, is_bridge := IsBridge}}) -> +enrich_subopts(SubOpts, #channel{client_info = #{zone := Zone, is_bridge := IsBridge}}) -> NL = flag(emqx_zone:ignore_loop_deliver(Zone)), SubOpts#{rap => flag(IsBridge), nl => NL}. enrich_caps(AckProps, #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5}, - client = #{zone := Zone}}) -> + client_info = #{zone := Zone}}) -> #{max_packet_size := MaxPktSize, max_qos_allowed := MaxQoS, retain_available := Retain, @@ -1087,14 +1087,14 @@ enrich_caps(AckProps, #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5}, enrich_caps(AckProps, _Channel) -> AckProps. -enrich_server_keepalive(AckProps, #channel{client = #{zone := Zone}}) -> +enrich_server_keepalive(AckProps, #channel{client_info = #{zone := Zone}}) -> case emqx_zone:server_keepalive(Zone) of undefined -> AckProps; Keepalive -> AckProps#{'Server-Keep-Alive' => Keepalive} end. enrich_assigned_clientid(AckProps, #channel{conninfo = ConnInfo, - client = #{client_id := ClientId} + client_info = #{client_id := ClientId} }) -> case maps:get(client_id, ConnInfo) of <<>> -> %% Original ClientId is null. @@ -1117,7 +1117,7 @@ ensure_keepalive(_AckProps, Channel = #channel{conninfo = ConnInfo}) -> ensure_keepalive_timer(maps:get(keepalive, ConnInfo), Channel). ensure_keepalive_timer(0, Channel) -> Channel; -ensure_keepalive_timer(Interval, Channel = #channel{client = #{zone := Zone}}) -> +ensure_keepalive_timer(Interval, Channel = #channel{client_info = #{zone := Zone}}) -> Backoff = emqx_zone:get_env(Zone, keepalive_backoff, 0.75), Keepalive = emqx_keepalive:init(round(timer:seconds(Interval) * Backoff)), ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}). diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index 4da131269..80d44f4fc 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -162,7 +162,7 @@ set_chan_stats(ClientId, ChanPid, Stats) -> ok. %% @doc Open a session. --spec(open_session(boolean(), emqx_types:client(), map()) +-spec(open_session(boolean(), emqx_types:client_info(), emqx_types:conninfo()) -> {ok, #{session := emqx_session:session(), present := boolean(), pendings => list()}} diff --git a/src/emqx_flapping.erl b/src/emqx_flapping.erl index ca0e411a0..c2d2f97e2 100644 --- a/src/emqx_flapping.erl +++ b/src/emqx_flapping.erl @@ -69,7 +69,7 @@ start_link() -> stop() -> gen_server:stop(?MODULE). %% @doc Check flapping when a MQTT client connected. --spec(check(emqx_types:client()) -> boolean()). +-spec(check(emqx_types:client_info()) -> boolean()). check(#{client_id := ClientId}) -> check(ClientId, get_policy()). @@ -81,7 +81,7 @@ check(ClientId, #{banned_interval := Interval}) -> end. %% @doc Detect flapping when a MQTT client disconnected. --spec(detect(emqx_types:client()) -> boolean()). +-spec(detect(emqx_types:client_info()) -> boolean()). detect(Client) -> detect(Client, get_policy()). detect(#{client_id := ClientId, peerhost := PeerHost}, diff --git a/src/emqx_mod_acl_internal.erl b/src/emqx_mod_acl_internal.erl index 36e4f67ff..14c8d9999 100644 --- a/src/emqx_mod_acl_internal.erl +++ b/src/emqx_mod_acl_internal.erl @@ -61,7 +61,7 @@ all_rules() -> %%-------------------------------------------------------------------- %% @doc Check ACL --spec(check_acl(emqx_types:client(), emqx_types:pubsub(), emqx_topic:topic(), +-spec(check_acl(emqx_types:client_info(), emqx_types:pubsub(), emqx_topic:topic(), emqx_access_rule:acl_result(), acl_rules()) -> {ok, allow} | {ok, deny} | ok). check_acl(Client, PubSub, Topic, _AclResult, Rules) -> diff --git a/src/emqx_packet.erl b/src/emqx_packet.erl index 6b7663b75..f1509137f 100644 --- a/src/emqx_packet.erl +++ b/src/emqx_packet.erl @@ -240,7 +240,7 @@ validate_topic_filters(TopicFilters) -> end, TopicFilters). %% @doc Publish Packet to Message. --spec(to_message(emqx_types:client(), emqx_ypes:packet()) -> emqx_types:message()). +-spec(to_message(emqx_types:client_info(), emqx_ypes:packet()) -> emqx_types:message()). to_message(#{client_id := ClientId, username := Username, peerhost := PeerHost}, #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, retain = Retain, diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 434f5f5ed..7b3de773c 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -140,7 +140,7 @@ %%-------------------------------------------------------------------- %% @doc Init a session. --spec(init(emqx_types:client(), Options :: map()) -> session()). +-spec(init(emqx_types:client_info(), emqx_types:conninfo()) -> session()). init(#{zone := Zone}, #{receive_maximum := MaxInflight}) -> #session{max_subscriptions = get_env(Zone, max_subscriptions, 0), subscriptions = #{}, @@ -252,14 +252,14 @@ redeliver(Session = #session{inflight = Inflight}) -> %% Client -> Broker: SUBSCRIBE %%-------------------------------------------------------------------- --spec(subscribe(emqx_types:client(), emqx_types:topic(), emqx_types:subopts(), session()) +-spec(subscribe(emqx_types:client_info(), emqx_types:topic(), emqx_types:subopts(), session()) -> {ok, session()} | {error, emqx_types:reason_code()}). -subscribe(Client, TopicFilter, SubOpts, Session = #session{subscriptions = Subs}) -> +subscribe(ClientInfo, TopicFilter, SubOpts, Session = #session{subscriptions = Subs}) -> case is_subscriptions_full(Session) andalso (not maps:is_key(TopicFilter, Subs)) of true -> {error, ?RC_QUOTA_EXCEEDED}; false -> - do_subscribe(Client, TopicFilter, SubOpts, Session) + do_subscribe(ClientInfo, TopicFilter, SubOpts, Session) end. is_subscriptions_full(#session{max_subscriptions = 0}) -> @@ -285,13 +285,13 @@ do_subscribe(Client = #{client_id := ClientId}, TopicFilter, SubOpts, %% Client -> Broker: UNSUBSCRIBE %%-------------------------------------------------------------------- --spec(unsubscribe(emqx_types:client(), emqx_types:topic(), session()) +-spec(unsubscribe(emqx_types:client_info(), emqx_types:topic(), session()) -> {ok, session()} | {error, emqx_types:reason_code()}). -unsubscribe(Client, TopicFilter, Session = #session{subscriptions = Subs}) -> +unsubscribe(ClientInfo, TopicFilter, Session = #session{subscriptions = Subs}) -> case maps:find(TopicFilter, Subs) of {ok, SubOpts} -> ok = emqx_broker:unsubscribe(TopicFilter), - ok = emqx_hooks:run('session.unsubscribed', [Client, TopicFilter, SubOpts]), + ok = emqx_hooks:run('session.unsubscribed', [ClientInfo, TopicFilter, SubOpts]), {ok, Session#session{subscriptions = maps:remove(TopicFilter, Subs)}}; error -> {error, ?RC_NO_SUBSCRIPTION_EXISTED} diff --git a/src/emqx_types.erl b/src/emqx_types.erl index 27f3c7b2e..b4e1520bf 100644 --- a/src/emqx_types.erl +++ b/src/emqx_types.erl @@ -32,7 +32,7 @@ ]). -export_type([ conninfo/0 - , client/0 + , client_info/0 , client_id/0 , username/0 , password/0 @@ -79,7 +79,6 @@ -export_type([ caps/0 , infos/0 - , attrs/0 , stats/0 ]). @@ -96,27 +95,29 @@ -type(topic() :: emqx_topic:topic()). -type(subid() :: binary() | atom()). --type(conninfo() :: #{peername := peername(), +-type(socktype() :: tcp | udp | ssl | proxy | atom()). +-type(conninfo() :: #{socktype := socktype(), + peername := peername(), sockname := peername(), peercert := esockd_peercert:peercert(), conn_mod := module(), atom() => term() }). --type(client() :: #{zone := zone(), - protocol := protocol(), - peerhost := peerhost(), - client_id := client_id(), - username := username(), - peercert := esockd_peercert:peercert(), - is_bridge := boolean(), - is_superuser := boolean(), - mountpoint := maybe(binary()), - ws_cookie := maybe(list()), - password => maybe(binary()), - auth_result => auth_result(), - anonymous => boolean(), - atom() => term() - }). +-type(client_info() :: #{zone := zone(), + protocol := protocol(), + peerhost := peerhost(), + client_id := client_id(), + username := username(), + peercert := esockd_peercert:peercert(), + is_bridge := boolean(), + is_superuser := boolean(), + mountpoint := maybe(binary()), + ws_cookie := maybe(list()), + password => maybe(binary()), + auth_result => auth_result(), + anonymous => boolean(), + atom() => term() + }). -type(client_id() :: binary()|atom()). -type(username() :: maybe(binary())). -type(password() :: maybe(binary())). @@ -167,6 +168,5 @@ -type(caps() :: emqx_mqtt_caps:caps()). -type(infos() :: #{atom() => term()}). --type(attrs() :: #{atom() => term()}). --type(stats() :: list({atom(), non_neg_integer()})). +-type(stats() :: #{atom() => non_neg_integer()|stats()}).