diff --git a/src/emqx_access_control.erl b/src/emqx_access_control.erl index 96babb951..28bb30cba 100644 --- a/src/emqx_access_control.erl +++ b/src/emqx_access_control.erl @@ -24,8 +24,6 @@ , reload_acl/0 ]). --import(emqx_hooks, [run_fold/3]). - -type(result() :: #{auth_result := emqx_types:auth_result(), anonymous := boolean() }). @@ -36,7 +34,7 @@ -spec(authenticate(emqx_types:clientinfo()) -> {ok, result()} | {error, term()}). authenticate(ClientInfo = #{zone := Zone}) -> - case run_fold('client.authenticate', [ClientInfo], default_auth_result(Zone)) of + case run_hooks('client.authenticate', [ClientInfo], default_auth_result(Zone)) of Result = #{auth_result := success} -> {ok, Result}; Result -> @@ -63,7 +61,7 @@ check_acl_cache(ClientInfo, PubSub, Topic) -> do_check_acl(ClientInfo = #{zone := Zone}, PubSub, Topic) -> Default = emqx_zone:get_env(Zone, acl_nomatch, deny), - case run_fold('client.check_acl', [ClientInfo, PubSub, Topic], Default) of + case run_hooks('client.check_acl', [ClientInfo, PubSub, Topic], Default) of allow -> allow; _Other -> deny end. @@ -75,7 +73,11 @@ reload_acl() -> default_auth_result(Zone) -> case emqx_zone:get_env(Zone, allow_anonymous, false) of - true -> #{auth_result => success, anonymous => true}; - false -> #{auth_result => not_authorized, anonymous => false} + true -> #{auth_result => success, anonymous => true}; + false -> #{auth_result => not_authorized, anonymous => false} end. +-compile({inline, [run_hooks/3]}). +run_hooks(Name, Args, Acc) -> + ok = emqx_metrics:inc(Name), emqx_hooks:run_fold(Name, Args, Acc). + diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index d68f1ed37..9b35b42b1 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -289,11 +289,13 @@ handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = S handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), Channel = #channel{clientinfo = ClientInfo}) -> case emqx_packet:check(Packet) of - ok -> TopicFilters1 = enrich_subid(Properties, parse_topic_filters(TopicFilters)), - TopicFilters2 = emqx_hooks:run_fold('client.subscribe', - [ClientInfo, Properties], - TopicFilters1), - {ReasonCodes, NChannel} = process_subscribe(TopicFilters2, Channel), + ok -> TopicFilters1 = parse_topic_filters(TopicFilters), + TopicFilters2 = enrich_subid(Properties, TopicFilters1), + TopicFilters3 = run_hooks('client.subscribe', + [ClientInfo, Properties], + TopicFilters2 + ), + {ReasonCodes, NChannel} = process_subscribe(TopicFilters3, Channel), handle_out(suback, {PacketId, ReasonCodes}, NChannel); {error, ReasonCode} -> handle_out(disconnect, ReasonCode, Channel) @@ -302,9 +304,10 @@ handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), Channel = #channel{clientinfo = ClientInfo}) -> case emqx_packet:check(Packet) of - ok -> TopicFilters1 = emqx_hooks:run_fold('client.unsubscribe', - [ClientInfo, Properties], - parse_topic_filters(TopicFilters)), + ok -> TopicFilters1 = run_hooks('client.unsubscribe', + [ClientInfo, Properties], + parse_topic_filters(TopicFilters) + ), {ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel), handle_out(unsuback, {PacketId, ReasonCodes}, NChannel); {error, ReasonCode} -> @@ -550,15 +553,14 @@ not_nacked({deliver, _Topic, Msg}) -> | {ok, replies(), channel()} | {shutdown, Reason :: term(), channel()} | {shutdown, Reason :: term(), replies(), channel()}). -handle_out(connack, {RC = ?RC_SUCCESS, SP, ConnPkt}, - Channel = #channel{conninfo = ConnInfo}) -> +handle_out(connack, {?RC_SUCCESS, SP, ConnPkt}, Channel = #channel{conninfo = ConnInfo}) -> AckProps = run_fold([fun enrich_connack_caps/2, fun enrich_server_keepalive/2, fun enrich_assigned_clientid/2 ], #{}, Channel), - AckPacket = ?CONNACK_PACKET(RC, SP, AckProps), - AckPacket1 = emqx_hooks:run_fold('client.connack', [ConnInfo], AckPacket), - return_connack(AckPacket1, + AckPacket = run_hooks('client.connack', [ConnInfo], + ?CONNACK_PACKET(?RC_SUCCESS, SP, AckProps)), + return_connack(AckPacket, ensure_keepalive(AckProps, ensure_connected(ConnPkt, Channel))); @@ -567,7 +569,7 @@ handle_out(connack, {ReasonCode, _ConnPkt}, Channel = #channel{conninfo = ConnIn ?MQTT_PROTO_V5 -> ReasonCode; _ -> emqx_reason_codes:compat(connack, ReasonCode) end), - AckPacket1 = emqx_hooks:run_fold('client.connack', [ConnInfo], AckPacket), + AckPacket1 = run_hooks('client.connack', [ConnInfo], AckPacket), shutdown(emqx_reason_codes:name(ReasonCode), AckPacket1, Channel); %% Optimize? @@ -732,16 +734,18 @@ handle_call(Req, Channel) -> -spec(handle_info(Info :: term(), channel()) -> ok | {ok, channel()} | {shutdown, Reason :: term(), channel()}). handle_info({subscribe, TopicFilters}, Channel = #channel{clientinfo = ClientInfo}) -> - TopicFilters1 = emqx_hooks:run_fold('client.subscribe', - [ClientInfo, #{'Internal' => true}], - parse_topic_filters(TopicFilters)), + TopicFilters1 = run_hooks('client.subscribe', + [ClientInfo, #{'Internal' => true}], + parse_topic_filters(TopicFilters) + ), {_ReasonCodes, NChannel} = process_subscribe(TopicFilters1, Channel), {ok, NChannel}; handle_info({unsubscribe, TopicFilters}, Channel = #channel{clientinfo = ClientInfo}) -> - TopicFilters1 = emqx_hooks:run_fold('client.unsubscribe', - [ClientInfo, #{'Internal' => true}], - parse_topic_filters(TopicFilters)), + TopicFilters1 = run_hooks('client.unsubscribe', + [ClientInfo, #{'Internal' => true}], + parse_topic_filters(TopicFilters) + ), {_ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel), {ok, NChannel}; @@ -940,7 +944,7 @@ receive_maximum(#{zone := Zone}, ConnProps) -> %% Run Connect Hooks run_conn_hooks(ConnPkt, Channel = #channel{conninfo = ConnInfo}) -> - case emqx_hooks:run_fold('client.connect', [ConnInfo], ConnPkt) of + case run_hooks('client.connect', [ConnInfo], ConnPkt) of Error = {error, _Reason} -> Error; NConnPkt -> {ok, NConnPkt, Channel} end. @@ -1183,7 +1187,7 @@ enrich_assigned_clientid(AckProps, #channel{conninfo = ConnInfo, ensure_connected(ConnPkt, Channel = #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) -> NConnInfo = ConnInfo#{connected_at => erlang:system_time(second)}, - ok = emqx_hooks:run('client.connected', [ClientInfo, NConnInfo]), + ok = run_hooks('client.connected', [ClientInfo, NConnInfo]), Channel#channel{conninfo = NConnInfo, conn_state = connected, will_msg = emqx_packet:will_msg(ConnPkt), @@ -1262,7 +1266,7 @@ parse_topic_filters(TopicFilters) -> ensure_disconnected(Reason, Channel = #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) -> NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(second)}, - ok = emqx_hooks:run('client.disconnected', [ClientInfo, Reason, NConnInfo]), + ok = run_hooks('client.disconnected', [ClientInfo, Reason, NConnInfo]), Channel#channel{conninfo = NConnInfo, conn_state = disconnected}. %%-------------------------------------------------------------------- @@ -1292,6 +1296,13 @@ disconnect_reason(ReasonCode) -> emqx_reason_codes:name(ReasonCode). %% Helper functions %%-------------------------------------------------------------------- +-compile({inline, [run_hooks/2, run_hooks/3]}). +run_hooks(Name, Args) -> + ok = emqx_metrics:inc(Name), emqx_hooks:run(Name, Args). + +run_hooks(Name, Args, Acc) -> + ok = emqx_metrics:inc(Name), emqx_hooks:run_fold(Name, Args, Acc). + -compile({inline, [find_alias/2, save_alias/3]}). find_alias(_AliasId, undefined) -> false; diff --git a/src/emqx_metrics.erl b/src/emqx_metrics.erl index b6c06bcef..5758f68c3 100644 --- a/src/emqx_metrics.erl +++ b/src/emqx_metrics.erl @@ -157,7 +157,9 @@ %% Client Lifecircle metrics -define(CLIENT_METRICS, - [{counter, 'client.connected'}, + [{counter, 'client.connect'}, + {counter, 'client.connack'}, + {counter, 'client.connected'}, {counter, 'client.authenticate'}, {counter, 'client.auth.anonymous'}, {counter, 'client.check_acl'}, @@ -512,13 +514,15 @@ reserved_idx('delivery.dropped.qos0_msg') -> 121; reserved_idx('delivery.dropped.queue_full') -> 122; reserved_idx('delivery.dropped.expired') -> 123; -reserved_idx('client.connected') -> 200; -reserved_idx('client.authenticate') -> 201; -reserved_idx('client.auth.anonymous') -> 202; -reserved_idx('client.check_acl') -> 203; -reserved_idx('client.subscribe') -> 204; -reserved_idx('client.unsubscribe') -> 205; -reserved_idx('client.disconnected') -> 206; +reserved_idx('client.connect') -> 200; +reserved_idx('client.connack') -> 201; +reserved_idx('client.connected') -> 202; +reserved_idx('client.authenticate') -> 203; +reserved_idx('client.auth.anonymous') -> 204; +reserved_idx('client.check_acl') -> 205; +reserved_idx('client.subscribe') -> 206; +reserved_idx('client.unsubscribe') -> 207; +reserved_idx('client.disconnected') -> 208; reserved_idx('session.created') -> 220; reserved_idx('session.resumed') -> 221;