fix(live_conn): fix double-decrement for anonymous clients
`set_chan_info` does contain the `conn_state`, but it seems that it is not called for anonymous clients disconnecting. `emqx_channel:handle_info({sock_closed, normal}, _, _)` is called with `conn_state := connected` (which decrements by calling `emqx_channel:ensure_disconnected` and does not call `set_chan_info`), and afterwards `emqx_cm` receives a `DOWN` message from the same chan. When the `DOWN` message is received, we check `?CHAN_INFO_TAB` for that key and check if the persisted state is `connected`. Only in that case we decrement during `DOWN` messages. Testing manually, it is still being persisted as `connected`, hence the double-decrease. We attempt to solve this by using the ETS as a set with `{ClientID, ChanPid}` as the key to resolve duplication.
This commit is contained in:
parent
8a4e0a3ecb
commit
0955122468
|
@ -1536,7 +1536,9 @@ ensure_connected(Channel = #channel{conninfo = ConnInfo,
|
||||||
clientinfo = ClientInfo}) ->
|
clientinfo = ClientInfo}) ->
|
||||||
NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)},
|
NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)},
|
||||||
ok = run_hooks('client.connected', [ClientInfo, NConnInfo]),
|
ok = run_hooks('client.connected', [ClientInfo, NConnInfo]),
|
||||||
emqx_cm:increment_connected_client_count(),
|
ClientID = info(clientid, Channel),
|
||||||
|
Chan = {ClientID, self()},
|
||||||
|
emqx_cm:increment_connected_client_count(Chan),
|
||||||
Channel#channel{conninfo = NConnInfo,
|
Channel#channel{conninfo = NConnInfo,
|
||||||
conn_state = connected
|
conn_state = connected
|
||||||
}.
|
}.
|
||||||
|
@ -1625,7 +1627,9 @@ ensure_disconnected(Reason, Channel = #channel{conninfo = ConnInfo,
|
||||||
clientinfo = ClientInfo}) ->
|
clientinfo = ClientInfo}) ->
|
||||||
NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(millisecond)},
|
NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(millisecond)},
|
||||||
ok = run_hooks('client.disconnected', [ClientInfo, Reason, NConnInfo]),
|
ok = run_hooks('client.disconnected', [ClientInfo, Reason, NConnInfo]),
|
||||||
emqx_cm:decrement_connected_client_count(),
|
ClientID = info(clientid, Channel),
|
||||||
|
Chan = {ClientID, self()},
|
||||||
|
emqx_cm:decrement_connected_client_count(Chan),
|
||||||
Channel#channel{conninfo = NConnInfo, conn_state = disconnected}.
|
Channel#channel{conninfo = NConnInfo, conn_state = disconnected}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -75,8 +75,8 @@
|
||||||
%% Internal export
|
%% Internal export
|
||||||
-export([ stats_fun/0
|
-export([ stats_fun/0
|
||||||
, clean_down/1
|
, clean_down/1
|
||||||
, increment_connected_client_count/0
|
, increment_connected_client_count/1
|
||||||
, decrement_connected_client_count/0
|
, decrement_connected_client_count/1
|
||||||
, get_connected_client_count/0
|
, get_connected_client_count/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
@ -86,18 +86,15 @@
|
||||||
-define(CHAN_TAB, emqx_channel).
|
-define(CHAN_TAB, emqx_channel).
|
||||||
-define(CHAN_CONN_TAB, emqx_channel_conn).
|
-define(CHAN_CONN_TAB, emqx_channel_conn).
|
||||||
-define(CHAN_INFO_TAB, emqx_channel_info).
|
-define(CHAN_INFO_TAB, emqx_channel_info).
|
||||||
|
-define(CONN_CLIENT_TAB, connected_client_counter).
|
||||||
|
|
||||||
-define(CHAN_STATS,
|
-define(CHAN_STATS,
|
||||||
[{?CHAN_TAB, 'channels.count', 'channels.max'},
|
[{?CHAN_TAB, 'channels.count', 'channels.max'},
|
||||||
{?CHAN_TAB, 'sessions.count', 'sessions.max'},
|
{?CHAN_TAB, 'sessions.count', 'sessions.max'},
|
||||||
{?CHAN_CONN_TAB, 'connections.count', 'connections.max'}
|
{?CHAN_CONN_TAB, 'connections.count', 'connections.max'},
|
||||||
|
{?CONN_CLIENT_TAB, 'live_connections.count', 'live_connections.max'}
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-define(CONN_CLIENT_TAB, connected_client_counter).
|
|
||||||
-define(CONN_CLIENT_TAB_KEY, connected_client_count).
|
|
||||||
-define(CONN_CLIENT_TAB_IDX, 2).
|
|
||||||
-define(CONNECTED_CLIENT_STATS, {'live_connections.count', 'live_connections.max'}).
|
|
||||||
|
|
||||||
%% Batch drain
|
%% Batch drain
|
||||||
-define(BATCH_SIZE, 100000).
|
-define(BATCH_SIZE, 100000).
|
||||||
|
|
||||||
|
@ -448,8 +445,7 @@ init([]) ->
|
||||||
ok = emqx_tables:new(?CHAN_TAB, [bag, {read_concurrency, true}|TabOpts]),
|
ok = emqx_tables:new(?CHAN_TAB, [bag, {read_concurrency, true}|TabOpts]),
|
||||||
ok = emqx_tables:new(?CHAN_CONN_TAB, [bag | TabOpts]),
|
ok = emqx_tables:new(?CHAN_CONN_TAB, [bag | TabOpts]),
|
||||||
ok = emqx_tables:new(?CHAN_INFO_TAB, [set, compressed | TabOpts]),
|
ok = emqx_tables:new(?CHAN_INFO_TAB, [set, compressed | TabOpts]),
|
||||||
ok = emqx_tables:new(?CONN_CLIENT_TAB, [set | TabOpts]),
|
ok = emqx_tables:new(?CONN_CLIENT_TAB, [set, {write_concurrency, true} | TabOpts]),
|
||||||
true = ets:insert(?CONN_CLIENT_TAB, {?CONN_CLIENT_TAB_KEY, 0}),
|
|
||||||
ok = emqx_stats:update_interval(chan_stats, fun ?MODULE:stats_fun/0),
|
ok = emqx_stats:update_interval(chan_stats, fun ?MODULE:stats_fun/0),
|
||||||
State = #{chan_pmon => emqx_pmon:new()},
|
State = #{chan_pmon => emqx_pmon:new()},
|
||||||
{ok, State}.
|
{ok, State}.
|
||||||
|
@ -466,6 +462,7 @@ handle_cast(Msg, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}) ->
|
handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}) ->
|
||||||
|
?tp(emqx_cm_process_down, #{pid => Pid, reason => _Reason}),
|
||||||
ChanPids = [Pid | emqx_misc:drain_down(?BATCH_SIZE)],
|
ChanPids = [Pid | emqx_misc:drain_down(?BATCH_SIZE)],
|
||||||
{Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon),
|
{Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon),
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
|
@ -473,7 +470,7 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}
|
||||||
Chan = {ClientID, ChanPid},
|
Chan = {ClientID, ChanPid},
|
||||||
case ets:lookup(?CHAN_INFO_TAB, Chan) of
|
case ets:lookup(?CHAN_INFO_TAB, Chan) of
|
||||||
[{Chan, #{conn_state := connected}, _}] ->
|
[{Chan, #{conn_state := connected}, _}] ->
|
||||||
decrement_connected_client_count();
|
decrement_connected_client_count(Chan);
|
||||||
_ ->
|
_ ->
|
||||||
ok
|
ok
|
||||||
end
|
end
|
||||||
|
@ -499,8 +496,7 @@ clean_down({ChanPid, ClientId}) ->
|
||||||
do_unregister_channel({ClientId, ChanPid}).
|
do_unregister_channel({ClientId, ChanPid}).
|
||||||
|
|
||||||
stats_fun() ->
|
stats_fun() ->
|
||||||
lists:foreach(fun update_stats/1, ?CHAN_STATS),
|
lists:foreach(fun update_stats/1, ?CHAN_STATS).
|
||||||
update_connected_client_stats().
|
|
||||||
|
|
||||||
update_stats({Tab, Stat, MaxStat}) ->
|
update_stats({Tab, Stat, MaxStat}) ->
|
||||||
case ets:info(Tab, size) of
|
case ets:info(Tab, size) of
|
||||||
|
@ -508,12 +504,6 @@ update_stats({Tab, Stat, MaxStat}) ->
|
||||||
Size -> emqx_stats:setstat(Stat, MaxStat, Size)
|
Size -> emqx_stats:setstat(Stat, MaxStat, Size)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
update_connected_client_stats() ->
|
|
||||||
{CountStat, MaxStat} = ?CONNECTED_CLIENT_STATS,
|
|
||||||
CCCount = get_connected_client_count(),
|
|
||||||
emqx_stats:setstat(CountStat, MaxStat, CCCount),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
get_chann_conn_mod(ClientId, ChanPid) when node(ChanPid) == node() ->
|
get_chann_conn_mod(ClientId, ChanPid) when node(ChanPid) == node() ->
|
||||||
Chan = {ClientId, ChanPid},
|
Chan = {ClientId, ChanPid},
|
||||||
try [ConnMod] = ets:lookup_element(?CHAN_CONN_TAB, Chan, 2), ConnMod
|
try [ConnMod] = ets:lookup_element(?CHAN_CONN_TAB, Chan, 2), ConnMod
|
||||||
|
@ -523,19 +513,18 @@ get_chann_conn_mod(ClientId, ChanPid) when node(ChanPid) == node() ->
|
||||||
get_chann_conn_mod(ClientId, ChanPid) ->
|
get_chann_conn_mod(ClientId, ChanPid) ->
|
||||||
rpc_call(node(ChanPid), get_chann_conn_mod, [ClientId, ChanPid], ?T_GET_INFO).
|
rpc_call(node(ChanPid), get_chann_conn_mod, [ClientId, ChanPid], ?T_GET_INFO).
|
||||||
|
|
||||||
increment_connected_client_count() ->
|
increment_connected_client_count(Chan) ->
|
||||||
?tp(emqx_cm_connected_client_count_inc, #{}),
|
?tp(emqx_cm_connected_client_count_inc, #{}),
|
||||||
ets:update_counter(?CONN_CLIENT_TAB, ?CONN_CLIENT_TAB_KEY,
|
ets:insert_new(?CONN_CLIENT_TAB, {Chan, true}),
|
||||||
{?CONN_CLIENT_TAB_IDX, 1}),
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
decrement_connected_client_count() ->
|
decrement_connected_client_count(Chan) ->
|
||||||
?tp(emqx_cm_connected_client_count_dec, #{}),
|
?tp(emqx_cm_connected_client_count_dec, #{}),
|
||||||
Threshold = 0,
|
ets:delete(?CONN_CLIENT_TAB, Chan),
|
||||||
SetValue = 0,
|
|
||||||
ets:update_counter(?CONN_CLIENT_TAB, ?CONN_CLIENT_TAB_KEY,
|
|
||||||
{?CONN_CLIENT_TAB_IDX, -1, Threshold, SetValue}),
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
get_connected_client_count() ->
|
get_connected_client_count() ->
|
||||||
ets:lookup_element(?CONN_CLIENT_TAB, ?CONN_CLIENT_TAB_KEY, ?CONN_CLIENT_TAB_IDX).
|
case ets:info(?CONN_CLIENT_TAB, size) of
|
||||||
|
undefined -> 0;
|
||||||
|
Size -> Size
|
||||||
|
end.
|
||||||
|
|
|
@ -36,6 +36,7 @@ all() ->
|
||||||
groups() ->
|
groups() ->
|
||||||
TCs = emqx_ct:all(?MODULE),
|
TCs = emqx_ct:all(?MODULE),
|
||||||
ConnClientTCs = [ t_connected_client_count_persistent
|
ConnClientTCs = [ t_connected_client_count_persistent
|
||||||
|
, t_connected_client_count_anonymous
|
||||||
, t_connected_client_stats
|
, t_connected_client_stats
|
||||||
],
|
],
|
||||||
OtherTCs = TCs -- ConnClientTCs,
|
OtherTCs = TCs -- ConnClientTCs,
|
||||||
|
@ -319,8 +320,8 @@ t_stats_fun({'end', _Config}) ->
|
||||||
ok = emqx_broker:unsubscribe(<<"topic">>),
|
ok = emqx_broker:unsubscribe(<<"topic">>),
|
||||||
ok = emqx_broker:unsubscribe(<<"topic2">>).
|
ok = emqx_broker:unsubscribe(<<"topic2">>).
|
||||||
|
|
||||||
%% persistent sessions, when gone, do not contribute to connect client
|
%% persistent sessions, when gone, do not contribute to connected
|
||||||
%% count
|
%% client count
|
||||||
t_connected_client_count_persistent({init, Config}) ->
|
t_connected_client_count_persistent({init, Config}) ->
|
||||||
ok = snabbkaffe:start_trace(),
|
ok = snabbkaffe:start_trace(),
|
||||||
process_flag(trap_exit, true),
|
process_flag(trap_exit, true),
|
||||||
|
@ -366,10 +367,16 @@ t_connected_client_count_persistent(Config) when is_list(Config) ->
|
||||||
),
|
),
|
||||||
?assertEqual(1, emqx_cm:get_connected_client_count()),
|
?assertEqual(1, emqx_cm:get_connected_client_count()),
|
||||||
%% abnormal exit of channel process
|
%% abnormal exit of channel process
|
||||||
[ChanPid] = emqx_cm:all_channels(),
|
ChanPids = emqx_cm:all_channels(),
|
||||||
{true, {ok, [_]}} = wait_for_events(
|
{ok, {ok, [_, _]}} = wait_for_events(
|
||||||
fun() -> exit(ChanPid, kill) end,
|
fun() ->
|
||||||
[emqx_cm_connected_client_count_dec]
|
lists:foreach(
|
||||||
|
fun(ChanPid) -> exit(ChanPid, kill) end,
|
||||||
|
ChanPids)
|
||||||
|
end,
|
||||||
|
[ emqx_cm_connected_client_count_dec
|
||||||
|
, emqx_cm_process_down
|
||||||
|
]
|
||||||
),
|
),
|
||||||
?assertEqual(0, emqx_cm:get_connected_client_count()),
|
?assertEqual(0, emqx_cm:get_connected_client_count()),
|
||||||
ok;
|
ok;
|
||||||
|
@ -377,6 +384,73 @@ t_connected_client_count_persistent({'end', _Config}) ->
|
||||||
snabbkaffe:stop(),
|
snabbkaffe:stop(),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
%% connections without client_id also contribute to connected client
|
||||||
|
%% count
|
||||||
|
t_connected_client_count_anonymous({init, Config}) ->
|
||||||
|
ok = snabbkaffe:start_trace(),
|
||||||
|
process_flag(trap_exit, true),
|
||||||
|
Config;
|
||||||
|
t_connected_client_count_anonymous(Config) when is_list(Config) ->
|
||||||
|
ConnFun = ?config(conn_fun, Config),
|
||||||
|
?assertEqual(0, emqx_cm:get_connected_client_count()),
|
||||||
|
%% first client
|
||||||
|
{ok, ConnPid0} = emqtt:start_link([ {clean_start, true}
|
||||||
|
| Config]),
|
||||||
|
{{ok, _}, {ok, [_]}} = wait_for_events(
|
||||||
|
fun() -> emqtt:ConnFun(ConnPid0) end,
|
||||||
|
[emqx_cm_connected_client_count_inc]
|
||||||
|
),
|
||||||
|
?assertEqual(1, emqx_cm:get_connected_client_count()),
|
||||||
|
%% second client
|
||||||
|
{ok, ConnPid1} = emqtt:start_link([ {clean_start, true}
|
||||||
|
| Config]),
|
||||||
|
{{ok, _}, {ok, [_]}} = wait_for_events(
|
||||||
|
fun() -> emqtt:ConnFun(ConnPid1) end,
|
||||||
|
[emqx_cm_connected_client_count_inc]
|
||||||
|
),
|
||||||
|
?assertEqual(2, emqx_cm:get_connected_client_count()),
|
||||||
|
%% when first client disconnects, shouldn't affect the second
|
||||||
|
{ok, {ok, [_, _]}} = wait_for_events(
|
||||||
|
fun() -> emqtt:disconnect(ConnPid0) end,
|
||||||
|
[ emqx_cm_connected_client_count_dec
|
||||||
|
, emqx_cm_process_down
|
||||||
|
]
|
||||||
|
),
|
||||||
|
?assertEqual(1, emqx_cm:get_connected_client_count()),
|
||||||
|
%% reconnecting
|
||||||
|
{ok, ConnPid2} = emqtt:start_link([ {clean_start, true}
|
||||||
|
| Config
|
||||||
|
]),
|
||||||
|
{{ok, _}, {ok, [_]}} = wait_for_events(
|
||||||
|
fun() -> emqtt:ConnFun(ConnPid2) end,
|
||||||
|
[emqx_cm_connected_client_count_inc]
|
||||||
|
),
|
||||||
|
?assertEqual(2, emqx_cm:get_connected_client_count()),
|
||||||
|
{ok, {ok, [_, _]}} = wait_for_events(
|
||||||
|
fun() -> emqtt:disconnect(ConnPid1) end,
|
||||||
|
[ emqx_cm_connected_client_count_dec
|
||||||
|
, emqx_cm_process_down
|
||||||
|
]
|
||||||
|
),
|
||||||
|
?assertEqual(1, emqx_cm:get_connected_client_count()),
|
||||||
|
%% abnormal exit of channel process
|
||||||
|
Chans = emqx_cm:all_channels(),
|
||||||
|
{ok, {ok, [_, _]}} = wait_for_events(
|
||||||
|
fun() ->
|
||||||
|
lists:foreach(
|
||||||
|
fun(ChanPid) -> exit(ChanPid, kill) end,
|
||||||
|
Chans)
|
||||||
|
end,
|
||||||
|
[ emqx_cm_connected_client_count_dec
|
||||||
|
, emqx_cm_process_down
|
||||||
|
]
|
||||||
|
),
|
||||||
|
?assertEqual(0, emqx_cm:get_connected_client_count()),
|
||||||
|
ok;
|
||||||
|
t_connected_client_count_anonymous({'end', _Config}) ->
|
||||||
|
snabbkaffe:stop(),
|
||||||
|
ok.
|
||||||
|
|
||||||
t_connected_client_stats({init, Config}) ->
|
t_connected_client_stats({init, Config}) ->
|
||||||
ok = supervisor:terminate_child(emqx_kernel_sup, emqx_stats),
|
ok = supervisor:terminate_child(emqx_kernel_sup, emqx_stats),
|
||||||
{ok, _} = supervisor:restart_child(emqx_kernel_sup, emqx_stats),
|
{ok, _} = supervisor:restart_child(emqx_kernel_sup, emqx_stats),
|
||||||
|
@ -422,16 +496,17 @@ t_connected_client_stats({'end', _Config}) ->
|
||||||
{ok, _} = supervisor:restart_child(emqx_kernel_sup, emqx_stats),
|
{ok, _} = supervisor:restart_child(emqx_kernel_sup, emqx_stats),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
%% alwayseven if
|
%% the count must be always non negative
|
||||||
t_connect_client_never_negative({init, Config}) ->
|
t_connect_client_never_negative({init, Config}) ->
|
||||||
Config;
|
Config;
|
||||||
t_connect_client_never_negative(Config) when is_list(Config) ->
|
t_connect_client_never_negative(Config) when is_list(Config) ->
|
||||||
?assertEqual(0, emqx_cm:get_connected_client_count()),
|
?assertEqual(0, emqx_cm:get_connected_client_count()),
|
||||||
%% would go to -1
|
%% would go to -1
|
||||||
emqx_cm:decrement_connected_client_count(),
|
Chan = {<<"clientid">>, list_to_pid("<0.0.1>")},
|
||||||
|
emqx_cm:decrement_connected_client_count(Chan),
|
||||||
?assertEqual(0, emqx_cm:get_connected_client_count()),
|
?assertEqual(0, emqx_cm:get_connected_client_count()),
|
||||||
%% would be 0, if really went to -1
|
%% would be 0, if really went to -1
|
||||||
emqx_cm:increment_connected_client_count(),
|
emqx_cm:increment_connected_client_count(Chan),
|
||||||
?assertEqual(1, emqx_cm:get_connected_client_count()),
|
?assertEqual(1, emqx_cm:get_connected_client_count()),
|
||||||
ok;
|
ok;
|
||||||
t_connect_client_never_negative({'end', _Config}) ->
|
t_connect_client_never_negative({'end', _Config}) ->
|
||||||
|
|
|
@ -33,8 +33,8 @@ all() -> emqx_ct:all(?MODULE).
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
%% CM Meck
|
%% CM Meck
|
||||||
ok = meck:new(emqx_cm, [passthrough, no_history, no_link]),
|
ok = meck:new(emqx_cm, [passthrough, no_history, no_link]),
|
||||||
ok = meck:expect(emqx_cm, increment_connected_client_count, fun() -> ok end),
|
ok = meck:expect(emqx_cm, increment_connected_client_count, fun(_) -> ok end),
|
||||||
ok = meck:expect(emqx_cm, decrement_connected_client_count, fun() -> ok end),
|
ok = meck:expect(emqx_cm, decrement_connected_client_count, fun(_) -> ok end),
|
||||||
%% Access Control Meck
|
%% Access Control Meck
|
||||||
ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]),
|
ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]),
|
||||||
ok = meck:expect(emqx_access_control, authenticate,
|
ok = meck:expect(emqx_access_control, authenticate,
|
||||||
|
|
|
@ -36,8 +36,8 @@ init_per_suite(Config) ->
|
||||||
ok = meck:new(emqx_channel, [passthrough, no_history, no_link]),
|
ok = meck:new(emqx_channel, [passthrough, no_history, no_link]),
|
||||||
%% Meck Cm
|
%% Meck Cm
|
||||||
ok = meck:new(emqx_cm, [passthrough, no_history, no_link]),
|
ok = meck:new(emqx_cm, [passthrough, no_history, no_link]),
|
||||||
ok = meck:expect(emqx_cm, increment_connected_client_count, fun() -> ok end),
|
ok = meck:expect(emqx_cm, increment_connected_client_count, fun(_) -> ok end),
|
||||||
ok = meck:expect(emqx_cm, decrement_connected_client_count, fun() -> ok end),
|
ok = meck:expect(emqx_cm, decrement_connected_client_count, fun(_) -> ok end),
|
||||||
%% Meck Limiter
|
%% Meck Limiter
|
||||||
ok = meck:new(emqx_limiter, [passthrough, no_history, no_link]),
|
ok = meck:new(emqx_limiter, [passthrough, no_history, no_link]),
|
||||||
%% Meck Pd
|
%% Meck Pd
|
||||||
|
|
|
@ -50,8 +50,8 @@ init_per_testcase(TestCase, Config) when
|
||||||
->
|
->
|
||||||
%% Meck Cm
|
%% Meck Cm
|
||||||
ok = meck:new(emqx_cm, [passthrough, no_history, no_link]),
|
ok = meck:new(emqx_cm, [passthrough, no_history, no_link]),
|
||||||
ok = meck:expect(emqx_cm, increment_connected_client_count, fun() -> ok end),
|
ok = meck:expect(emqx_cm, increment_connected_client_count, fun(_) -> ok end),
|
||||||
ok = meck:expect(emqx_cm, decrement_connected_client_count, fun() -> ok end),
|
ok = meck:expect(emqx_cm, decrement_connected_client_count, fun(_) -> ok end),
|
||||||
%% Mock cowboy_req
|
%% Mock cowboy_req
|
||||||
ok = meck:new(cowboy_req, [passthrough, no_history, no_link]),
|
ok = meck:new(cowboy_req, [passthrough, no_history, no_link]),
|
||||||
ok = meck:expect(cowboy_req, header, fun(_, _, _) -> <<>> end),
|
ok = meck:expect(cowboy_req, header, fun(_, _, _) -> <<>> end),
|
||||||
|
|
Loading…
Reference in New Issue