feat(exproto): keeping client information up to date
This commit is contained in:
parent
f23134d52a
commit
db26956f3e
|
@ -260,7 +260,8 @@ handle_timeout(_TRef, {keepalive, StatVal},
|
||||||
{ok, reset_timer(alive_timer, NChannel)};
|
{ok, reset_timer(alive_timer, NChannel)};
|
||||||
{error, timeout} ->
|
{error, timeout} ->
|
||||||
Req = #{type => 'KEEPALIVE'},
|
Req = #{type => 'KEEPALIVE'},
|
||||||
{ok, try_dispatch(on_timer_timeout, wrap(Req), Channel)}
|
NChannel = clean_timer(alive_timer, Channel),
|
||||||
|
{ok, try_dispatch(on_timer_timeout, wrap(Req), NChannel)}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_timeout(_TRef, force_close, Channel = #channel{closed_reason = Reason}) ->
|
handle_timeout(_TRef, force_close, Channel = #channel{closed_reason = Reason}) ->
|
||||||
|
@ -327,7 +328,7 @@ handle_call({start_timer, keepalive, Interval},
|
||||||
NConnInfo = ConnInfo#{keepalive => Interval},
|
NConnInfo = ConnInfo#{keepalive => Interval},
|
||||||
NClientInfo = ClientInfo#{keepalive => Interval},
|
NClientInfo = ClientInfo#{keepalive => Interval},
|
||||||
NChannel = Channel#channel{conninfo = NConnInfo, clientinfo = NClientInfo},
|
NChannel = Channel#channel{conninfo = NConnInfo, clientinfo = NClientInfo},
|
||||||
{reply, ok, ensure_keepalive(NChannel)};
|
{reply, ok, [{event, updated}], ensure_keepalive(NChannel)};
|
||||||
|
|
||||||
handle_call({subscribe, TopicFilter, Qos},
|
handle_call({subscribe, TopicFilter, Qos},
|
||||||
Channel = #channel{
|
Channel = #channel{
|
||||||
|
@ -339,13 +340,13 @@ handle_call({subscribe, TopicFilter, Qos},
|
||||||
{reply, {error, ?RESP_PERMISSION_DENY, <<"ACL deny">>}, Channel};
|
{reply, {error, ?RESP_PERMISSION_DENY, <<"ACL deny">>}, Channel};
|
||||||
_ ->
|
_ ->
|
||||||
{ok, NChannel} = do_subscribe([{TopicFilter, #{qos => Qos}}], Channel),
|
{ok, NChannel} = do_subscribe([{TopicFilter, #{qos => Qos}}], Channel),
|
||||||
{reply, ok, NChannel}
|
{reply, ok, [{event, updated}], NChannel}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_call({unsubscribe, TopicFilter},
|
handle_call({unsubscribe, TopicFilter},
|
||||||
Channel = #channel{conn_state = connected}) ->
|
Channel = #channel{conn_state = connected}) ->
|
||||||
{ok, NChannel} = do_unsubscribe([{TopicFilter, #{}}], Channel),
|
{ok, NChannel} = do_unsubscribe([{TopicFilter, #{}}], Channel),
|
||||||
{reply, ok, NChannel};
|
{reply, ok, [{event, updated}], NChannel};
|
||||||
|
|
||||||
handle_call({publish, Topic, Qos, Payload},
|
handle_call({publish, Topic, Qos, Payload},
|
||||||
Channel = #channel{
|
Channel = #channel{
|
||||||
|
|
|
@ -450,11 +450,11 @@ handle_msg({event, disconnected}, State = #state{channel = Channel}) ->
|
||||||
emqx_cm:connection_closed(ClientId),
|
emqx_cm:connection_closed(ClientId),
|
||||||
{ok, State};
|
{ok, State};
|
||||||
|
|
||||||
%handle_msg({event, _Other}, State = #state{channel = Channel}) ->
|
handle_msg({event, _Other}, State = #state{channel = Channel}) ->
|
||||||
% ClientId = emqx_exproto_channel:info(clientid, Channel),
|
ClientId = emqx_exproto_channel:info(clientid, Channel),
|
||||||
% emqx_cm:set_chan_info(ClientId, info(State)),
|
emqx_cm:set_chan_info(ClientId, info(State)),
|
||||||
% emqx_cm:set_chan_stats(ClientId, stats(State)),
|
emqx_cm:set_chan_stats(ClientId, stats(State)),
|
||||||
% {ok, State};
|
{ok, State};
|
||||||
|
|
||||||
handle_msg({timeout, TRef, TMsg}, State) ->
|
handle_msg({timeout, TRef, TMsg}, State) ->
|
||||||
handle_timeout(TRef, TMsg, State);
|
handle_timeout(TRef, TMsg, State);
|
||||||
|
|
Loading…
Reference in New Issue