fix(gw_jt808): subscriptions and channel stats
This commit is contained in:
parent
ec83ec7730
commit
0b1838a5ff
|
@ -85,6 +85,8 @@
|
|||
|
||||
-define(INFO_KEYS, [ctx, conninfo, zone, clientid, clientinfo, session, conn_state, authcode]).
|
||||
|
||||
-define(DN_TOPIC_SUBOPTS, #{rap => 0, nl => 0, qos => 0, rh => 0}).
|
||||
|
||||
-define(RETX_INTERVAL, 8000).
|
||||
-define(RETX_MAX_TIME, 5).
|
||||
|
||||
|
@ -122,8 +124,21 @@ info(conn_state, #channel{conn_state = ConnState}) ->
|
|||
info(authcode, #channel{authcode = AuthCode}) ->
|
||||
AuthCode.
|
||||
|
||||
stats(_Channel) ->
|
||||
[].
|
||||
-spec stats(channel()) -> emqx_types:stats().
|
||||
stats(#channel{inflight = Inflight, mqueue = Queue}) ->
|
||||
%% XXX: A fake stats for managed by emqx_management
|
||||
[
|
||||
{subscriptions_cnt, 1},
|
||||
{subscriptions_max, 1},
|
||||
{inflight_cnt, emqx_inflight:size(Inflight)},
|
||||
{inflight_max, emqx_inflight:max_size(Inflight)},
|
||||
{mqueue_len, queue:len(Queue)},
|
||||
{mqueue_max, 0},
|
||||
{mqueue_dropped, 0},
|
||||
{next_pkt_id, 0},
|
||||
{awaiting_rel_cnt, 0},
|
||||
{awaiting_rel_max, 0}
|
||||
].
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Init the Channel
|
||||
|
@ -422,6 +437,8 @@ handle_call(kick, _From, Channel) ->
|
|||
disconnect_and_shutdown(kicked, ok, Channel1);
|
||||
handle_call(discard, _From, Channel) ->
|
||||
disconnect_and_shutdown(discarded, ok, Channel);
|
||||
handle_call(subscriptions, _From, Channel = #channel{dn_topic = DnTopic}) ->
|
||||
reply({ok, [{DnTopic, ?DN_TOPIC_SUBOPTS}]}, Channel);
|
||||
handle_call(Req, _From, Channel) ->
|
||||
log(error, #{msg => "unexpected_call", call => Req}, Channel),
|
||||
reply(ignored, Channel).
|
||||
|
@ -633,7 +650,9 @@ process_connect(
|
|||
ok = autosubcribe(NChannel),
|
||||
_ = start_keepalive(?DEFAULT_KEEPALIVE, NChannel),
|
||||
_ = run_hooks(Ctx, 'client.connack', [ConnInfo, connection_accepted, #{}]),
|
||||
_ = emqx_gateway_ctx:insert_channel_info(Ctx, ClientId, info(NChannel), undefined),
|
||||
_ = emqx_gateway_ctx:insert_channel_info(
|
||||
Ctx, ClientId, info(NChannel), stats(NChannel)
|
||||
),
|
||||
authack({0, MsgSn, NChannel});
|
||||
{error, Reason} ->
|
||||
log(
|
||||
|
@ -941,9 +960,10 @@ autosubcribe(#channel{
|
|||
#{clientid := ClientId},
|
||||
dn_topic = Topic
|
||||
}) ->
|
||||
SubOpts = #{rap => 0, nl => 0, qos => 0, rh => 0},
|
||||
_ = emqx_broker:subscribe(Topic, ClientId, SubOpts),
|
||||
ok = emqx_hooks:run('session.subscribed', [ClientInfo, Topic, SubOpts#{is_new => true}]).
|
||||
_ = emqx_broker:subscribe(Topic, ClientId, ?DN_TOPIC_SUBOPTS),
|
||||
ok = emqx_hooks:run('session.subscribed', [
|
||||
ClientInfo, Topic, ?DN_TOPIC_SUBOPTS#{is_new => true}
|
||||
]).
|
||||
|
||||
start_keepalive(Secs, _Channel) when Secs > 0 ->
|
||||
self() ! {keepalive, start, round(Secs) * 1000}.
|
||||
|
|
Loading…
Reference in New Issue