test: fix tests after counter changes
This commit is contained in:
parent
357e5919ce
commit
f0ff32c031
|
@ -540,6 +540,7 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
|
||||||
|
|
||||||
%% stop the listener 1883 to make the bridge disconnected
|
%% stop the listener 1883 to make the bridge disconnected
|
||||||
ok = emqx_listeners:stop_listener('tcp:default'),
|
ok = emqx_listeners:stop_listener('tcp:default'),
|
||||||
|
ct:sleep(1500),
|
||||||
|
|
||||||
%% PUBLISH 2 messages to the 'local' broker, the message should
|
%% PUBLISH 2 messages to the 'local' broker, the message should
|
||||||
emqx:publish(emqx_message:make(LocalTopic, Payload)),
|
emqx:publish(emqx_message:make(LocalTopic, Payload)),
|
||||||
|
@ -551,7 +552,7 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
|
||||||
#{
|
#{
|
||||||
<<"status">> := Status,
|
<<"status">> := Status,
|
||||||
<<"metrics">> := #{
|
<<"metrics">> := #{
|
||||||
<<"matched">> := 3, <<"success">> := 1, <<"failed">> := 0, <<"queuing">> := 2
|
<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0, <<"queuing">> := 2
|
||||||
}
|
}
|
||||||
} when Status == <<"connected">> orelse Status == <<"connecting">>,
|
} when Status == <<"connected">> orelse Status == <<"connecting">>,
|
||||||
jsx:decode(BridgeStr1)
|
jsx:decode(BridgeStr1)
|
||||||
|
|
|
@ -100,6 +100,15 @@ on_query(_InstId, {inc_counter, N}, #{pid := Pid}) ->
|
||||||
after 1000 ->
|
after 1000 ->
|
||||||
{error, timeout}
|
{error, timeout}
|
||||||
end;
|
end;
|
||||||
|
on_query(_InstId, get_incorrect_status_count, #{pid := Pid}) ->
|
||||||
|
ReqRef = make_ref(),
|
||||||
|
From = {self(), ReqRef},
|
||||||
|
Pid ! {From, get_incorrect_status_count},
|
||||||
|
receive
|
||||||
|
{ReqRef, Count} -> {ok, Count}
|
||||||
|
after 1000 ->
|
||||||
|
{error, timeout}
|
||||||
|
end;
|
||||||
on_query(_InstId, get_counter, #{pid := Pid}) ->
|
on_query(_InstId, get_counter, #{pid := Pid}) ->
|
||||||
ReqRef = make_ref(),
|
ReqRef = make_ref(),
|
||||||
From = {self(), ReqRef},
|
From = {self(), ReqRef},
|
||||||
|
@ -157,9 +166,15 @@ spawn_counter_process(Name, Register) ->
|
||||||
Pid.
|
Pid.
|
||||||
|
|
||||||
counter_loop() ->
|
counter_loop() ->
|
||||||
counter_loop(#{counter => 0, status => running}).
|
counter_loop(#{counter => 0, status => running, incorrect_status_count => 0}).
|
||||||
|
|
||||||
counter_loop(#{counter := Num, status := Status} = State) ->
|
counter_loop(
|
||||||
|
#{
|
||||||
|
counter := Num,
|
||||||
|
status := Status,
|
||||||
|
incorrect_status_count := IncorrectCount
|
||||||
|
} = State
|
||||||
|
) ->
|
||||||
NewState =
|
NewState =
|
||||||
receive
|
receive
|
||||||
block ->
|
block ->
|
||||||
|
@ -179,10 +194,13 @@ counter_loop(#{counter := Num, status := Status} = State) ->
|
||||||
State#{counter => Num + N};
|
State#{counter => Num + N};
|
||||||
{{FromPid, ReqRef}, {inc, _N}} when Status == blocked ->
|
{{FromPid, ReqRef}, {inc, _N}} when Status == blocked ->
|
||||||
FromPid ! {ReqRef, incorrect_status},
|
FromPid ! {ReqRef, incorrect_status},
|
||||||
State;
|
State#{incorrect_status_count := IncorrectCount + 1};
|
||||||
{get, ReplyFun} ->
|
{get, ReplyFun} ->
|
||||||
apply_reply(ReplyFun, Num),
|
apply_reply(ReplyFun, Num),
|
||||||
State;
|
State;
|
||||||
|
{{FromPid, ReqRef}, get_incorrect_status_count} ->
|
||||||
|
FromPid ! {ReqRef, IncorrectCount},
|
||||||
|
State;
|
||||||
{{FromPid, ReqRef}, get} ->
|
{{FromPid, ReqRef}, get} ->
|
||||||
FromPid ! {ReqRef, Num},
|
FromPid ! {ReqRef, Num},
|
||||||
State
|
State
|
||||||
|
|
|
@ -420,10 +420,18 @@ t_query_counter_async_inflight(_) ->
|
||||||
|
|
||||||
{ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID),
|
{ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID),
|
||||||
ct:pal("metrics: ~p", [C]),
|
ct:pal("metrics: ~p", [C]),
|
||||||
|
{ok, IncorrectStatusCount} = emqx_resource:simple_sync_query(?ID, get_incorrect_status_count),
|
||||||
|
%% The `simple_sync_query' we just did also increases the matched
|
||||||
|
%% count, hence the + 1.
|
||||||
|
ExtraSimpleCallCount = IncorrectStatusCount + 1,
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
#{matched := M, success := Ss, dropped := Dp, 'retried.success' := Rs} when
|
#{matched := M, success := Ss, dropped := Dp, 'retried.success' := Rs} when
|
||||||
M == Ss + Dp - Rs,
|
M == Ss + Dp - Rs + ExtraSimpleCallCount,
|
||||||
C
|
C,
|
||||||
|
#{
|
||||||
|
metrics => C,
|
||||||
|
extra_simple_call_count => ExtraSimpleCallCount
|
||||||
|
}
|
||||||
),
|
),
|
||||||
?assert(
|
?assert(
|
||||||
lists:all(
|
lists:all(
|
||||||
|
|
Loading…
Reference in New Issue