feat(stats): track live channel / client count

Track connected client count

In order to correctly display the number of _connected_ clients in our
monitor dashboard, we need to track those connections that are
actually connected to clients, not considering connections from
persistent sessions that are disconnected. Today, the
`connections.count` that is displayed in the dashboards considers
those disconnected persistent sessions as well.

The new statistics can be found in the
[`emqx_management`](https://github.com/emqx/emqx/tree/main-v4.4/apps/emqx_management)
plugin, under `/api/v4/stats`, in the keys
`live_connections.{max,count}`.
This commit is contained in:
Thales Macedo Garitezi 2021-11-08 15:46:49 -03:00
parent ae125cd44d
commit 8fe342a02d
No known key found for this signature in database
GPG Key ID: DD279F8152A9B6DD
10 changed files with 354 additions and 28 deletions

View File

@ -20,7 +20,7 @@
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.20.6"}}}
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}
, {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.14.1"}}}
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.15.0"}}}
]}.
{plugins, [{rebar3_proper, "0.12.1"}]}.

View File

@ -1612,6 +1612,8 @@ ensure_connected(Channel = #channel{conninfo = ConnInfo,
clientinfo = ClientInfo}) ->
NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)},
ok = run_hooks('client.connected', [ClientInfo, NConnInfo]),
ChanPid = self(),
emqx_cm:mark_channel_connected(ChanPid),
Channel#channel{conninfo = NConnInfo,
conn_state = connected
}.
@ -1697,6 +1699,8 @@ ensure_disconnected(Reason, Channel = #channel{conninfo = ConnInfo,
clientinfo = ClientInfo}) ->
NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(millisecond)},
ok = run_hooks('client.disconnected', [ClientInfo, Reason, NConnInfo]),
ChanPid = self(),
emqx_cm:mark_channel_disconnected(ChanPid),
Channel#channel{conninfo = NConnInfo, conn_state = disconnected}.
%%--------------------------------------------------------------------

View File

