diff --git a/apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl b/apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl index fd83cb291..45b85f92e 100644 --- a/apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl +++ b/apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl @@ -85,6 +85,8 @@ -define(INFO_KEYS, [ctx, conninfo, zone, clientid, clientinfo, session, conn_state, authcode]). +-define(DN_TOPIC_SUBOPTS, #{rap => 0, nl => 0, qos => 0, rh => 0}). + -define(RETX_INTERVAL, 8000). -define(RETX_MAX_TIME, 5). @@ -122,8 +124,21 @@ info(conn_state, #channel{conn_state = ConnState}) -> info(authcode, #channel{authcode = AuthCode}) -> AuthCode. -stats(_Channel) -> - []. +-spec stats(channel()) -> emqx_types:stats(). +stats(#channel{inflight = Inflight, mqueue = Queue}) -> + %% XXX: A fake stats for managed by emqx_management + [ + {subscriptions_cnt, 1}, + {subscriptions_max, 1}, + {inflight_cnt, emqx_inflight:size(Inflight)}, + {inflight_max, emqx_inflight:max_size(Inflight)}, + {mqueue_len, queue:len(Queue)}, + {mqueue_max, 0}, + {mqueue_dropped, 0}, + {next_pkt_id, 0}, + {awaiting_rel_cnt, 0}, + {awaiting_rel_max, 0} + ]. %%-------------------------------------------------------------------- %% Init the Channel @@ -422,6 +437,8 @@ handle_call(kick, _From, Channel) -> disconnect_and_shutdown(kicked, ok, Channel1); handle_call(discard, _From, Channel) -> disconnect_and_shutdown(discarded, ok, Channel); +handle_call(subscriptions, _From, Channel = #channel{dn_topic = DnTopic}) -> + reply({ok, [{DnTopic, ?DN_TOPIC_SUBOPTS}]}, Channel); handle_call(Req, _From, Channel) -> log(error, #{msg => "unexpected_call", call => Req}, Channel), reply(ignored, Channel). @@ -633,7 +650,9 @@ process_connect( ok = autosubcribe(NChannel), _ = start_keepalive(?DEFAULT_KEEPALIVE, NChannel), _ = run_hooks(Ctx, 'client.connack', [ConnInfo, connection_accepted, #{}]), - _ = emqx_gateway_ctx:insert_channel_info(Ctx, ClientId, info(NChannel), undefined), + _ = emqx_gateway_ctx:insert_channel_info( + Ctx, ClientId, info(NChannel), stats(NChannel) + ), authack({0, MsgSn, NChannel}); {error, Reason} -> log( @@ -941,9 +960,10 @@ autosubcribe(#channel{ #{clientid := ClientId}, dn_topic = Topic }) -> - SubOpts = #{rap => 0, nl => 0, qos => 0, rh => 0}, - _ = emqx_broker:subscribe(Topic, ClientId, SubOpts), - ok = emqx_hooks:run('session.subscribed', [ClientInfo, Topic, SubOpts#{is_new => true}]). + _ = emqx_broker:subscribe(Topic, ClientId, ?DN_TOPIC_SUBOPTS), + ok = emqx_hooks:run('session.subscribed', [ + ClientInfo, Topic, ?DN_TOPIC_SUBOPTS#{is_new => true} + ]). start_keepalive(Secs, _Channel) when Secs > 0 -> self() ! {keepalive, start, round(Secs) * 1000}.