From b7ca3905a6cc62b29c2501b31fdc7329f4c3f661 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 27 Dec 2019 16:31:06 +0800 Subject: [PATCH 1/7] Breaking Change: Add new hooks for client and session lifecircle (#3138) --- src/emqx_access_control.erl | 8 ++- src/emqx_broker.erl | 6 +- src/emqx_channel.erl | 114 +++++++++++++++++---------------- src/emqx_cm.erl | 11 +++- src/emqx_mod_presence.erl | 69 +++++++++++--------- src/emqx_mod_subscription.erl | 9 ++- src/emqx_session.erl | 16 ++++- test/emqx_connection_SUITE.erl | 6 +- test/emqx_session_SUITE.erl | 3 +- 9 files changed, 139 insertions(+), 103 deletions(-) diff --git a/src/emqx_access_control.erl b/src/emqx_access_control.erl index df21afa0e..96babb951 100644 --- a/src/emqx_access_control.erl +++ b/src/emqx_access_control.erl @@ -24,6 +24,8 @@ , reload_acl/0 ]). +-import(emqx_hooks, [run_fold/3]). + -type(result() :: #{auth_result := emqx_types:auth_result(), anonymous := boolean() }). @@ -34,9 +36,9 @@ -spec(authenticate(emqx_types:clientinfo()) -> {ok, result()} | {error, term()}). authenticate(ClientInfo = #{zone := Zone}) -> - case emqx_hooks:run_fold('client.authenticate', [ClientInfo], default_auth_result(Zone)) of + case run_fold('client.authenticate', [ClientInfo], default_auth_result(Zone)) of Result = #{auth_result := success} -> - {ok, Result}; + {ok, Result}; Result -> {error, maps:get(auth_result, Result, unknown_error)} end. @@ -61,7 +63,7 @@ check_acl_cache(ClientInfo, PubSub, Topic) -> do_check_acl(ClientInfo = #{zone := Zone}, PubSub, Topic) -> Default = emqx_zone:get_env(Zone, acl_nomatch, deny), - case emqx_hooks:run_fold('client.check_acl', [ClientInfo, PubSub, Topic], Default) of + case run_fold('client.check_acl', [ClientInfo, PubSub, Topic], Default) of allow -> allow; _Other -> deny end. diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index 850b466d3..a5ab536da 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -230,7 +230,7 @@ delivery(Msg) -> #delivery{sender = self(), message = Msg}. -spec(route([emqx_types:route_entry()], emqx_types:delivery()) -> emqx_types:publish_result()). route([], #delivery{message = Msg}) -> - emqx_hooks:run('message.dropped', [#{node => node()}, Msg]), + ok = emqx_hooks:run('message.dropped', [Msg, #{node => node()}, no_subscribers]), ok = inc_dropped_cnt(Msg), []; @@ -282,8 +282,8 @@ forward(Node, To, Delivery, sync) -> -spec(dispatch(emqx_topic:topic(), emqx_types:delivery()) -> emqx_types:deliver_result()). dispatch(Topic, #delivery{message = Msg}) -> case subscribers(Topic) of - [] -> emqx_hooks:run('message.dropped', [#{node => node()}, Msg]), - _ = inc_dropped_cnt(Topic), + [] -> ok = emqx_hooks:run('message.dropped', [Msg, #{node => node()}, no_subscribers]), + ok = inc_dropped_cnt(Topic), {error, no_subscribers}; [Sub] -> %% optimize? dispatch(Sub, Topic, Msg); diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 7d2bf8f23..d68f1ed37 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -45,7 +45,7 @@ , terminate/2 ]). -%% export for ct +%% Exports for CT -export([set_field/3]). -import(emqx_misc, @@ -204,9 +204,10 @@ handle_in(?CONNECT_PACKET(), Channel = #channel{conn_state = connected}) -> handle_in(?CONNECT_PACKET(ConnPkt), Channel) -> case pipeline([fun enrich_conninfo/2, + fun run_conn_hooks/2, fun check_connect/2, fun enrich_client/2, - fun set_logger_meta/2, + fun set_log_meta/2, fun check_banned/2, fun auth_connect/2 ], ConnPkt, Channel#channel{conn_state = connecting}) of @@ -549,26 +550,25 @@ not_nacked({deliver, _Topic, Msg}) -> | {ok, replies(), channel()} | {shutdown, Reason :: term(), channel()} | {shutdown, Reason :: term(), replies(), channel()}). -handle_out(connack, {?RC_SUCCESS, SP, ConnPkt}, Channel) -> +handle_out(connack, {RC = ?RC_SUCCESS, SP, ConnPkt}, + Channel = #channel{conninfo = ConnInfo}) -> AckProps = run_fold([fun enrich_connack_caps/2, fun enrich_server_keepalive/2, fun enrich_assigned_clientid/2 ], #{}, Channel), - AckPacket = ?CONNACK_PACKET(?RC_SUCCESS, SP, AckProps), - return_connack(AckPacket, + AckPacket = ?CONNACK_PACKET(RC, SP, AckProps), + AckPacket1 = emqx_hooks:run_fold('client.connack', [ConnInfo], AckPacket), + return_connack(AckPacket1, ensure_keepalive(AckProps, ensure_connected(ConnPkt, Channel))); -handle_out(connack, {ReasonCode, _ConnPkt}, - Channel = #channel{conninfo = ConnInfo, - clientinfo = ClientInfo}) -> - ok = emqx_hooks:run('client.connected', [ClientInfo, ReasonCode, ConnInfo]), - AckPacket = ?CONNACK_PACKET( - case maps:get(proto_ver, ConnInfo) of - ?MQTT_PROTO_V5 -> ReasonCode; - _Other -> emqx_reason_codes:compat(connack, ReasonCode) - end), - shutdown(emqx_reason_codes:name(ReasonCode), AckPacket, Channel); +handle_out(connack, {ReasonCode, _ConnPkt}, Channel = #channel{conninfo = ConnInfo}) -> + AckPacket = ?CONNACK_PACKET(case maps:get(proto_ver, ConnInfo) of + ?MQTT_PROTO_V5 -> ReasonCode; + _ -> emqx_reason_codes:compat(connack, ReasonCode) + end), + AckPacket1 = emqx_hooks:run_fold('client.connack', [ConnInfo], AckPacket), + shutdown(emqx_reason_codes:name(ReasonCode), AckPacket1, Channel); %% Optimize? handle_out(publish, [], Channel) -> @@ -625,10 +625,7 @@ handle_out(Type, Data, Channel) -> %% Return ConnAck %%-------------------------------------------------------------------- -return_connack(AckPacket, Channel = #channel{conninfo = ConnInfo, - clientinfo = ClientInfo - }) -> - ok = emqx_hooks:run('client.connected', [ClientInfo, ?RC_SUCCESS, ConnInfo]), +return_connack(AckPacket, Channel) -> Replies = [{event, connected}, {connack, AckPacket}], case maybe_resume_session(Channel) of ignore -> {ok, Replies, Channel}; @@ -754,13 +751,12 @@ handle_info({sock_closed, Reason}, Channel = #channel{conn_state = idle}) -> handle_info({sock_closed, Reason}, Channel = #channel{conn_state = connecting}) -> shutdown(Reason, Channel); -handle_info({sock_closed, Reason}, - Channel = #channel{conn_state = connected, - clientinfo = ClientInfo = #{zone := Zone}}) -> +handle_info({sock_closed, Reason}, Channel = + #channel{conn_state = connected, + clientinfo = ClientInfo = #{zone := Zone}}) -> emqx_zone:enable_flapping_detect(Zone) andalso emqx_flapping:detect(ClientInfo), - Channel1 = ensure_disconnected( - mabye_publish_will_msg(Channel)), + Channel1 = ensure_disconnected(Reason, mabye_publish_will_msg(Channel)), case maybe_shutdown(Reason, Channel1) of {ok, Channel2} -> {ok, {event, disconnected}, Channel2}; Shutdown -> Shutdown @@ -879,19 +875,19 @@ interval(will_timer, #channel{will_msg = WillMsg}) -> %% Terminate %%-------------------------------------------------------------------- -terminate(_, #channel{conn_state = idle}) -> - ok; -terminate(normal, #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) -> - ok = emqx_hooks:run('client.disconnected', [ClientInfo, normal, ConnInfo]); -terminate({shutdown, Reason}, #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) - when Reason =:= kicked orelse Reason =:= discarded orelse Reason =:= takeovered -> - ok = emqx_hooks:run('client.disconnected', [ClientInfo, Reason, ConnInfo]); -terminate(Reason, #channel{conninfo = ConnInfo, clientinfo = ClientInfo, will_msg = WillMsg}) -> - case WillMsg of - undefined -> ok; - _ -> publish_will_msg(WillMsg) - end, - ok = emqx_hooks:run('client.disconnected', [ClientInfo, Reason, ConnInfo]). +terminate(_, #channel{conn_state = idle}) -> ok; +terminate(normal, Channel) -> + run_terminate_hook(normal, Channel); +terminate({shutdown, Reason}, Channel) + when Reason =:= kicked; Reason =:= discarded; Reason =:= takeovered -> + run_terminate_hook(Reason, Channel); +terminate(Reason, Channel = #channel{will_msg = WillMsg}) -> + (WillMsg =/= undefined) andalso publish_will_msg(WillMsg), + run_terminate_hook(Reason, Channel). + +run_terminate_hook(_Reason, #channel{session = undefined}) -> ok; +run_terminate_hook(Reason, #channel{clientinfo = ClientInfo, session = Session}) -> + emqx_session:terminate(ClientInfo, Reason, Session). %%-------------------------------------------------------------------- %% Internal functions @@ -940,6 +936,15 @@ expiry_interval(_ClientInfo, #mqtt_packet_connect{clean_start = true}) -> receive_maximum(#{zone := Zone}, ConnProps) -> emqx_mqtt_props:get('Receive-Maximum', ConnProps, emqx_zone:max_inflight(Zone)). +%%-------------------------------------------------------------------- +%% Run Connect Hooks + +run_conn_hooks(ConnPkt, Channel = #channel{conninfo = ConnInfo}) -> + case emqx_hooks:run_fold('client.connect', [ConnInfo], ConnPkt) of + Error = {error, _Reason} -> Error; + NConnPkt -> {ok, NConnPkt, Channel} + end. + %%-------------------------------------------------------------------- %% Check Connect Packet @@ -987,9 +992,9 @@ fix_mountpoint(_ConnPkt, ClientInfo = #{mountpoint := MountPoint}) -> {ok, ClientInfo#{mountpoint := MountPoint1}}. %%-------------------------------------------------------------------- -%% Set logger metadata +%% Set log metadata -set_logger_meta(_ConnPkt, #channel{clientinfo = #{clientid := ClientId}}) -> +set_log_meta(_ConnPkt, #channel{clientinfo = #{clientid := ClientId}}) -> emqx_logger:set_metadata_clientid(ClientId). %%-------------------------------------------------------------------- @@ -1172,6 +1177,19 @@ enrich_assigned_clientid(AckProps, #channel{conninfo = ConnInfo, _Origin -> AckProps end. +%%-------------------------------------------------------------------- +%% Ensure connected + +ensure_connected(ConnPkt, Channel = #channel{conninfo = ConnInfo, + clientinfo = ClientInfo}) -> + NConnInfo = ConnInfo#{connected_at => erlang:system_time(second)}, + ok = emqx_hooks:run('client.connected', [ClientInfo, NConnInfo]), + Channel#channel{conninfo = NConnInfo, + conn_state = connected, + will_msg = emqx_packet:will_msg(ConnPkt), + alias_maximum = init_alias_maximum(ConnPkt, ClientInfo) + }. + %%-------------------------------------------------------------------- %% Init Alias Maximum @@ -1183,20 +1201,6 @@ init_alias_maximum(#mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V5, }; init_alias_maximum(_ConnPkt, _ClientInfo) -> undefined. -%%-------------------------------------------------------------------- -%% Ensure connected - -ensure_connected(ConnPkt, Channel = #channel{conninfo = ConnInfo, - clientinfo = ClientInfo}) -> - WillMsg = emqx_packet:will_msg(ConnPkt), - AliasMaximum = init_alias_maximum(ConnPkt, ClientInfo), - NConnInfo = ConnInfo#{connected_at => erlang:system_time(second)}, - Channel#channel{conninfo = NConnInfo, - will_msg = WillMsg, - conn_state = connected, - alias_maximum = AliasMaximum - }. - %%-------------------------------------------------------------------- %% Enrich Keepalive @@ -1255,8 +1259,10 @@ parse_topic_filters(TopicFilters) -> %%-------------------------------------------------------------------- %% Ensure disconnected -ensure_disconnected(Channel = #channel{conninfo = ConnInfo}) -> +ensure_disconnected(Reason, Channel = #channel{conninfo = ConnInfo, + clientinfo = ClientInfo}) -> NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(second)}, + ok = emqx_hooks:run('client.disconnected', [ClientInfo, Reason, NConnInfo]), Channel#channel{conninfo = NConnInfo, conn_state = disconnected}. %%-------------------------------------------------------------------- diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index 0a22ce926..9404835c8 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -206,7 +206,7 @@ set_chan_stats(ClientId, ChanPid, Stats) -> open_session(true, ClientInfo = #{clientid := ClientId}, ConnInfo) -> CleanStart = fun(_) -> ok = discard_session(ClientId), - Session = emqx_session:init(ClientInfo, ConnInfo), + Session = create_session(ClientInfo, ConnInfo), {ok, #{session => Session, present => false}} end, emqx_cm_locker:trans(ClientId, CleanStart); @@ -215,18 +215,23 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) -> ResumeStart = fun(_) -> case takeover_session(ClientId) of {ok, ConnMod, ChanPid, Session} -> - ok = emqx_session:resume(ClientId, Session), + ok = emqx_session:resume(ClientInfo, Session), Pendings = ConnMod:call(ChanPid, {takeover, 'end'}), {ok, #{session => Session, present => true, pendings => Pendings}}; {error, not_found} -> - Session = emqx_session:init(ClientInfo, ConnInfo), + Session = create_session(ClientInfo, ConnInfo), {ok, #{session => Session, present => false}} end end, emqx_cm_locker:trans(ClientId, ResumeStart). +create_session(ClientInfo, ConnInfo) -> + Session = emqx_session:init(ClientInfo, ConnInfo), + ok = emqx_hooks:run('session.created', [ClientInfo, emqx_session:info(Session)]), + Session. + %% @doc Try to takeover a session. -spec(takeover_session(emqx_types:clientid()) -> {ok, emqx_session:session()} | {error, Reason :: term()}). diff --git a/src/emqx_mod_presence.erl b/src/emqx_mod_presence.erl index 1d6397075..335cc36b5 100644 --- a/src/emqx_mod_presence.erl +++ b/src/emqx_mod_presence.erl @@ -28,12 +28,12 @@ , unload/1 ]). --export([ on_client_connected/4 +-export([ on_client_connected/3 , on_client_disconnected/4 ]). -ifdef(TEST). --export([ reason/1 ]). +-export([reason/1]). -endif. load(Env) -> @@ -44,26 +44,12 @@ unload(_Env) -> emqx_hooks:del('client.connected', {?MODULE, on_client_connected}), emqx_hooks:del('client.disconnected', {?MODULE, on_client_disconnected}). -on_client_connected(ClientInfo, ConnAck, ConnInfo, Env) -> - #{peerhost := PeerHost} = ClientInfo, - #{clean_start := CleanStart, - proto_name := ProtoName, - proto_ver := ProtoVer, - keepalive := Keepalive, - expiry_interval := ExpiryInterval} = ConnInfo, - ClientId = clientid(ClientInfo, ConnInfo), - Username = username(ClientInfo, ConnInfo), - Presence = #{clientid => ClientId, - username => Username, - ipaddress => ntoa(PeerHost), - proto_name => ProtoName, - proto_ver => ProtoVer, - keepalive => Keepalive, - connack => ConnAck, - clean_start => CleanStart, - expiry_interval => ExpiryInterval, - ts => erlang:system_time(millisecond) - }, +%%-------------------------------------------------------------------- +%% Callbacks +%%-------------------------------------------------------------------- + +on_client_connected(ClientInfo = #{clientid := ClientId}, ConnInfo, Env) -> + Presence = connected_presence(ClientInfo, ConnInfo), case emqx_json:safe_encode(Presence) of {ok, Payload} -> emqx_broker:safe_publish( @@ -72,12 +58,12 @@ on_client_connected(ClientInfo, ConnAck, ConnInfo, Env) -> ?LOG(error, "Failed to encode 'connected' presence: ~p", [Presence]) end. -on_client_disconnected(ClientInfo, Reason, ConnInfo, Env) -> - ClientId = clientid(ClientInfo, ConnInfo), - Username = username(ClientInfo, ConnInfo), +on_client_disconnected(_ClientInfo = #{clientid := ClientId, username := Username}, + Reason, _ConnInfo = #{disconnected_at := DisconnectedAt}, Env) -> Presence = #{clientid => ClientId, username => Username, reason => reason(Reason), + disconnected_at => DisconnectedAt, ts => erlang:system_time(millisecond) }, case emqx_json:safe_encode(Presence) of @@ -88,11 +74,35 @@ on_client_disconnected(ClientInfo, Reason, ConnInfo, Env) -> ?LOG(error, "Failed to encode 'disconnected' presence: ~p", [Presence]) end. -clientid(#{clientid := undefined}, #{clientid := ClientId}) -> ClientId; -clientid(#{clientid := ClientId}, _ConnInfo) -> ClientId. +%%-------------------------------------------------------------------- +%% Helper functions +%%-------------------------------------------------------------------- -username(#{username := undefined}, #{username := Username}) -> Username; -username(#{username := Username}, _ConnInfo) -> Username. +connected_presence(#{peerhost := PeerHost, + sockport := SockPort, + clientid := ClientId, + username := Username + }, + #{clean_start := CleanStart, + proto_name := ProtoName, + proto_ver := ProtoVer, + keepalive := Keepalive, + connected_at := ConnectedAt, + expiry_interval := ExpiryInterval + }) -> + #{clientid => ClientId, + username => Username, + ipaddress => ntoa(PeerHost), + sockport => SockPort, + proto_name => ProtoName, + proto_ver => ProtoVer, + keepalive => Keepalive, + connack => 0, %% Deprecated? + clean_start => CleanStart, + expiry_interval => ExpiryInterval, + connected_at => ConnectedAt, + ts => erlang:system_time(millisecond) + }. make_msg(QoS, Topic, Payload) -> emqx_message:set_flag( @@ -106,6 +116,7 @@ topic(disconnected, ClientId) -> qos(Env) -> proplists:get_value(qos, Env, 0). +-compile({inline, [reason/1]}). reason(Reason) when is_atom(Reason) -> Reason; reason({shutdown, Reason}) when is_atom(Reason) -> Reason; reason({Error, _}) when is_atom(Error) -> Error; diff --git a/src/emqx_mod_subscription.erl b/src/emqx_mod_subscription.erl index 47b9ab337..b0bfe49ba 100644 --- a/src/emqx_mod_subscription.erl +++ b/src/emqx_mod_subscription.erl @@ -27,7 +27,7 @@ ]). %% APIs --export([on_client_connected/4]). +-export([on_client_connected/3]). %%-------------------------------------------------------------------- %% Load/Unload Hook @@ -36,8 +36,7 @@ load(Topics) -> emqx_hooks:add('client.connected', {?MODULE, on_client_connected, [Topics]}). -on_client_connected(#{clientid := ClientId, - username := Username}, ?RC_SUCCESS, _ConnInfo, Topics) -> +on_client_connected(#{clientid := ClientId, username := Username}, _ConnInfo, Topics) -> Replace = fun(Topic) -> rep(<<"%u">>, Username, rep(<<"%c">>, ClientId, Topic)) end, @@ -47,9 +46,9 @@ on_client_connected(#{clientid := ClientId, unload(_) -> emqx_hooks:del('client.connected', {?MODULE, on_client_connected}). -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% Internal functions -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- rep(<<"%c">>, ClientId, Topic) -> emqx_topic:feed_var(<<"%c">>, ClientId, Topic); diff --git a/src/emqx_session.erl b/src/emqx_session.erl index f9e304aa0..793ed096c 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -76,6 +76,7 @@ -export([ deliver/2 , enqueue/2 , retry/1 + , terminate/3 ]). -export([ takeover/1 @@ -604,11 +605,12 @@ expire_awaiting_rel(Now, Session = #session{awaiting_rel = AwaitingRel, takeover(#session{subscriptions = Subs}) -> lists:foreach(fun emqx_broker:unsubscribe/1, maps:keys(Subs)). --spec(resume(emqx_types:clientid(), session()) -> ok). -resume(ClientId, #session{subscriptions = Subs}) -> +-spec(resume(emqx_types:clientinfo(), session()) -> ok). +resume(ClientInfo = #{clientid := ClientId}, Session = #session{subscriptions = Subs}) -> lists:foreach(fun({TopicFilter, SubOpts}) -> ok = emqx_broker:subscribe(TopicFilter, ClientId, SubOpts) - end, maps:to_list(Subs)). + end, maps:to_list(Subs)), + emqx_hooks:run('session.resumed', [ClientInfo, info(Session)]). -spec(replay(session()) -> {ok, replies(), session()}). replay(Session = #session{inflight = Inflight}) -> @@ -626,6 +628,14 @@ replay(Inflight) -> {PacketId, emqx_message:set_flag(dup, true, Msg)} end, emqx_inflight:to_list(Inflight)). +-spec(terminate(emqx_types:clientinfo(), Reason :: term(), session()) -> ok). +terminate(ClientInfo, discarded, Session) -> + emqx_hooks:run('session.discarded', [ClientInfo, info(Session)]); +terminate(ClientInfo, takeovered, Session) -> + emqx_hooks:run('session.takeovered', [ClientInfo, info(Session)]); +terminate(ClientInfo, Reason, Session) -> + emqx_hooks:run('session.terminated', [ClientInfo, Reason, info(Session)]). + %%-------------------------------------------------------------------- %% Inc message/delivery expired counter %%-------------------------------------------------------------------- diff --git a/test/emqx_connection_SUITE.erl b/test/emqx_connection_SUITE.erl index d0c3f23f5..0677b5ff2 100644 --- a/test/emqx_connection_SUITE.erl +++ b/test/emqx_connection_SUITE.erl @@ -44,6 +44,10 @@ init_per_suite(Config) -> ok = meck:expect(emqx_metrics, inc, fun(_, _) -> ok end), ok = meck:expect(emqx_metrics, inc_recv, fun(_) -> ok end), ok = meck:expect(emqx_metrics, inc_sent, fun(_) -> ok end), + %% Meck Hooks + ok = meck:new(emqx_hooks, [passthrough, no_history, no_link]), + ok = meck:expect(emqx_hooks, run, fun(_Hook, _Args) -> ok end), + ok = meck:expect(emqx_hooks, run_fold, fun(_Hook, _Args, Acc) -> {ok, Acc} end), Config. end_per_suite(_Config) -> @@ -424,4 +428,4 @@ channel(InitFields) -> maps:merge(#{clientinfo => ClientInfo, session => Session, conn_state => connected - }, InitFields)). \ No newline at end of file + }, InitFields)). diff --git a/test/emqx_session_SUITE.erl b/test/emqx_session_SUITE.erl index 457afc71e..0cd395385 100644 --- a/test/emqx_session_SUITE.erl +++ b/test/emqx_session_SUITE.erl @@ -30,7 +30,6 @@ all() -> emqx_ct:all(?MODULE). %%-------------------------------------------------------------------- init_per_suite(Config) -> - %% Broker ok = meck:new([emqx_hooks, emqx_metrics, emqx_broker], [passthrough, no_history, no_link]), ok = meck:expect(emqx_metrics, inc, fun(_) -> ok end), @@ -329,7 +328,7 @@ t_takeover(_) -> t_resume(_) -> ok = meck:expect(emqx_broker, subscribe, fun(_, _, _) -> ok end), Session = session(#{subscriptions => #{<<"t">> => ?DEFAULT_SUBOPTS}}), - ok = emqx_session:resume(<<"clientid">>, Session). + ok = emqx_session:resume(#{clientid => <<"clientid">>}, Session). t_replay(_) -> Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)], From 9a3d16c654712f4a6a761d5f71ed1fa1af2b5205 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sat, 28 Dec 2019 20:06:32 +0800 Subject: [PATCH 2/7] Add 'emqx_packet:info/2' function and test cases --- src/emqx_packet.erl | 96 ++++++++++++++++++++++++++++++++++++++ test/emqx_packet_SUITE.erl | 79 +++++++++++++++++++++++++++++++ 2 files changed, 175 insertions(+) diff --git a/src/emqx_packet.erl b/src/emqx_packet.erl index b46c4638b..036e0b2ef 100644 --- a/src/emqx_packet.erl +++ b/src/emqx_packet.erl @@ -27,8 +27,10 @@ , retain/1 ]). +%% Field APIs -export([ proto_name/1 , proto_ver/1 + , info/2 ]). %% Check API @@ -95,6 +97,100 @@ proto_ver(?CONNECT_PACKET(ConnPkt)) -> proto_ver(#mqtt_packet_connect{proto_ver = Ver}) -> Ver. +%%-------------------------------------------------------------------- +%% Field Info +%%-------------------------------------------------------------------- + +info(proto_name, #mqtt_packet_connect{proto_name = Name}) -> + Name; +info(proto_ver, #mqtt_packet_connect{proto_ver = Ver}) -> + Ver; +info(is_bridge, #mqtt_packet_connect{is_bridge = IsBridge}) -> + IsBridge; +info(clean_start, #mqtt_packet_connect{clean_start = CleanStart}) -> + CleanStart; +info(will_flag, #mqtt_packet_connect{will_flag = WillFlag}) -> + WillFlag; +info(will_qos, #mqtt_packet_connect{will_qos = WillQoS}) -> + WillQoS; +info(will_retain, #mqtt_packet_connect{will_retain = WillRetain}) -> + WillRetain; +info(keepalive, #mqtt_packet_connect{keepalive = KeepAlive}) -> + KeepAlive; +info(properties, #mqtt_packet_connect{properties = Props}) -> + Props; +info(clientid, #mqtt_packet_connect{clientid = ClientId}) -> + ClientId; +info(will_props, #mqtt_packet_connect{will_props = WillProps}) -> + WillProps; +info(will_topic, #mqtt_packet_connect{will_topic = WillTopic}) -> + WillTopic; +info(will_payload, #mqtt_packet_connect{will_payload = Payload}) -> + Payload; +info(username, #mqtt_packet_connect{username = Username}) -> + Username; +info(password, #mqtt_packet_connect{password = Password}) -> + Password; + +info(ack_flags, #mqtt_packet_connack{ack_flags = Flags}) -> + Flags; +info(reason_code, #mqtt_packet_connack{reason_code = RC}) -> + RC; +info(properties, #mqtt_packet_connack{properties = Props}) -> + Props; + +info(topic_name, #mqtt_packet_publish{topic_name = Topic}) -> + Topic; +info(packet_id, #mqtt_packet_publish{packet_id = PacketId}) -> + PacketId; +info(properties, #mqtt_packet_publish{properties = Props}) -> + Props; + +info(packet_id, #mqtt_packet_puback{packet_id = PacketId}) -> + PacketId; +info(reason_code, #mqtt_packet_puback{reason_code = RC}) -> + RC; +info(properties, #mqtt_packet_puback{properties = Props}) -> + Props; + +info(packet_id, #mqtt_packet_subscribe{packet_id = PacketId}) -> + PacketId; +info(properties, #mqtt_packet_subscribe{properties = Props}) -> + Props; +info(topic_filters, #mqtt_packet_subscribe{topic_filters = Topics}) -> + Topics; + +info(packet_id, #mqtt_packet_suback{packet_id = PacketId}) -> + PacketId; +info(properties, #mqtt_packet_suback{properties = Props}) -> + Props; +info(reason_codes, #mqtt_packet_suback{reason_codes = RCs}) -> + RCs; + +info(packet_id, #mqtt_packet_unsubscribe{packet_id = PacketId}) -> + PacketId; +info(properties, #mqtt_packet_unsubscribe{properties = Props}) -> + Props; +info(topic_filters, #mqtt_packet_unsubscribe{topic_filters = Topics}) -> + Topics; + +info(packet_id, #mqtt_packet_unsuback{packet_id = PacketId}) -> + PacketId; +info(properties, #mqtt_packet_unsuback{properties = Props}) -> + Props; +info(reason_codes, #mqtt_packet_unsuback{reason_codes = RCs}) -> + RCs; + +info(reason_code, #mqtt_packet_disconnect{reason_code = RC}) -> + RC; +info(properties, #mqtt_packet_disconnect{properties = Props}) -> + Props; + +info(reason_code, #mqtt_packet_auth{reason_code = RC}) -> + RC; +info(properties, #mqtt_packet_auth{properties = Props}) -> + Props. + %%-------------------------------------------------------------------- %% Check MQTT Packet %%-------------------------------------------------------------------- diff --git a/test/emqx_packet_SUITE.erl b/test/emqx_packet_SUITE.erl index 99b75b40b..e47817307 100644 --- a/test/emqx_packet_SUITE.erl +++ b/test/emqx_packet_SUITE.erl @@ -78,6 +78,85 @@ t_proto_ver(_) -> ?assertEqual(Ver, emqx_packet:proto_ver(ConnPkt)) end, [?MQTT_PROTO_V3, ?MQTT_PROTO_V4, ?MQTT_PROTO_V5]). +t_connect_info(_) -> + ConnPkt = #mqtt_packet_connect{will_flag = true, + clientid = <<"clientid">>, + username = <<"username">>, + will_retain = true, + will_qos = ?QOS_2, + will_topic = <<"topic">>, + will_props = undefined, + will_payload = <<"payload">> + }, + ?assertEqual(<<"MQTT">>, emqx_packet:info(proto_name, ConnPkt)), + ?assertEqual(4, emqx_packet:info(proto_ver, ConnPkt)), + ?assertEqual(false, emqx_packet:info(is_bridge, ConnPkt)), + ?assertEqual(true, emqx_packet:info(clean_start, ConnPkt)), + ?assertEqual(true, emqx_packet:info(will_flag, ConnPkt)), + ?assertEqual(?QOS_2, emqx_packet:info(will_qos, ConnPkt)), + ?assertEqual(true, emqx_packet:info(will_retain, ConnPkt)), + ?assertEqual(0, emqx_packet:info(keepalive, ConnPkt)), + ?assertEqual(undefined, emqx_packet:info(properties, ConnPkt)), + ?assertEqual(<<"clientid">>, emqx_packet:info(clientid, ConnPkt)), + ?assertEqual(undefined, emqx_packet:info(will_props, ConnPkt)), + ?assertEqual(<<"topic">>, emqx_packet:info(will_topic, ConnPkt)), + ?assertEqual(<<"payload">>, emqx_packet:info(will_payload, ConnPkt)), + ?assertEqual(<<"username">>, emqx_packet:info(username, ConnPkt)), + ?assertEqual(undefined, emqx_packet:info(password, ConnPkt)). + +t_connack_info(_) -> + AckPkt = #mqtt_packet_connack{ack_flags = 0, reason_code = 0}, + ?assertEqual(0, emqx_packet:info(ack_flags, AckPkt)), + ?assertEqual(0, emqx_packet:info(reason_code, AckPkt)), + ?assertEqual(undefined, emqx_packet:info(properties, AckPkt)). + +t_publish_info(_) -> + PubPkt = #mqtt_packet_publish{topic_name = <<"t">>, packet_id = 1}, + ?assertEqual(1, emqx_packet:info(packet_id, PubPkt)), + ?assertEqual(<<"t">>, emqx_packet:info(topic_name, PubPkt)), + ?assertEqual(undefined, emqx_packet:info(properties, PubPkt)). + +t_puback_info(_) -> + AckPkt = #mqtt_packet_puback{packet_id = 1, reason_code = 0}, + ?assertEqual(1, emqx_packet:info(packet_id, AckPkt)), + ?assertEqual(0, emqx_packet:info(reason_code, AckPkt)), + ?assertEqual(undefined, emqx_packet:info(properties, AckPkt)). + +t_subscribe_info(_) -> + TopicFilters = [{<<"t/#">>, #{}}], + SubPkt = #mqtt_packet_subscribe{packet_id = 1, topic_filters = TopicFilters}, + ?assertEqual(1, emqx_packet:info(packet_id, SubPkt)), + ?assertEqual(undefined, emqx_packet:info(properties, SubPkt)), + ?assertEqual(TopicFilters, emqx_packet:info(topic_filters, SubPkt)). + +t_suback_info(_) -> + SubackPkt = #mqtt_packet_suback{packet_id = 1, reason_codes = [0]}, + ?assertEqual(1, emqx_packet:info(packet_id, SubackPkt)), + ?assertEqual(undefined, emqx_packet:info(properties, SubackPkt)), + ?assertEqual([0], emqx_packet:info(reason_codes, SubackPkt)). + +t_unsubscribe_info(_) -> + UnsubPkt = #mqtt_packet_unsubscribe{packet_id = 1, topic_filters = [<<"t/#">>]}, + ?assertEqual(1, emqx_packet:info(packet_id, UnsubPkt)), + ?assertEqual(undefined, emqx_packet:info(properties, UnsubPkt)), + ?assertEqual([<<"t/#">>], emqx_packet:info(topic_filters, UnsubPkt)). + +t_unsuback_info(_) -> + AckPkt = #mqtt_packet_unsuback{packet_id = 1, reason_codes = [0]}, + ?assertEqual(1, emqx_packet:info(packet_id, AckPkt)), + ?assertEqual([0], emqx_packet:info(reason_codes, AckPkt)), + ?assertEqual(undefined, emqx_packet:info(properties, AckPkt)). + +t_disconnect_info(_) -> + DisconnPkt = #mqtt_packet_disconnect{reason_code = 0}, + ?assertEqual(0, emqx_packet:info(reason_code, DisconnPkt)), + ?assertEqual(undefined, emqx_packet:info(properties, DisconnPkt)). + +t_auth_info(_) -> + AuthPkt = #mqtt_packet_auth{reason_code = 0}, + ?assertEqual(0, emqx_packet:info(reason_code, AuthPkt)), + ?assertEqual(undefined, emqx_packet:info(properties, AuthPkt)). + t_check_publish(_) -> Props = #{'Response-Topic' => <<"responsetopic">>, 'Topic-Alias' => 1}, ok = emqx_packet:check(?PUBLISH_PACKET(?QOS_1, <<"topic">>, 1, Props, <<"payload">>)), From 12c6d5fe2c404d039746f16beb220ae673e33178 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sat, 28 Dec 2019 18:48:54 +0800 Subject: [PATCH 3/7] Add more metrics for client's lifecircle - client.connect - client.connack - client.connected - client.authenticate - client.check_acl - client.subscribe - client.unsubscribe - client.disconnected --- src/emqx_access_control.erl | 14 +++++---- src/emqx_channel.erl | 57 ++++++++++++++++++++++--------------- src/emqx_metrics.erl | 20 +++++++------ 3 files changed, 54 insertions(+), 37 deletions(-) diff --git a/src/emqx_access_control.erl b/src/emqx_access_control.erl index 96babb951..28bb30cba 100644 --- a/src/emqx_access_control.erl +++ b/src/emqx_access_control.erl @@ -24,8 +24,6 @@ , reload_acl/0 ]). --import(emqx_hooks, [run_fold/3]). - -type(result() :: #{auth_result := emqx_types:auth_result(), anonymous := boolean() }). @@ -36,7 +34,7 @@ -spec(authenticate(emqx_types:clientinfo()) -> {ok, result()} | {error, term()}). authenticate(ClientInfo = #{zone := Zone}) -> - case run_fold('client.authenticate', [ClientInfo], default_auth_result(Zone)) of + case run_hooks('client.authenticate', [ClientInfo], default_auth_result(Zone)) of Result = #{auth_result := success} -> {ok, Result}; Result -> @@ -63,7 +61,7 @@ check_acl_cache(ClientInfo, PubSub, Topic) -> do_check_acl(ClientInfo = #{zone := Zone}, PubSub, Topic) -> Default = emqx_zone:get_env(Zone, acl_nomatch, deny), - case run_fold('client.check_acl', [ClientInfo, PubSub, Topic], Default) of + case run_hooks('client.check_acl', [ClientInfo, PubSub, Topic], Default) of allow -> allow; _Other -> deny end. @@ -75,7 +73,11 @@ reload_acl() -> default_auth_result(Zone) -> case emqx_zone:get_env(Zone, allow_anonymous, false) of - true -> #{auth_result => success, anonymous => true}; - false -> #{auth_result => not_authorized, anonymous => false} + true -> #{auth_result => success, anonymous => true}; + false -> #{auth_result => not_authorized, anonymous => false} end. +-compile({inline, [run_hooks/3]}). +run_hooks(Name, Args, Acc) -> + ok = emqx_metrics:inc(Name), emqx_hooks:run_fold(Name, Args, Acc). + diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index d68f1ed37..9b35b42b1 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -289,11 +289,13 @@ handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = S handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), Channel = #channel{clientinfo = ClientInfo}) -> case emqx_packet:check(Packet) of - ok -> TopicFilters1 = enrich_subid(Properties, parse_topic_filters(TopicFilters)), - TopicFilters2 = emqx_hooks:run_fold('client.subscribe', - [ClientInfo, Properties], - TopicFilters1), - {ReasonCodes, NChannel} = process_subscribe(TopicFilters2, Channel), + ok -> TopicFilters1 = parse_topic_filters(TopicFilters), + TopicFilters2 = enrich_subid(Properties, TopicFilters1), + TopicFilters3 = run_hooks('client.subscribe', + [ClientInfo, Properties], + TopicFilters2 + ), + {ReasonCodes, NChannel} = process_subscribe(TopicFilters3, Channel), handle_out(suback, {PacketId, ReasonCodes}, NChannel); {error, ReasonCode} -> handle_out(disconnect, ReasonCode, Channel) @@ -302,9 +304,10 @@ handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), Channel = #channel{clientinfo = ClientInfo}) -> case emqx_packet:check(Packet) of - ok -> TopicFilters1 = emqx_hooks:run_fold('client.unsubscribe', - [ClientInfo, Properties], - parse_topic_filters(TopicFilters)), + ok -> TopicFilters1 = run_hooks('client.unsubscribe', + [ClientInfo, Properties], + parse_topic_filters(TopicFilters) + ), {ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel), handle_out(unsuback, {PacketId, ReasonCodes}, NChannel); {error, ReasonCode} -> @@ -550,15 +553,14 @@ not_nacked({deliver, _Topic, Msg}) -> | {ok, replies(), channel()} | {shutdown, Reason :: term(), channel()} | {shutdown, Reason :: term(), replies(), channel()}). -handle_out(connack, {RC = ?RC_SUCCESS, SP, ConnPkt}, - Channel = #channel{conninfo = ConnInfo}) -> +handle_out(connack, {?RC_SUCCESS, SP, ConnPkt}, Channel = #channel{conninfo = ConnInfo}) -> AckProps = run_fold([fun enrich_connack_caps/2, fun enrich_server_keepalive/2, fun enrich_assigned_clientid/2 ], #{}, Channel), - AckPacket = ?CONNACK_PACKET(RC, SP, AckProps), - AckPacket1 = emqx_hooks:run_fold('client.connack', [ConnInfo], AckPacket), - return_connack(AckPacket1, + AckPacket = run_hooks('client.connack', [ConnInfo], + ?CONNACK_PACKET(?RC_SUCCESS, SP, AckProps)), + return_connack(AckPacket, ensure_keepalive(AckProps, ensure_connected(ConnPkt, Channel))); @@ -567,7 +569,7 @@ handle_out(connack, {ReasonCode, _ConnPkt}, Channel = #channel{conninfo = ConnIn ?MQTT_PROTO_V5 -> ReasonCode; _ -> emqx_reason_codes:compat(connack, ReasonCode) end), - AckPacket1 = emqx_hooks:run_fold('client.connack', [ConnInfo], AckPacket), + AckPacket1 = run_hooks('client.connack', [ConnInfo], AckPacket), shutdown(emqx_reason_codes:name(ReasonCode), AckPacket1, Channel); %% Optimize? @@ -732,16 +734,18 @@ handle_call(Req, Channel) -> -spec(handle_info(Info :: term(), channel()) -> ok | {ok, channel()} | {shutdown, Reason :: term(), channel()}). handle_info({subscribe, TopicFilters}, Channel = #channel{clientinfo = ClientInfo}) -> - TopicFilters1 = emqx_hooks:run_fold('client.subscribe', - [ClientInfo, #{'Internal' => true}], - parse_topic_filters(TopicFilters)), + TopicFilters1 = run_hooks('client.subscribe', + [ClientInfo, #{'Internal' => true}], + parse_topic_filters(TopicFilters) + ), {_ReasonCodes, NChannel} = process_subscribe(TopicFilters1, Channel), {ok, NChannel}; handle_info({unsubscribe, TopicFilters}, Channel = #channel{clientinfo = ClientInfo}) -> - TopicFilters1 = emqx_hooks:run_fold('client.unsubscribe', - [ClientInfo, #{'Internal' => true}], - parse_topic_filters(TopicFilters)), + TopicFilters1 = run_hooks('client.unsubscribe', + [ClientInfo, #{'Internal' => true}], + parse_topic_filters(TopicFilters) + ), {_ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel), {ok, NChannel}; @@ -940,7 +944,7 @@ receive_maximum(#{zone := Zone}, ConnProps) -> %% Run Connect Hooks run_conn_hooks(ConnPkt, Channel = #channel{conninfo = ConnInfo}) -> - case emqx_hooks:run_fold('client.connect', [ConnInfo], ConnPkt) of + case run_hooks('client.connect', [ConnInfo], ConnPkt) of Error = {error, _Reason} -> Error; NConnPkt -> {ok, NConnPkt, Channel} end. @@ -1183,7 +1187,7 @@ enrich_assigned_clientid(AckProps, #channel{conninfo = ConnInfo, ensure_connected(ConnPkt, Channel = #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) -> NConnInfo = ConnInfo#{connected_at => erlang:system_time(second)}, - ok = emqx_hooks:run('client.connected', [ClientInfo, NConnInfo]), + ok = run_hooks('client.connected', [ClientInfo, NConnInfo]), Channel#channel{conninfo = NConnInfo, conn_state = connected, will_msg = emqx_packet:will_msg(ConnPkt), @@ -1262,7 +1266,7 @@ parse_topic_filters(TopicFilters) -> ensure_disconnected(Reason, Channel = #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) -> NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(second)}, - ok = emqx_hooks:run('client.disconnected', [ClientInfo, Reason, NConnInfo]), + ok = run_hooks('client.disconnected', [ClientInfo, Reason, NConnInfo]), Channel#channel{conninfo = NConnInfo, conn_state = disconnected}. %%-------------------------------------------------------------------- @@ -1292,6 +1296,13 @@ disconnect_reason(ReasonCode) -> emqx_reason_codes:name(ReasonCode). %% Helper functions %%-------------------------------------------------------------------- +-compile({inline, [run_hooks/2, run_hooks/3]}). +run_hooks(Name, Args) -> + ok = emqx_metrics:inc(Name), emqx_hooks:run(Name, Args). + +run_hooks(Name, Args, Acc) -> + ok = emqx_metrics:inc(Name), emqx_hooks:run_fold(Name, Args, Acc). + -compile({inline, [find_alias/2, save_alias/3]}). find_alias(_AliasId, undefined) -> false; diff --git a/src/emqx_metrics.erl b/src/emqx_metrics.erl index b6c06bcef..5758f68c3 100644 --- a/src/emqx_metrics.erl +++ b/src/emqx_metrics.erl @@ -157,7 +157,9 @@ %% Client Lifecircle metrics -define(CLIENT_METRICS, - [{counter, 'client.connected'}, + [{counter, 'client.connect'}, + {counter, 'client.connack'}, + {counter, 'client.connected'}, {counter, 'client.authenticate'}, {counter, 'client.auth.anonymous'}, {counter, 'client.check_acl'}, @@ -512,13 +514,15 @@ reserved_idx('delivery.dropped.qos0_msg') -> 121; reserved_idx('delivery.dropped.queue_full') -> 122; reserved_idx('delivery.dropped.expired') -> 123; -reserved_idx('client.connected') -> 200; -reserved_idx('client.authenticate') -> 201; -reserved_idx('client.auth.anonymous') -> 202; -reserved_idx('client.check_acl') -> 203; -reserved_idx('client.subscribe') -> 204; -reserved_idx('client.unsubscribe') -> 205; -reserved_idx('client.disconnected') -> 206; +reserved_idx('client.connect') -> 200; +reserved_idx('client.connack') -> 201; +reserved_idx('client.connected') -> 202; +reserved_idx('client.authenticate') -> 203; +reserved_idx('client.auth.anonymous') -> 204; +reserved_idx('client.check_acl') -> 205; +reserved_idx('client.subscribe') -> 206; +reserved_idx('client.unsubscribe') -> 207; +reserved_idx('client.disconnected') -> 208; reserved_idx('session.created') -> 220; reserved_idx('session.resumed') -> 221; From 46ca3f2f82dbef93236d817a8c1acf0ce29f4dd9 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 27 Dec 2019 19:56:03 +0800 Subject: [PATCH 4/7] Add metrics for session's lifecircle - session.created - session.resumed - session.takeovered - session.discarded - session.terminated --- src/emqx_cm.erl | 1 + src/emqx_session.erl | 11 ++++++++--- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index 9404835c8..561017b93 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -229,6 +229,7 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) -> create_session(ClientInfo, ConnInfo) -> Session = emqx_session:init(ClientInfo, ConnInfo), + ok = emqx_metrics:inc('session.created'), ok = emqx_hooks:run('session.created', [ClientInfo, emqx_session:info(Session)]), Session. diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 793ed096c..01b5f057c 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -610,6 +610,7 @@ resume(ClientInfo = #{clientid := ClientId}, Session = #session{subscriptions = lists:foreach(fun({TopicFilter, SubOpts}) -> ok = emqx_broker:subscribe(TopicFilter, ClientId, SubOpts) end, maps:to_list(Subs)), + ok = emqx_metrics:inc('session.resumed'), emqx_hooks:run('session.resumed', [ClientInfo, info(Session)]). -spec(replay(session()) -> {ok, replies(), session()}). @@ -630,11 +631,15 @@ replay(Inflight) -> -spec(terminate(emqx_types:clientinfo(), Reason :: term(), session()) -> ok). terminate(ClientInfo, discarded, Session) -> - emqx_hooks:run('session.discarded', [ClientInfo, info(Session)]); + run_hook('session.discarded', [ClientInfo, info(Session)]); terminate(ClientInfo, takeovered, Session) -> - emqx_hooks:run('session.takeovered', [ClientInfo, info(Session)]); + run_hook('session.takeovered', [ClientInfo, info(Session)]); terminate(ClientInfo, Reason, Session) -> - emqx_hooks:run('session.terminated', [ClientInfo, Reason, info(Session)]). + run_hook('session.terminated', [ClientInfo, Reason, info(Session)]). + +-compile({inline, [run_hook/2]}). +run_hook(Name, Args) -> + ok = emqx_metrics:inc(Name), emqx_hooks:run(Name, Args). %%-------------------------------------------------------------------- %% Inc message/delivery expired counter From 7f807c0b113029d5cc9513a577ee7e6513e44945 Mon Sep 17 00:00:00 2001 From: turtled Date: Mon, 30 Dec 2019 11:18:26 +0800 Subject: [PATCH 5/7] Fix test cases fail --- test/emqx_connection_SUITE.erl | 1 + 1 file changed, 1 insertion(+) diff --git a/test/emqx_connection_SUITE.erl b/test/emqx_connection_SUITE.erl index 0677b5ff2..517267f0c 100644 --- a/test/emqx_connection_SUITE.erl +++ b/test/emqx_connection_SUITE.erl @@ -57,6 +57,7 @@ end_per_suite(_Config) -> ok = meck:unload(emqx_limiter), ok = meck:unload(emqx_pd), ok = meck:unload(emqx_metrics), + ok = meck:unload(emqx_hooks), ok. init_per_testcase(_TestCase, Config) -> From 597558fee885262ffd2ac5faaf2c663d285402ff Mon Sep 17 00:00:00 2001 From: zhouzb Date: Mon, 30 Dec 2019 12:00:31 +0800 Subject: [PATCH 6/7] Fix unexpected timeout --- src/emqx_channel.erl | 20 ++++++++------------ src/emqx_connection.erl | 8 ++++---- 2 files changed, 12 insertions(+), 16 deletions(-) diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 9b35b42b1..2dec4d98a 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -786,9 +786,8 @@ handle_info(Info, Channel) -> -> {ok, channel()} | {ok, replies(), channel()} | {shutdown, Reason :: term(), channel()}). -handle_timeout(TRef, {keepalive, StatVal}, - Channel = #channel{keepalive = Keepalive, - timers = #{alive_timer := TRef}}) -> +handle_timeout(_TRef, {keepalive, StatVal}, + Channel = #channel{keepalive = Keepalive}) -> case emqx_keepalive:check(StatVal, Keepalive) of {ok, NKeepalive} -> NChannel = Channel#channel{keepalive = NKeepalive}, @@ -797,9 +796,8 @@ handle_timeout(TRef, {keepalive, StatVal}, handle_out(disconnect, ?RC_KEEP_ALIVE_TIMEOUT, Channel) end; -handle_timeout(TRef, retry_delivery, - Channel = #channel{session = Session, - timers = #{retry_timer := TRef}}) -> +handle_timeout(_TRef, retry_delivery, + Channel = #channel{session = Session}) -> case emqx_session:retry(Session) of {ok, NSession} -> {ok, clean_timer(retry_timer, Channel#channel{session = NSession})}; @@ -811,9 +809,8 @@ handle_timeout(TRef, retry_delivery, handle_out(publish, Publishes, reset_timer(retry_timer, Timeout, NChannel)) end; -handle_timeout(TRef, expire_awaiting_rel, - Channel = #channel{session = Session, - timers = #{await_timer := TRef}}) -> +handle_timeout(_TRef, expire_awaiting_rel, + Channel = #channel{session = Session}) -> case emqx_session:expire(awaiting_rel, Session) of {ok, Session} -> {ok, clean_timer(await_timer, Channel#channel{session = Session})}; @@ -821,11 +818,10 @@ handle_timeout(TRef, expire_awaiting_rel, {ok, reset_timer(await_timer, Timeout, Channel#channel{session = Session})} end; -handle_timeout(TRef, expire_session, Channel = #channel{timers = #{expire_timer := TRef}}) -> +handle_timeout(_TRef, expire_session, Channel) -> shutdown(expired, Channel); -handle_timeout(TRef, will_message, Channel = #channel{will_msg = WillMsg, - timers = #{will_timer := TRef}}) -> +handle_timeout(_TRef, will_message, Channel = #channel{will_msg = WillMsg}) -> (WillMsg =/= undefined) andalso publish_will_msg(WillMsg), {ok, clean_timer(will_timer, Channel#channel{will_msg = undefined})}; diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 774c0ba42..d0a923bff 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -459,17 +459,17 @@ handle_call(_From, Req, State = #state{channel = Channel}) -> %%-------------------------------------------------------------------- %% Handle timeout -handle_timeout(TRef, idle_timeout, State = #state{idle_timer = TRef}) -> +handle_timeout(_TRef, idle_timeout, State) -> shutdown(idle_timeout, State); -handle_timeout(TRef, limit_timeout, State = #state{limit_timer = TRef}) -> +handle_timeout(_TRef, limit_timeout, State) -> NState = State#state{sockstate = idle, limit_timer = undefined }, handle_info(activate_socket, NState); -handle_timeout(TRef, emit_stats, State = - #state{stats_timer = TRef, channel = Channel}) -> +handle_timeout(_TRef, emit_stats, State = + #state{channel = Channel}) -> ClientId = emqx_channel:info(clientid, Channel), emqx_cm:set_chan_stats(ClientId, stats(State)), {ok, State#state{stats_timer = undefined}}; From f33217c0491df404c33248a7e316d9fcdad8ac5e Mon Sep 17 00:00:00 2001 From: turtled Date: Tue, 31 Dec 2019 09:06:58 +0800 Subject: [PATCH 7/7] Fix unexpected timeout --- src/emqx_channel.erl | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 2dec4d98a..1be58cdb4 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -786,6 +786,9 @@ handle_info(Info, Channel) -> -> {ok, channel()} | {ok, replies(), channel()} | {shutdown, Reason :: term(), channel()}). +handle_timeout(_TRef, {keepalive, _StatVal}, + Channel = #channel{keepalive = undefined}) -> + {ok, Channel}; handle_timeout(_TRef, {keepalive, StatVal}, Channel = #channel{keepalive = Keepalive}) -> case emqx_keepalive:check(StatVal, Keepalive) of