Merge pull request #11520 from zhongwencool/packet-connack-sent-count-error
This commit is contained in:
commit
b59ec09266
|
@ -2218,6 +2218,7 @@ disconnect_and_shutdown(Reason, Reply, Channel) ->
|
||||||
NChannel = ensure_disconnected(Reason, Channel),
|
NChannel = ensure_disconnected(Reason, Channel),
|
||||||
shutdown(Reason, Reply, NChannel).
|
shutdown(Reason, Reply, NChannel).
|
||||||
|
|
||||||
|
-compile({inline, [sp/1, flag/1]}).
|
||||||
sp(true) -> 1;
|
sp(true) -> 1;
|
||||||
sp(false) -> 0.
|
sp(false) -> 0.
|
||||||
|
|
||||||
|
|
|
@ -35,8 +35,6 @@
|
||||||
insert_channel_info/3
|
insert_channel_info/3
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([connection_closed/1]).
|
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
get_chan_info/1,
|
get_chan_info/1,
|
||||||
get_chan_info/2,
|
get_chan_info/2,
|
||||||
|
@ -192,14 +190,6 @@ do_unregister_channel({_ClientId, ChanPid} = Chan) ->
|
||||||
ok = emqx_hooks:run('channel.unregistered', [ChanPid]),
|
ok = emqx_hooks:run('channel.unregistered', [ChanPid]),
|
||||||
true.
|
true.
|
||||||
|
|
||||||
-spec connection_closed(emqx_types:clientid()) -> true.
|
|
||||||
connection_closed(ClientId) ->
|
|
||||||
connection_closed(ClientId, self()).
|
|
||||||
|
|
||||||
-spec connection_closed(emqx_types:clientid(), chan_pid()) -> true.
|
|
||||||
connection_closed(ClientId, ChanPid) ->
|
|
||||||
ets:delete_object(?CHAN_CONN_TAB, {ClientId, ChanPid}).
|
|
||||||
|
|
||||||
%% @doc Get info of a channel.
|
%% @doc Get info of a channel.
|
||||||
-spec get_chan_info(emqx_types:clientid()) -> maybe(emqx_types:infos()).
|
-spec get_chan_info(emqx_types:clientid()) -> maybe(emqx_types:infos()).
|
||||||
get_chan_info(ClientId) ->
|
get_chan_info(ClientId) ->
|
||||||
|
|
|
@ -636,7 +636,6 @@ handle_msg(
|
||||||
handle_msg({event, disconnected}, State = #state{channel = Channel}) ->
|
handle_msg({event, disconnected}, State = #state{channel = Channel}) ->
|
||||||
ClientId = emqx_channel:info(clientid, Channel),
|
ClientId = emqx_channel:info(clientid, Channel),
|
||||||
emqx_cm:set_chan_info(ClientId, info(State)),
|
emqx_cm:set_chan_info(ClientId, info(State)),
|
||||||
emqx_cm:connection_closed(ClientId),
|
|
||||||
{ok, State};
|
{ok, State};
|
||||||
handle_msg({event, _Other}, State = #state{channel = Channel}) ->
|
handle_msg({event, _Other}, State = #state{channel = Channel}) ->
|
||||||
ClientId = emqx_channel:info(clientid, Channel),
|
ClientId = emqx_channel:info(clientid, Channel),
|
||||||
|
|
|
@ -493,7 +493,7 @@ inc_sent(Packet) ->
|
||||||
inc('packets.sent'),
|
inc('packets.sent'),
|
||||||
do_inc_sent(Packet).
|
do_inc_sent(Packet).
|
||||||
|
|
||||||
do_inc_sent(?CONNACK_PACKET(ReasonCode)) ->
|
do_inc_sent(?CONNACK_PACKET(ReasonCode, _SessPresent)) ->
|
||||||
(ReasonCode == ?RC_SUCCESS) orelse inc('packets.connack.error'),
|
(ReasonCode == ?RC_SUCCESS) orelse inc('packets.connack.error'),
|
||||||
((ReasonCode == ?RC_NOT_AUTHORIZED) orelse
|
((ReasonCode == ?RC_NOT_AUTHORIZED) orelse
|
||||||
(ReasonCode == ?CONNACK_AUTH)) andalso
|
(ReasonCode == ?CONNACK_AUTH)) andalso
|
||||||
|
|
|
@ -531,7 +531,6 @@ handle_info({event, connected}, State = #state{channel = Channel}) ->
|
||||||
handle_info({event, disconnected}, State = #state{channel = Channel}) ->
|
handle_info({event, disconnected}, State = #state{channel = Channel}) ->
|
||||||
ClientId = emqx_channel:info(clientid, Channel),
|
ClientId = emqx_channel:info(clientid, Channel),
|
||||||
emqx_cm:set_chan_info(ClientId, info(State)),
|
emqx_cm:set_chan_info(ClientId, info(State)),
|
||||||
emqx_cm:connection_closed(ClientId),
|
|
||||||
return(State);
|
return(State);
|
||||||
handle_info({event, _Other}, State = #state{channel = Channel}) ->
|
handle_info({event, _Other}, State = #state{channel = Channel}) ->
|
||||||
ClientId = emqx_channel:info(clientid, Channel),
|
ClientId = emqx_channel:info(clientid, Channel),
|
||||||
|
|
|
@ -274,7 +274,6 @@ t_handle_msg_event(_) ->
|
||||||
ok = meck:expect(emqx_cm, register_channel, fun(_, _, _) -> ok end),
|
ok = meck:expect(emqx_cm, register_channel, fun(_, _, _) -> ok end),
|
||||||
ok = meck:expect(emqx_cm, insert_channel_info, fun(_, _, _) -> ok end),
|
ok = meck:expect(emqx_cm, insert_channel_info, fun(_, _, _) -> ok end),
|
||||||
ok = meck:expect(emqx_cm, set_chan_info, fun(_, _) -> ok end),
|
ok = meck:expect(emqx_cm, set_chan_info, fun(_, _) -> ok end),
|
||||||
ok = meck:expect(emqx_cm, connection_closed, fun(_) -> ok end),
|
|
||||||
?assertEqual(ok, handle_msg({event, connected}, st())),
|
?assertEqual(ok, handle_msg({event, connected}, st())),
|
||||||
?assertMatch({ok, _St}, handle_msg({event, disconnected}, st())),
|
?assertMatch({ok, _St}, handle_msg({event, disconnected}, st())),
|
||||||
?assertMatch({ok, _St}, handle_msg({event, undefined}, st())).
|
?assertMatch({ok, _St}, handle_msg({event, undefined}, st())).
|
||||||
|
|
|
@ -122,6 +122,17 @@ t_inc_sent(_) ->
|
||||||
with_metrics_server(
|
with_metrics_server(
|
||||||
fun() ->
|
fun() ->
|
||||||
ok = emqx_metrics:inc_sent(?CONNACK_PACKET(0)),
|
ok = emqx_metrics:inc_sent(?CONNACK_PACKET(0)),
|
||||||
|
ok = emqx_metrics:inc_sent(?CONNACK_PACKET(0, 1)),
|
||||||
|
ok = emqx_metrics:inc_sent(
|
||||||
|
?CONNACK_PACKET(0, 1, #{
|
||||||
|
'Maximum-Packet-Size' => 1048576,
|
||||||
|
'Retain-Available' => 1,
|
||||||
|
'Shared-Subscription-Available' => 1,
|
||||||
|
'Subscription-Identifier-Available' => 1,
|
||||||
|
'Topic-Alias-Maximum' => 65535,
|
||||||
|
'Wildcard-Subscription-Available' => 1
|
||||||
|
})
|
||||||
|
),
|
||||||
ok = emqx_metrics:inc_sent(?PUBLISH_PACKET(0, 0)),
|
ok = emqx_metrics:inc_sent(?PUBLISH_PACKET(0, 0)),
|
||||||
ok = emqx_metrics:inc_sent(?PUBLISH_PACKET(1, 0)),
|
ok = emqx_metrics:inc_sent(?PUBLISH_PACKET(1, 0)),
|
||||||
ok = emqx_metrics:inc_sent(?PUBLISH_PACKET(2, 0)),
|
ok = emqx_metrics:inc_sent(?PUBLISH_PACKET(2, 0)),
|
||||||
|
@ -134,8 +145,8 @@ t_inc_sent(_) ->
|
||||||
ok = emqx_metrics:inc_sent(?PACKET(?PINGRESP)),
|
ok = emqx_metrics:inc_sent(?PACKET(?PINGRESP)),
|
||||||
ok = emqx_metrics:inc_sent(?PACKET(?DISCONNECT)),
|
ok = emqx_metrics:inc_sent(?PACKET(?DISCONNECT)),
|
||||||
ok = emqx_metrics:inc_sent(?PACKET(?AUTH)),
|
ok = emqx_metrics:inc_sent(?PACKET(?AUTH)),
|
||||||
?assertEqual(13, emqx_metrics:val('packets.sent')),
|
?assertEqual(15, emqx_metrics:val('packets.sent')),
|
||||||
?assertEqual(1, emqx_metrics:val('packets.connack.sent')),
|
?assertEqual(3, emqx_metrics:val('packets.connack.sent')),
|
||||||
?assertEqual(3, emqx_metrics:val('messages.sent')),
|
?assertEqual(3, emqx_metrics:val('messages.sent')),
|
||||||
?assertEqual(1, emqx_metrics:val('messages.qos0.sent')),
|
?assertEqual(1, emqx_metrics:val('messages.qos0.sent')),
|
||||||
?assertEqual(1, emqx_metrics:val('messages.qos1.sent')),
|
?assertEqual(1, emqx_metrics:val('messages.qos1.sent')),
|
||||||
|
|
|
@ -483,7 +483,6 @@ t_handle_info_close(_) ->
|
||||||
t_handle_info_event(_) ->
|
t_handle_info_event(_) ->
|
||||||
ok = meck:expect(emqx_cm, register_channel, fun(_, _, _) -> ok end),
|
ok = meck:expect(emqx_cm, register_channel, fun(_, _, _) -> ok end),
|
||||||
ok = meck:expect(emqx_cm, insert_channel_info, fun(_, _, _) -> ok end),
|
ok = meck:expect(emqx_cm, insert_channel_info, fun(_, _, _) -> ok end),
|
||||||
ok = meck:expect(emqx_cm, connection_closed, fun(_) -> true end),
|
|
||||||
{ok, _} = ?ws_conn:handle_info({event, connected}, st()),
|
{ok, _} = ?ws_conn:handle_info({event, connected}, st()),
|
||||||
{ok, _} = ?ws_conn:handle_info({event, disconnected}, st()),
|
{ok, _} = ?ws_conn:handle_info({event, disconnected}, st()),
|
||||||
{ok, _} = ?ws_conn:handle_info({event, updated}, st()).
|
{ok, _} = ?ws_conn:handle_info({event, updated}, st()).
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
Fixed issue where packets_connack_sent metric was not incremented on CONNACK packets sent with non-zero ack_flag
|
Loading…
Reference in New Issue