diff --git a/apps/emqx_gateway_ocpp/src/emqx_ocpp_channel.erl b/apps/emqx_gateway_ocpp/src/emqx_ocpp_channel.erl index 9fc3b8e0f..0b9f864a3 100644 --- a/apps/emqx_gateway_ocpp/src/emqx_ocpp_channel.erl +++ b/apps/emqx_gateway_ocpp/src/emqx_ocpp_channel.erl @@ -127,6 +127,8 @@ } ). +-define(DEFAULT_OCPP_DN_SUBOPTS, #{rh => 0, rap => 0, nl => 0, qos => ?QOS_1}). + -dialyzer(no_match). %%-------------------------------------------------------------------- @@ -547,6 +549,13 @@ handle_call(kick, _From, Channel) -> shutdown(kicked, ok, Channel); handle_call(discard, _From, Channel) -> shutdown(discarded, ok, Channel); +handle_call( + subscriptions, + _From, + Channel = #channel{clientinfo = #{clientid := ClientId, mountpoint := Mountpoint}} +) -> + Subs = [{dntopic(ClientId, Mountpoint), ?DEFAULT_OCPP_DN_SUBOPTS}], + reply({ok, Subs}, Channel); handle_call(Req, From, Channel) -> ?SLOG(error, #{msg => "unexpected_call", req => Req, from => From}), reply(ignored, Channel). @@ -614,22 +623,6 @@ process_connect( {error, Reason} end. -ensure_subscribe_dn_topics( - Channel = #channel{clientinfo = #{clientid := ClientId, mountpoint := Mountpoint} = ClientInfo} -) -> - SubOpts = #{rh => 0, rap => 0, nl => 0, qos => ?QOS_1}, - Topic0 = proc_tmpl( - emqx_ocpp_conf:dntopic(), - #{ - clientid => ClientId, - cid => ClientId - } - ), - Topic = emqx_mountpoint:mount(Mountpoint, Topic0), - ok = emqx_broker:subscribe(Topic, ClientId, SubOpts), - ok = emqx_hooks:run('session.subscribed', [ClientInfo, Topic, SubOpts]), - Channel. - %%-------------------------------------------------------------------- %% Handle timeout %%-------------------------------------------------------------------- @@ -853,6 +846,28 @@ reset_keepalive(Interval, Channel = #channel{conninfo = ConnInfo, timers = Timer heartbeat_checking_times_backoff() -> max(0, emqx_ocpp_conf:heartbeat_checking_times_backoff() - 1). +%%-------------------------------------------------------------------- +%% Ensure Subscriptions + +ensure_subscribe_dn_topics( + Channel = #channel{clientinfo = #{clientid := ClientId, mountpoint := Mountpoint} = ClientInfo} +) -> + SubOpts = ?DEFAULT_OCPP_DN_SUBOPTS, + Topic = dntopic(ClientId, Mountpoint), + ok = emqx_broker:subscribe(Topic, ClientId, SubOpts), + ok = emqx_hooks:run('session.subscribed', [ClientInfo, Topic, SubOpts]), + Channel. + +dntopic(ClientId, Mountpoint) -> + Topic0 = proc_tmpl( + emqx_ocpp_conf:dntopic(), + #{ + clientid => ClientId, + cid => ClientId + } + ), + emqx_mountpoint:mount(Mountpoint, Topic0). + %%-------------------------------------------------------------------- %% Helper functions %%--------------------------------------------------------------------