@ -73,7 +73,11 @@
]).
%% Internal export
-export([stats_fun/0]).
-export([ stats_fun/0
, mark_channel_connected/1
, mark_channel_disconnected/1
, get_connected_client_count/0
]).
-type(chan_pid() :: pid()).
@ -81,11 +85,13 @@
-define(CHAN_TAB, emqx_channel).
-define(CHAN_CONN_TAB, emqx_channel_conn).
-define(CHAN_INFO_TAB, emqx_channel_info).
-define(CHAN_LIVE_TAB, emqx_channel_live).
-define(CHAN_STATS,
[{?CHAN_TAB, 'channels.count', 'channels.max'},
{?CHAN_TAB, 'sessions.count', 'sessions.max'},
{?CHAN_CONN_TAB, 'connections.count', 'connections.max'}
{?CHAN_CONN_TAB, 'connections.count', 'connections.max'},
{?CHAN_LIVE_TAB, 'live_connections.count', 'live_connections.max'}
]).
%% Batch drain
@ -446,8 +452,10 @@ init([]) ->
ok = emqx_tables:new(?CHAN_TAB, [bag, {read_concurrency, true} | TabOpts]),
ok = emqx_tables:new(?CHAN_CONN_TAB, [bag | TabOpts]),
ok = emqx_tables:new(?CHAN_INFO_TAB, [set, compressed | TabOpts]),
ok = emqx_tables:new(?CHAN_LIVE_TAB, [set, {write_concurrency, true} | TabOpts]),
ok = emqx_stats:update_interval(chan_stats, fun ?MODULE:stats_fun/0),
{ok, #{chan_pmon => emqx_pmon:new()}}.
State = #{chan_pmon => emqx_pmon:new()},
{ok, State}.
handle_call(Req, _From, State) ->
?SLOG(error, #{msg => "unexpected_call", call => Req}),
@ -456,17 +464,21 @@ handle_call(Req, _From, State) ->
handle_cast({registered, {ClientId, ChanPid}}, State = #{chan_pmon := PMon}) ->
PMon1 = emqx_pmon:monitor(ChanPid, ClientId, PMon),
{noreply, State#{chan_pmon := PMon1}};
handle_cast(Msg, State) ->
?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
{noreply, State}.
handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}) ->
?tp(emqx_cm_process_down, #{pid => Pid, reason => _Reason}),
ChanPids = [Pid | emqx_misc:drain_down(?BATCH_SIZE)],
{Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon),
lists:foreach(
fun({ChanPid, _ClientID}) ->
mark_channel_disconnected(ChanPid)
end,
Items),
ok = emqx_pool:async_submit(fun lists:foreach/2, [fun clean_down/1, Items]),
{noreply, State#{chan_pmon := PMon1}};
handle_info(Info, State) ->
?SLOG(error, #{msg => "unexpected_info", info => Info}),
@ -503,3 +515,18 @@ get_chann_conn_mod(ClientId, ChanPid) when node(ChanPid) == node() ->
get_chann_conn_mod(ClientId, ChanPid) ->
rpc_call(node(ChanPid), get_chann_conn_mod, [ClientId, ChanPid]).
mark_channel_connected(ChanPid) ->
?tp(emqx_cm_connected_client_count_inc, #{}),
ets:insert_new(?CHAN_LIVE_TAB, {ChanPid, true}),
ok.
mark_channel_disconnected(ChanPid) ->
?tp(emqx_cm_connected_client_count_dec, #{}),
ets:delete(?CHAN_LIVE_TAB, ChanPid),
ok.
get_connected_client_count() ->
case ets:info(?CHAN_LIVE_TAB, size) of
undefined -> 0;
Size -> Size
end.

View File

@ -540,7 +540,7 @@ terminate(Reason, State = #state{channel = Channel, transport = Transport,
?tp(warning, unclean_terminate, #{exception => E, context => C, stacktrace => S})
end,
?tp(info, terminate, #{reason => Reason}),
maybe_raise_excption(Reason).
maybe_raise_exception(Reason).
%% close socket, discard new state, always return ok.
close_socket_ok(State) ->
@ -548,12 +548,12 @@ close_socket_ok(State) ->
ok.
%% tell truth about the original exception
maybe_raise_excption(#{exception := Exception,
maybe_raise_exception(#{exception := Exception,
context := Context,
stacktrace := Stacktrace
}) ->
erlang:raise(Exception, Context, Stacktrace);
maybe_raise_excption(Reason) ->
maybe_raise_exception(Reason) ->
exit(Reason).
%%--------------------------------------------------------------------

View File

@ -21,6 +21,7 @@
-include("emqx.hrl").
-include("logger.hrl").
-include("types.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
%% APIs
@ -66,8 +67,10 @@
%% Connection stats
-define(CONNECTION_STATS,
['connections.count', %% Count of Concurrent Connections
'connections.max' %% Maximum Number of Concurrent Connections
[ 'connections.count' %% Count of Concurrent Connections
, 'connections.max' %% Maximum Number of Concurrent Connections
, 'live_connections.count' %% Count of connected clients
, 'live_connections.max' %% Maximum number of connected clients
]).
%% Channel stats
@ -215,6 +218,11 @@ handle_cast({setstat, Stat, MaxStat, Val}, State) ->
ets:insert(?TAB, {MaxStat, Val})
end,
safe_update_element(Stat, Val),
?tp(emqx_stats_setstat,
#{ count_stat => Stat
, max_stat => MaxStat
, value => Val
}),
{noreply, State};
handle_cast({update_interval, Update = #update{name = Name}},

View File

@ -23,20 +23,71 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
all() -> emqx_common_test_helpers:all(?MODULE).
all() ->
[ {group, all_cases}
, {group, connected_client_count_group}
].
init_per_suite(Config) ->
groups() ->
TCs = emqx_common_test_helpers:all(?MODULE),
ConnClientTCs = [ t_connected_client_count_persistent
, t_connected_client_count_anonymous
, t_connected_client_stats
],
OtherTCs = TCs -- ConnClientTCs,
[ {all_cases, [], OtherTCs}
, {connected_client_count_group, [ {group, tcp}
, {group, ws}
, {group, quic}
]}
, {tcp, [], ConnClientTCs}
, {ws, [], ConnClientTCs}
, {quic, [], ConnClientTCs}
].
init_per_group(connected_client_count_group, Config) ->
Config;
init_per_group(tcp, Config) ->
emqx_common_test_helpers:boot_modules(all),
emqx_common_test_helpers:start_apps([]),
[{conn_fun, connect} | Config];
init_per_group(ws, Config) ->
emqx_common_test_helpers:boot_modules(all),
emqx_common_test_helpers:start_apps([]),
[ {ssl, false}
, {enable_websocket, true}
, {conn_fun, ws_connect}
, {port, 8083}
, {host, "localhost"}
| Config
];
init_per_group(quic, Config) ->
emqx_common_test_helpers:boot_modules(all),
emqx_common_test_helpers:start_apps([]),
[ {conn_fun, quic_connect}
, {port, 14567}
| Config];
init_per_group(_Group, Config) ->
emqx_common_test_helpers:boot_modules(all),
emqx_common_test_helpers:start_apps([]),
Config.
end_per_suite(_Config) ->
end_per_group(connected_client_count_group, _Config) ->
ok;
end_per_group(_Group, _Config) ->
emqx_common_test_helpers:stop_apps([]).
init_per_suite(Config) ->
Config.
end_per_suite(_Config) ->
ok.
init_per_testcase(Case, Config) ->
?MODULE:Case({init, Config}).
@ -277,6 +328,236 @@ t_shard({'end', _Config}) ->
emqx_broker:unsubscribe(<<"topic">>),
ok = meck:unload(emqx_broker_helper).
%% persistent sessions, when gone, do not contribute to connected
%% client count
t_connected_client_count_persistent({init, Config}) ->
ok = snabbkaffe:start_trace(),
process_flag(trap_exit, true),
Config;
t_connected_client_count_persistent(Config) when is_list(Config) ->
ConnFun = ?config(conn_fun, Config),
ClientID = <<"clientid">>,
?assertEqual(0, emqx_cm:get_connected_client_count()),
{ok, ConnPid0} = emqtt:start_link([ {clean_start, false}
, {clientid, ClientID}
| Config]),
{{ok, _}, {ok, [_]}} = wait_for_events(
fun() -> emqtt:ConnFun(ConnPid0) end,
[emqx_cm_connected_client_count_inc]
),
?assertEqual(1, emqx_cm:get_connected_client_count()),
{ok, {ok, [_]}} = wait_for_events(
fun() -> emqtt:disconnect(ConnPid0) end,
[emqx_cm_connected_client_count_dec]
),
?assertEqual(0, emqx_cm:get_connected_client_count()),
%% reconnecting
{ok, ConnPid1} = emqtt:start_link([ {clean_start, false}
, {clientid, ClientID}
| Config
]),
{{ok, _}, {ok, [_]}} = wait_for_events(
fun() -> emqtt:ConnFun(ConnPid1) end,
[emqx_cm_connected_client_count_inc]
),
?assertEqual(1, emqx_cm:get_connected_client_count()),
%% taking over
{ok, ConnPid2} = emqtt:start_link([ {clean_start, false}
, {clientid, ClientID}
| Config
]),
{{ok, _}, {ok, [_, _]}} = wait_for_events(
fun() -> emqtt:ConnFun(ConnPid2) end,
[ emqx_cm_connected_client_count_inc
, emqx_cm_connected_client_count_dec
],
500
),
?assertEqual(1, emqx_cm:get_connected_client_count()),
%% abnormal exit of channel process
ChanPids = emqx_cm:all_channels(),
{ok, {ok, [_, _]}} = wait_for_events(
fun() ->
lists:foreach(
fun(ChanPid) -> exit(ChanPid, kill) end,
ChanPids)
end,
[ emqx_cm_connected_client_count_dec
, emqx_cm_process_down
]
),
?assertEqual(0, emqx_cm:get_connected_client_count()),
ok;
t_connected_client_count_persistent({'end', _Config}) ->
snabbkaffe:stop(),
ok.
%% connections without client_id also contribute to connected client
%% count
t_connected_client_count_anonymous({init, Config}) ->
ok = snabbkaffe:start_trace(),
process_flag(trap_exit, true),
Config;
t_connected_client_count_anonymous(Config) when is_list(Config) ->
ConnFun = ?config(conn_fun, Config),
?assertEqual(0, emqx_cm:get_connected_client_count()),
%% first client
{ok, ConnPid0} = emqtt:start_link([ {clean_start, true}
| Config]),
{{ok, _}, {ok, [_]}} = wait_for_events(
fun() -> emqtt:ConnFun(ConnPid0) end,
[emqx_cm_connected_client_count_inc]
),
?assertEqual(1, emqx_cm:get_connected_client_count()),
%% second client
{ok, ConnPid1} = emqtt:start_link([ {clean_start, true}
| Config]),
{{ok, _}, {ok, [_]}} = wait_for_events(
fun() -> emqtt:ConnFun(ConnPid1) end,
[emqx_cm_connected_client_count_inc]
),
?assertEqual(2, emqx_cm:get_connected_client_count()),
%% when first client disconnects, shouldn't affect the second
{ok, {ok, [_, _]}} = wait_for_events(
fun() -> emqtt:disconnect(ConnPid0) end,
[ emqx_cm_connected_client_count_dec
, emqx_cm_process_down
]
),
?assertEqual(1, emqx_cm:get_connected_client_count()),
%% reconnecting
{ok, ConnPid2} = emqtt:start_link([ {clean_start, true}
| Config
]),
{{ok, _}, {ok, [_]}} = wait_for_events(
fun() -> emqtt:ConnFun(ConnPid2) end,
[emqx_cm_connected_client_count_inc]
),
?assertEqual(2, emqx_cm:get_connected_client_count()),
{ok, {ok, [_, _]}} = wait_for_events(
fun() -> emqtt:disconnect(ConnPid1) end,
[ emqx_cm_connected_client_count_dec
, emqx_cm_process_down
]
),
?assertEqual(1, emqx_cm:get_connected_client_count()),
%% abnormal exit of channel process
Chans = emqx_cm:all_channels(),
{ok, {ok, [_, _]}} = wait_for_events(
fun() ->
lists:foreach(
fun(ChanPid) -> exit(ChanPid, kill) end,
Chans)
end,
[ emqx_cm_connected_client_count_dec
, emqx_cm_process_down
]
),
?assertEqual(0, emqx_cm:get_connected_client_count()),
ok;
t_connected_client_count_anonymous({'end', _Config}) ->
snabbkaffe:stop(),
ok.
t_connected_client_stats({init, Config}) ->
ok = supervisor:terminate_child(emqx_kernel_sup, emqx_stats),
{ok, _} = supervisor:restart_child(emqx_kernel_sup, emqx_stats),
ok = snabbkaffe:start_trace(),
Config;
t_connected_client_stats(Config) when is_list(Config) ->
ConnFun = ?config(conn_fun, Config),
?assertEqual(0, emqx_cm:get_connected_client_count()),
?assertEqual(0, emqx_stats:getstat('live_connections.count')),
?assertEqual(0, emqx_stats:getstat('live_connections.max')),
{ok, ConnPid} = emqtt:start_link([ {clean_start, true}
, {clientid, <<"clientid">>}
| Config
]),
{{ok, _}, {ok, [_]}} = wait_for_events(
fun() -> emqtt:ConnFun(ConnPid) end,
[emqx_cm_connected_client_count_inc]
),
%% ensure stats are synchronized
{_, {ok, [_]}} = wait_for_stats(
fun emqx_cm:stats_fun/0,
[#{count_stat => 'live_connections.count',
max_stat => 'live_connections.max'}]
),
?assertEqual(1, emqx_stats:getstat('live_connections.count')),
?assertEqual(1, emqx_stats:getstat('live_connections.max')),
{ok, {ok, [_]}} = wait_for_events(
fun() -> emqtt:disconnect(ConnPid) end,
[emqx_cm_connected_client_count_dec]
),
%% ensure stats are synchronized
{_, {ok, [_]}} = wait_for_stats(
fun emqx_cm:stats_fun/0,
[#{count_stat => 'live_connections.count',
max_stat => 'live_connections.max'}]
),
?assertEqual(0, emqx_stats:getstat('live_connections.count')),
?assertEqual(1, emqx_stats:getstat('live_connections.max')),
ok;
t_connected_client_stats({'end', _Config}) ->
ok = snabbkaffe:stop(),
ok = supervisor:terminate_child(emqx_kernel_sup, emqx_stats),
{ok, _} = supervisor:restart_child(emqx_kernel_sup, emqx_stats),
ok.
%% the count must be always non negative
t_connect_client_never_negative({init, Config}) ->
Config;
t_connect_client_never_negative(Config) when is_list(Config) ->
?assertEqual(0, emqx_cm:get_connected_client_count()),
%% would go to -1
ChanPid = list_to_pid("<0.0.1>"),
emqx_cm:mark_channel_disconnected(ChanPid),
?assertEqual(0, emqx_cm:get_connected_client_count()),
%% would be 0, if really went to -1
emqx_cm:mark_channel_connected(ChanPid),
?assertEqual(1, emqx_cm:get_connected_client_count()),
ok;
t_connect_client_never_negative({'end', _Config}) ->
ok.
wait_for_events(Action, Kinds) ->
wait_for_events(Action, Kinds, 500).
wait_for_events(Action, Kinds, Timeout) ->
Predicate = fun(#{?snk_kind := K}) ->
lists:member(K, Kinds)
end,
N = length(Kinds),
{ok, Sub} = snabbkaffe_collector:subscribe(Predicate, N, Timeout, 0),
Res = Action(),
case snabbkaffe_collector:receive_events(Sub) of
{timeout, _} ->
{Res, timeout};
{ok, Events} ->
{Res, {ok, Events}}
end.
wait_for_stats(Action, Stats) ->
Predicate = fun(Event = #{?snk_kind := emqx_stats_setstat}) ->
Stat = maps:with(
[ count_stat
, max_stat
], Event),
lists:member(Stat, Stats);
(_) ->
false
end,
N = length(Stats),
Timeout = 500,
{ok, Sub} = snabbkaffe_collector:subscribe(Predicate, N, Timeout, 0),
Res = Action(),
case snabbkaffe_collector:receive_events(Sub) of
{timeout, _} ->
{Res, timeout};
{ok, Events} ->
{Res, {ok, Events}}
end.
recv_msgs(Count) ->
recv_msgs(Count, []).

View File

@ -144,6 +144,8 @@ set_test_listenser_confs() ->
init_per_suite(Config) ->
%% CM Meck
ok = meck:new(emqx_cm, [passthrough, no_history, no_link]),
ok = meck:expect(emqx_cm, mark_channel_connected, fun(_) -> ok end),
ok = meck:expect(emqx_cm, mark_channel_disconnected, fun(_) -> ok end),
%% Access Control Meck
ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]),
ok = meck:expect(emqx_access_control, authenticate,

View File

@ -36,6 +36,8 @@ init_per_suite(Config) ->
ok = meck:new(emqx_channel, [passthrough, no_history, no_link]),
%% Meck Cm
ok = meck:new(emqx_cm, [passthrough, no_history, no_link]),
ok = meck:expect(emqx_cm, mark_channel_connected, fun(_) -> ok end),
ok = meck:expect(emqx_cm, mark_channel_disconnected, fun(_) -> ok end),
%% Meck Limiter
ok = meck:new(emqx_limiter, [passthrough, no_history, no_link]),
%% Meck Pd
@ -113,7 +115,7 @@ t_ws_pingreq_before_connected(_) ->
t_info(_) ->
CPid = spawn(fun() ->
receive
receive
{'$gen_call', From, info} ->
gen_server:reply(From, emqx_connection:info(st()))
after
@ -132,7 +134,7 @@ t_info_limiter(_) ->
t_stats(_) ->
CPid = spawn(fun() ->
receive
receive
{'$gen_call', From, stats} ->
gen_server:reply(From, emqx_connection:stats(st()))
after
@ -147,10 +149,10 @@ t_stats(_) ->
{send_pend,0}| _] , Stats).
t_process_msg(_) ->
with_conn(fun(CPid) ->
ok = meck:expect(emqx_channel, handle_in,
fun(_Packet, Channel) ->
{ok, Channel}
with_conn(fun(CPid) ->
ok = meck:expect(emqx_channel, handle_in,
fun(_Packet, Channel) ->
{ok, Channel}
end),
CPid ! {incoming, ?PACKET(?PINGREQ)},
CPid ! {incoming, undefined},
@ -320,7 +322,7 @@ t_with_channel(_) ->
t_handle_outgoing(_) ->
?assertEqual(ok, emqx_connection:handle_outgoing(?PACKET(?PINGRESP), st())),
?assertEqual(ok, emqx_connection:handle_outgoing([?PACKET(?PINGRESP)], st())).
t_handle_info(_) ->
?assertMatch({ok, {event,running}, _NState},
emqx_connection:handle_info(activate_socket, st())),
@ -347,7 +349,7 @@ t_activate_socket(_) ->
State = st(),
{ok, NStats} = emqx_connection:activate_socket(State),
?assertEqual(running, emqx_connection:info(sockstate, NStats)),
State1 = st(#{sockstate => blocked}),
?assertEqual({ok, State1}, emqx_connection:activate_socket(State1)),

View File

@ -48,7 +48,10 @@ init_per_testcase(TestCase, Config) when
TestCase =/= t_ws_pingreq_before_connected,
TestCase =/= t_ws_non_check_origin
->
emqx_channel_SUITE:set_test_listenser_confs(),
%% Meck Cm
ok = meck:new(emqx_cm, [passthrough, no_history, no_link]),
ok = meck:expect(emqx_cm, mark_channel_connected, fun(_) -> ok end),
ok = meck:expect(emqx_cm, mark_channel_disconnected, fun(_) -> ok end),
%% Mock cowboy_req
ok = meck:new(cowboy_req, [passthrough, no_history, no_link]),
ok = meck:expect(cowboy_req, header, fun(_, _, _) -> <<>> end),
@ -90,7 +93,8 @@ end_per_testcase(TestCase, _Config) when
TestCase =/= t_ws_pingreq_before_connected
->
lists:foreach(fun meck:unload/1,
[cowboy_req,
[emqx_cm,
cowboy_req,
emqx_access_control,
emqx_broker,
emqx_hooks,
@ -363,14 +367,12 @@ t_handle_info_close(_) ->
{[{close, _}], _St} = ?ws_conn:handle_info({close, protocol_error}, st()).
t_handle_info_event(_) ->
ok = meck:new(emqx_cm, [passthrough, no_history]),
ok = meck:expect(emqx_cm, register_channel, fun(_,_,_) -> ok end),
ok = meck:expect(emqx_cm, insert_channel_info, fun(_,_,_) -> ok end),
ok = meck:expect(emqx_cm, connection_closed, fun(_) -> true end),
{ok, _} = ?ws_conn:handle_info({event, connected}, st()),
{ok, _} = ?ws_conn:handle_info({event, disconnected}, st()),
{ok, _} = ?ws_conn:handle_info({event, updated}, st()),
ok = meck:unload(emqx_cm).
{ok, _} = ?ws_conn:handle_info({event, updated}, st()).
t_handle_timeout_idle_timeout(_) ->
TRef = make_ref(),

View File

@ -62,7 +62,7 @@
, {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.4"}}}
, {observer_cli, "1.7.1"} % NOTE: depends on recon 2.5.x
, {getopt, "1.0.2"}
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.14.1"}}}
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.15.0"}}}
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.20.6"}}}
, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.4.1"}}}
, {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}}