diff --git a/apps/emqx_exproto/src/emqx_exproto_channel.erl b/apps/emqx_exproto/src/emqx_exproto_channel.erl index dce3b36e1..80e5f4045 100644 --- a/apps/emqx_exproto/src/emqx_exproto_channel.erl +++ b/apps/emqx_exproto/src/emqx_exproto_channel.erl @@ -260,7 +260,8 @@ handle_timeout(_TRef, {keepalive, StatVal}, {ok, reset_timer(alive_timer, NChannel)}; {error, timeout} -> Req = #{type => 'KEEPALIVE'}, - {ok, try_dispatch(on_timer_timeout, wrap(Req), Channel)} + NChannel = clean_timer(alive_timer, Channel), + {ok, try_dispatch(on_timer_timeout, wrap(Req), NChannel)} end; handle_timeout(_TRef, force_close, Channel = #channel{closed_reason = Reason}) -> @@ -327,7 +328,7 @@ handle_call({start_timer, keepalive, Interval}, NConnInfo = ConnInfo#{keepalive => Interval}, NClientInfo = ClientInfo#{keepalive => Interval}, NChannel = Channel#channel{conninfo = NConnInfo, clientinfo = NClientInfo}, - {reply, ok, ensure_keepalive(NChannel)}; + {reply, ok, [{event, updated}], ensure_keepalive(NChannel)}; handle_call({subscribe, TopicFilter, Qos}, Channel = #channel{ @@ -339,13 +340,13 @@ handle_call({subscribe, TopicFilter, Qos}, {reply, {error, ?RESP_PERMISSION_DENY, <<"ACL deny">>}, Channel}; _ -> {ok, NChannel} = do_subscribe([{TopicFilter, #{qos => Qos}}], Channel), - {reply, ok, NChannel} + {reply, ok, [{event, updated}], NChannel} end; handle_call({unsubscribe, TopicFilter}, Channel = #channel{conn_state = connected}) -> {ok, NChannel} = do_unsubscribe([{TopicFilter, #{}}], Channel), - {reply, ok, NChannel}; + {reply, ok, [{event, updated}], NChannel}; handle_call({publish, Topic, Qos, Payload}, Channel = #channel{ diff --git a/apps/emqx_exproto/src/emqx_exproto_conn.erl b/apps/emqx_exproto/src/emqx_exproto_conn.erl index 0e83e96c4..5c56731d6 100644 --- a/apps/emqx_exproto/src/emqx_exproto_conn.erl +++ b/apps/emqx_exproto/src/emqx_exproto_conn.erl @@ -450,11 +450,11 @@ handle_msg({event, disconnected}, State = #state{channel = Channel}) -> emqx_cm:connection_closed(ClientId), {ok, State}; -%handle_msg({event, _Other}, State = #state{channel = Channel}) -> -% ClientId = emqx_exproto_channel:info(clientid, Channel), -% emqx_cm:set_chan_info(ClientId, info(State)), -% emqx_cm:set_chan_stats(ClientId, stats(State)), -% {ok, State}; +handle_msg({event, _Other}, State = #state{channel = Channel}) -> + ClientId = emqx_exproto_channel:info(clientid, Channel), + emqx_cm:set_chan_info(ClientId, info(State)), + emqx_cm:set_chan_stats(ClientId, stats(State)), + {ok, State}; handle_msg({timeout, TRef, TMsg}, State) -> handle_timeout(TRef, TMsg, State);