%%-------------------------------------------------------------------- %% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- -module(emqx_ocpp_channel). -behaviour(emqx_gateway_channel). -include("emqx_ocpp.hrl"). -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/types.hrl"). -include_lib("emqx/include/logger.hrl"). -logger_header("[OCPP-Chann]"). -ifdef(TEST). -compile(export_all). -compile(nowarn_export_all). -endif. -export([ info/1, info/2, stats/1 ]). -export([ init/2, authenticate/2, handle_in/2, handle_deliver/2, handle_out/3, handle_timeout/3, handle_call/3, handle_cast/2, handle_info/2, terminate/2 ]). %% Exports for CT -export([set_field/3]). -export_type([channel/0]). -record(channel, { %% Context ctx :: emqx_gateway_ctx:context(), %% ConnInfo conninfo :: emqx_types:conninfo(), %% ClientInfo clientinfo :: emqx_types:clientinfo(), %% Session session :: option(map()), %% ClientInfo override specs clientinfo_override :: map(), %% Keepalive keepalive :: option(emqx_ocpp_keepalive:keepalive()), %% Stores all unsent messages. mqueue :: queue:queue(), %% Timers timers :: #{atom() => disabled | option(reference())}, %% Conn State conn_state :: conn_state() }). -type channel() :: #channel{}. -type conn_state() :: idle | connecting | connected | disconnected. -type reply() :: {outgoing, emqx_ocpp_frame:frame()} | {outgoing, [emqx_ocpp_frame:frame()]} | {event, conn_state() | updated} | {close, Reason :: atom()}. -type replies() :: reply() | [reply()]. -define(TIMER_TABLE, #{ alive_timer => keepalive, connection_expire_timer => connection_expire }). -define(INFO_KEYS, [ conninfo, conn_state, clientinfo, session ]). -define(CHANNEL_METRICS, [ recv_pkt, recv_msg, 'recv_msg.qos0', 'recv_msg.qos1', 'recv_msg.qos2', 'recv_msg.dropped', 'recv_msg.dropped.await_pubrel_timeout', send_pkt, send_msg, 'send_msg.qos0', 'send_msg.qos1', 'send_msg.qos2', 'send_msg.dropped', 'send_msg.dropped.expired', 'send_msg.dropped.queue_full', 'send_msg.dropped.too_large' ]). -define(DEFAULT_OVERRIDE, %% Generate clientid by default #{ clientid => <<"">>, username => <<"">>, password => <<"">> } ). -define(DEFAULT_OCPP_DN_SUBOPTS, #{rh => 0, rap => 0, nl => 0, qos => ?QOS_1}). -dialyzer(no_match). %%-------------------------------------------------------------------- %% Info, Attrs and Caps %%-------------------------------------------------------------------- %% @doc Get infos of the channel. -spec info(channel()) -> emqx_types:infos(). info(Channel) -> maps:from_list(info(?INFO_KEYS, Channel)). -spec info(list(atom()) | atom(), channel()) -> term(). info(Keys, Channel) when is_list(Keys) -> [{Key, info(Key, Channel)} || Key <- Keys]; info(conninfo, #channel{conninfo = ConnInfo}) -> ConnInfo; info(socktype, #channel{conninfo = ConnInfo}) -> maps:get(socktype, ConnInfo, undefined); info(peername, #channel{conninfo = ConnInfo}) -> maps:get(peername, ConnInfo, undefined); info(sockname, #channel{conninfo = ConnInfo}) -> maps:get(sockname, ConnInfo, undefined); info(proto_name, #channel{conninfo = ConnInfo}) -> maps:get(proto_name, ConnInfo, undefined); info(proto_ver, #channel{conninfo = ConnInfo}) -> maps:get(proto_ver, ConnInfo, undefined); info(connected_at, #channel{conninfo = ConnInfo}) -> maps:get(connected_at, ConnInfo, undefined); info(clientinfo, #channel{clientinfo = ClientInfo}) -> ClientInfo; info(zone, #channel{clientinfo = ClientInfo}) -> maps:get(zone, ClientInfo, undefined); info(clientid, #channel{clientinfo = ClientInfo}) -> maps:get(clientid, ClientInfo, undefined); info(username, #channel{clientinfo = ClientInfo}) -> maps:get(username, ClientInfo, undefined); info(session, #channel{conninfo = ConnInfo}) -> %% XXX: #{ created_at => maps:get(connected_at, ConnInfo, undefined), is_persistent => false, subscriptions => #{}, upgrade_qos => false, retry_interval => 0, await_rel_timeout => 0 }; info(conn_state, #channel{conn_state = ConnState}) -> ConnState; info(keepalive, #channel{keepalive = Keepalive}) -> emqx_utils:maybe_apply(fun emqx_ocpp_keepalive:info/1, Keepalive); info(ctx, #channel{ctx = Ctx}) -> Ctx; info(timers, #channel{timers = Timers}) -> Timers. -spec stats(channel()) -> emqx_types:stats(). stats(#channel{mqueue = MQueue}) -> %% XXX: A fake stats for managed by emqx_management SessionStats = [ {subscriptions_cnt, 1}, {subscriptions_max, 1}, {inflight_cnt, 0}, {inflight_max, 0}, {mqueue_len, queue:len(MQueue)}, {mqueue_max, queue:len(MQueue)}, {mqueue_dropped, 0}, {next_pkt_id, 0}, {awaiting_rel_cnt, 0}, {awaiting_rel_max, 0} ], lists:append(SessionStats, emqx_pd:get_counters(?CHANNEL_METRICS)). %%-------------------------------------------------------------------- %% Init the channel %%-------------------------------------------------------------------- -spec init(emqx_types:conninfo(), map()) -> channel(). init( ConnInfo = #{ peername := {PeerHost, _Port}, sockname := {_Host, SockPort} }, Options ) -> Peercert = maps:get(peercert, ConnInfo, undefined), Mountpoint = maps:get(mountpoint, Options, undefined), ListenerId = case maps:get(listener, Options, undefined) of undefined -> undefined; {GwName, Type, LisName} -> emqx_gateway_utils:listener_id(GwName, Type, LisName) end, EnableAuthn = maps:get(enable_authn, Options, true), ClientInfo = setting_peercert_infos( Peercert, #{ zone => default, listener => ListenerId, protocol => ocpp, peerhost => PeerHost, sockport => SockPort, clientid => undefined, username => undefined, is_bridge => false, is_superuser => false, enalbe_authn => EnableAuthn, mountpoint => Mountpoint } ), ConnInfo1 = ConnInfo#{ keepalive => emqx_ocpp_conf:default_heartbeat_interval() }, {NClientInfo, NConnInfo} = take_ws_cookie(ClientInfo, ConnInfo1), Ctx = maps:get(ctx, Options), Override = maps:merge( ?DEFAULT_OVERRIDE, maps:get(clientinfo_override, Options, #{}) ), #channel{ ctx = Ctx, conninfo = NConnInfo, clientinfo = NClientInfo, clientinfo_override = Override, mqueue = queue:new(), timers = #{}, conn_state = idle }. setting_peercert_infos(NoSSL, ClientInfo) when NoSSL =:= nossl; NoSSL =:= undefined -> ClientInfo; setting_peercert_infos(Peercert, ClientInfo) -> {DN, CN} = {esockd_peercert:subject(Peercert), esockd_peercert:common_name(Peercert)}, ClientInfo#{dn => DN, cn => CN}. take_ws_cookie(ClientInfo, ConnInfo) -> case maps:take(ws_cookie, ConnInfo) of {WsCookie, NConnInfo} -> {ClientInfo#{ws_cookie => WsCookie}, NConnInfo}; _ -> {ClientInfo, ConnInfo} end. authenticate(UserInfo, Channel) -> case emqx_utils:pipeline( [ fun enrich_client/2, fun run_conn_hooks/2, fun check_banned/2, fun auth_connect/2 ], UserInfo, Channel#channel{conn_state = connecting} ) of {ok, _, NChannel} -> {ok, NChannel}; {error, Reason, _NChannel} -> {error, Reason} end. enrich_client( #{ clientid := ClientId, username := Username, proto_name := ProtoName, proto_ver := ProtoVer }, Channel = #channel{ conninfo = ConnInfo, clientinfo = ClientInfo } ) -> NConnInfo = ConnInfo#{ clientid => ClientId, username => Username, proto_name => ProtoName, proto_ver => ProtoVer, clean_start => true, conn_props => #{}, expiry_interval => 0, receive_maximum => 1 }, NClientInfo = ClientInfo#{ clientid => ClientId, username => Username }, {ok, Channel#channel{conninfo = NConnInfo, clientinfo = NClientInfo}}. set_log_meta(#channel{ clientinfo = #{clientid := ClientId}, conninfo = #{peername := Peername} }) -> emqx_logger:set_metadata_peername(esockd:format(Peername)), emqx_logger:set_metadata_clientid(ClientId). run_conn_hooks(_UserInfo, Channel = #channel{conninfo = ConnInfo}) -> case run_hooks('client.connect', [ConnInfo], #{}) of Error = {error, _Reason} -> Error; _NConnProps -> {ok, Channel} end. check_banned(_UserInfo, #channel{clientinfo = ClientInfo}) -> case emqx_banned:check(ClientInfo) of true -> {error, banned}; false -> ok end. auth_connect( #{password := Password}, #channel{ctx = Ctx, clientinfo = ClientInfo} = Channel ) -> #{ clientid := ClientId, username := Username } = ClientInfo, case emqx_gateway_ctx:authenticate(Ctx, ClientInfo#{password => Password}) of {ok, NClientInfo} -> {ok, Channel#channel{clientinfo = NClientInfo}}; {error, Reason} -> ?SLOG(warning, #{ msg => "client_login_failed", clientid => ClientId, username => Username, reason => Reason }), {error, Reason} end. publish( Frame, Channel = #channel{ clientinfo = #{ clientid := ClientId, username := Username, protocol := Protocol, peerhost := PeerHost, mountpoint := Mountpoint }, conninfo = #{proto_ver := ProtoVer} } ) when is_map(Frame) -> Topic0 = upstream_topic(Frame, Channel), Topic = emqx_mountpoint:mount(Mountpoint, Topic0), Payload = frame2payload(Frame), emqx_broker:publish( emqx_message:make( ClientId, ?QOS_2, Topic, Payload, #{}, #{ protocol => Protocol, proto_ver => ProtoVer, username => Username, peerhost => PeerHost } ) ). upstream_topic( Frame = #{id := Id, type := Type}, #channel{clientinfo = #{clientid := ClientId}} ) -> Vars = #{id => Id, type => Type, clientid => ClientId, cid => ClientId}, case Type of ?OCPP_MSG_TYPE_ID_CALL -> Action = maps:get(action, Frame), proc_tmpl( emqx_ocpp_conf:uptopic(Action), Vars#{action => Action} ); ?OCPP_MSG_TYPE_ID_CALLRESULT -> proc_tmpl(emqx_ocpp_conf:up_reply_topic(), Vars); ?OCPP_MSG_TYPE_ID_CALLERROR -> proc_tmpl(emqx_ocpp_conf:up_error_topic(), Vars) end. %%-------------------------------------------------------------------- %% Handle incoming packet %%-------------------------------------------------------------------- -spec handle_in(emqx_ocpp_frame:frame(), channel()) -> {ok, channel()} | {ok, replies(), channel()} | {shutdown, Reason :: term(), channel()} | {shutdown, Reason :: term(), replies(), channel()}. handle_in(?IS_REQ(Frame), Channel) -> %% TODO: strit mode _ = publish(Frame, Channel), {ok, Channel}; handle_in(Frame = #{type := Type}, Channel) when Type == ?OCPP_MSG_TYPE_ID_CALLRESULT; Type == ?OCPP_MSG_TYPE_ID_CALLERROR -> _ = publish(Frame, Channel), try_deliver(Channel); handle_in({frame_error, {badjson, ReasonStr}}, Channel) -> shutdown({frame_error, {badjson, iolist_to_binary(ReasonStr)}}, Channel); handle_in({frame_error, {validation_faliure, Id, ReasonStr}}, Channel) -> handle_out( dnstream, ?ERR_FRAME(Id, ?OCPP_ERR_FormationViolation, iolist_to_binary(ReasonStr)), Channel ); handle_in(Frame, Channel) -> ?SLOG(error, #{msg => "unexpected_incoming", frame => Frame}), {ok, Channel}. %%-------------------------------------------------------------------- %% Process Disconnect %%-------------------------------------------------------------------- %%-------------------------------------------------------------------- %% Handle Delivers from broker to client %%-------------------------------------------------------------------- -spec handle_deliver(list(emqx_types:deliver()), channel()) -> {ok, channel()} | {ok, replies(), channel()}. handle_deliver(Delivers, Channel) -> NChannel = lists:foldl( fun({deliver, _, Msg}, Acc) -> enqueue(Msg, Acc) end, Channel, Delivers ), try_deliver(NChannel). enqueue(Msg, Channel = #channel{mqueue = MQueue}) -> case queue:len(MQueue) > emqx_ocpp_conf:max_mqueue_len() of false -> try payload2frame(Msg#message.payload) of Frame -> Channel#channel{mqueue = queue:in(Frame, MQueue)} catch _:_ -> ?SLOG(error, #{msg => "drop_invalid_message", message => Msg}), Channel end; true -> ?SLOG(error, #{msg => "drop_message", message => Msg, reason => message_queue_full}), Channel end. try_deliver(Channel = #channel{mqueue = MQueue}) -> case queue:is_empty(MQueue) of false -> %% TODO: strit_mode Frames = queue:to_list(MQueue), handle_out(dnstream, Frames, Channel#channel{mqueue = queue:new()}); true -> {ok, Channel} end. %%-------------------------------------------------------------------- %% Handle outgoing packet %%-------------------------------------------------------------------- -spec handle_out(atom(), term(), channel()) -> {ok, channel()} | {ok, replies(), channel()} | {shutdown, Reason :: term(), channel()} | {shutdown, Reason :: term(), replies(), channel()}. handle_out(dnstream, Frames, Channel) -> {Outgoings, NChannel} = apply_frame(Frames, Channel), {ok, [{outgoing, Frames} | Outgoings], NChannel}; handle_out(disconnect, keepalive_timeout, Channel) -> {shutdown, keepalive_timeout, Channel}; handle_out(Type, Data, Channel) -> ?SLOG(error, #{msg => "unexpected_outgoing", type => Type, data => Data}), {ok, Channel}. %%-------------------------------------------------------------------- %% Apply Response frame to channel state machine %%-------------------------------------------------------------------- apply_frame(Frames, Channel) when is_list(Frames) -> {Outgoings, NChannel} = lists:foldl(fun do_apply_frame/2, {[], Channel}, Frames), {lists:reverse(Outgoings), NChannel}; apply_frame(Frames, Channel) -> ?SLOG(error, #{msg => "unexpected_frame_list", frames => Frames}), Channel. do_apply_frame(?IS_BootNotification_RESP(Status, Interval), {Outgoings, Channel}) -> case Status of <<"Accepted">> -> ?SLOG(info, #{msg => "adjust_heartbeat_timer", new_interval_s => Interval}), {[{event, updated} | Outgoings], reset_keepalive(Interval, Channel)}; _ -> {Outgoings, Channel} end; do_apply_frame(Frame, Acc = {_Outgoings, _Channel}) -> ?SLOG(info, #{msg => "skip_to_apply_frame", frame => Frame}), Acc. %%-------------------------------------------------------------------- %% Handle call %%-------------------------------------------------------------------- -spec handle_call(Req :: any(), From :: emqx_gateway_channel:gen_server_from(), channel()) -> {reply, Reply :: any(), channel()} | {shutdown, Reason :: any(), Reply :: any(), channel()}. handle_call(kick, _From, Channel) -> shutdown(kicked, ok, Channel); handle_call(discard, _From, Channel) -> shutdown(discarded, ok, Channel); handle_call( subscriptions, _From, Channel = #channel{clientinfo = #{clientid := ClientId, mountpoint := Mountpoint}} ) -> Subs = [{dntopic(ClientId, Mountpoint), ?DEFAULT_OCPP_DN_SUBOPTS}], reply({ok, Subs}, Channel); handle_call(Req, From, Channel) -> ?SLOG(error, #{msg => "unexpected_call", req => Req, from => From}), reply(ignored, Channel). %%-------------------------------------------------------------------- %% Handle Cast %%-------------------------------------------------------------------- -spec handle_cast(Req :: any(), channel()) -> ok | {ok, channel()} | {shutdown, Reason :: term(), channel()}. handle_cast(Req, Channel) -> ?SLOG(error, #{msg => "unexpected_cast", req => Req}), {ok, Channel}. %%-------------------------------------------------------------------- %% Handle Info %%-------------------------------------------------------------------- -spec handle_info(Info :: term(), channel()) -> ok | {ok, channel()} | {shutdown, Reason :: term(), channel()}. handle_info(after_init, Channel0) -> set_log_meta(Channel0), case process_connect(Channel0) of {ok, Channel} -> NChannel = ensure_keepalive( ensure_connected( ensure_subscribe_dn_topics(Channel) ) ), {ok, [{event, connected}], NChannel}; {error, Reason} -> shutdown(Reason, Channel0) end; handle_info({sock_closed, Reason}, Channel) -> NChannel = ensure_disconnected({sock_closed, Reason}, Channel), shutdown(Reason, NChannel); handle_info(Info, Channel) -> ?SLOG(error, #{msg => "unexpected_info", info => Info}), {ok, Channel}. process_connect( Channel = #channel{ ctx = Ctx, conninfo = ConnInfo, clientinfo = ClientInfo } ) -> SessFun = fun(_, _) -> #{} end, case emqx_gateway_ctx:open_session( Ctx, true, ClientInfo, ConnInfo, SessFun ) of {ok, #{session := Session}} -> NChannel = Channel#channel{session = Session}, {ok, NChannel}; {error, Reason} -> ?SLOG(error, #{msg => "failed_to_open_session", reason => Reason}), {error, Reason} end. %%-------------------------------------------------------------------- %% Handle timeout %%-------------------------------------------------------------------- -spec handle_timeout(reference(), Msg :: term(), channel()) -> {ok, channel()} | {ok, replies(), channel()} | {shutdown, Reason :: term(), channel()}. handle_timeout( _TRef, {keepalive, _StatVal}, Channel = #channel{keepalive = undefined} ) -> {ok, Channel}; handle_timeout( _TRef, {keepalive, _StatVal}, Channel = #channel{conn_state = disconnected} ) -> {ok, Channel}; handle_timeout( _TRef, {keepalive, StatVal}, Channel = #channel{keepalive = Keepalive} ) -> case emqx_ocpp_keepalive:check(StatVal, Keepalive) of {ok, NKeepalive} -> NChannel = Channel#channel{keepalive = NKeepalive}, {ok, reset_timer(alive_timer, NChannel)}; {error, timeout} -> handle_out(disconnect, keepalive_timeout, Channel) end; handle_timeout(_TRef, connection_expire, Channel) -> %% No take over implemented, so just shutdown shutdown(expired, Channel); handle_timeout(_TRef, Msg, Channel) -> ?SLOG(error, #{msg => "unexpected_timeout", timeout_msg => Msg}), {ok, Channel}. %%-------------------------------------------------------------------- %% Ensure timers %%-------------------------------------------------------------------- ensure_timer([Name], Channel) -> ensure_timer(Name, Channel); ensure_timer([Name | Rest], Channel) -> ensure_timer(Rest, ensure_timer(Name, Channel)); ensure_timer(Name, Channel = #channel{timers = Timers}) -> TRef = maps:get(Name, Timers, undefined), Time = interval(Name, Channel), case TRef == undefined andalso Time > 0 of true -> ensure_timer(Name, Time, Channel); %% Timer disabled or exists false -> Channel end. ensure_timer(Name, Time, Channel = #channel{timers = Timers}) -> Msg = maps:get(Name, ?TIMER_TABLE), TRef = emqx_utils:start_timer(Time, Msg), Channel#channel{timers = Timers#{Name => TRef}}. reset_timer(Name, Channel) -> ensure_timer(Name, clean_timer(Name, Channel)). clean_timer(Name, Channel = #channel{timers = Timers}) -> Channel#channel{timers = maps:remove(Name, Timers)}. interval(alive_timer, #channel{keepalive = KeepAlive}) -> emqx_ocpp_keepalive:info(interval, KeepAlive). %%-------------------------------------------------------------------- %% Terminate %%-------------------------------------------------------------------- -spec terminate(any(), channel()) -> ok. terminate(_, #channel{conn_state = idle}) -> ok; terminate(normal, Channel) -> run_terminate_hook(normal, Channel); terminate({shutdown, Reason}, Channel) when Reason =:= kicked; Reason =:= discarded -> run_terminate_hook(Reason, Channel); terminate(Reason, Channel) -> run_terminate_hook(Reason, Channel). run_terminate_hook(Reason, Channel = #channel{clientinfo = ClientInfo}) -> emqx_hooks:run('session.terminated', [ClientInfo, Reason, info(session, Channel)]). %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- %%-------------------------------------------------------------------- %% Frame frame2payload(Frame = #{type := ?OCPP_MSG_TYPE_ID_CALL}) -> emqx_utils_json:encode( #{ <<"MessageTypeId">> => ?OCPP_MSG_TYPE_ID_CALL, <<"UniqueId">> => maps:get(id, Frame), <<"Action">> => maps:get(action, Frame), <<"Payload">> => maps:get(payload, Frame) } ); frame2payload(Frame = #{type := ?OCPP_MSG_TYPE_ID_CALLRESULT}) -> emqx_utils_json:encode( #{ <<"MessageTypeId">> => ?OCPP_MSG_TYPE_ID_CALLRESULT, <<"UniqueId">> => maps:get(id, Frame), <<"Payload">> => maps:get(payload, Frame) } ); frame2payload(Frame = #{type := ?OCPP_MSG_TYPE_ID_CALLERROR}) -> emqx_utils_json:encode( #{ <<"MessageTypeId">> => maps:get(type, Frame), <<"UniqueId">> => maps:get(id, Frame), <<"ErrorCode">> => maps:get(error_code, Frame), <<"ErrorDescription">> => maps:get(error_desc, Frame) } ). payload2frame(Payload) when is_binary(Payload) -> payload2frame(emqx_utils_json:decode(Payload, [return_maps])); payload2frame(#{ <<"MessageTypeId">> := ?OCPP_MSG_TYPE_ID_CALL, <<"UniqueId">> := Id, <<"Action">> := Action, <<"Payload">> := Payload }) -> #{ type => ?OCPP_MSG_TYPE_ID_CALL, id => Id, action => Action, payload => Payload }; payload2frame(#{ <<"MessageTypeId">> := ?OCPP_MSG_TYPE_ID_CALLRESULT, <<"UniqueId">> := Id, <<"Payload">> := Payload }) -> #{ type => ?OCPP_MSG_TYPE_ID_CALLRESULT, id => Id, action => undefined, payload => Payload }; payload2frame(#{ <<"MessageTypeId">> := ?OCPP_MSG_TYPE_ID_CALLERROR, <<"UniqueId">> := Id, <<"ErrorCode">> := ErrorCode, <<"ErrorDescription">> := ErrorDescription }) -> #{ type => ?OCPP_MSG_TYPE_ID_CALLERROR, id => Id, error_code => ErrorCode, error_desc => ErrorDescription }. %%-------------------------------------------------------------------- %% Ensure connected ensure_connected( Channel = #channel{ conninfo = ConnInfo, clientinfo = ClientInfo } ) -> NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)}, ok = run_hooks('client.connected', [ClientInfo, NConnInfo]), schedule_connection_expire(Channel#channel{ conninfo = NConnInfo, conn_state = connected }). schedule_connection_expire(Channel = #channel{ctx = Ctx, clientinfo = ClientInfo}) -> case emqx_gateway_ctx:connection_expire_interval(Ctx, ClientInfo) of undefined -> Channel; Interval -> ensure_timer(connection_expire_timer, Interval, Channel) end. ensure_disconnected( Reason, Channel = #channel{ conninfo = ConnInfo, clientinfo = ClientInfo } ) -> NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(millisecond)}, ok = run_hooks('client.disconnected', [ClientInfo, Reason, NConnInfo]), Channel#channel{conninfo = NConnInfo, conn_state = disconnected}. %%-------------------------------------------------------------------- %% Ensure Keepalive ensure_keepalive(Channel = #channel{conninfo = ConnInfo}) -> ensure_keepalive_timer(maps:get(keepalive, ConnInfo), Channel). ensure_keepalive_timer(0, Channel) -> Channel; ensure_keepalive_timer(Interval, Channel) -> Keepalive = emqx_ocpp_keepalive:init( timer:seconds(Interval), heartbeat_checking_times_backoff() ), ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}). reset_keepalive(Interval, Channel = #channel{conninfo = ConnInfo, timers = Timers}) -> case maps:get(alive_timer, Timers, undefined) of undefined -> Channel; TRef -> NConnInfo = ConnInfo#{keepalive => Interval}, emqx_utils:cancel_timer(TRef), ensure_keepalive_timer( Interval, Channel#channel{ conninfo = NConnInfo, timers = maps:without([alive_timer], Timers) } ) end. heartbeat_checking_times_backoff() -> max(0, emqx_ocpp_conf:heartbeat_checking_times_backoff() - 1). %%-------------------------------------------------------------------- %% Ensure Subscriptions ensure_subscribe_dn_topics( Channel = #channel{clientinfo = #{clientid := ClientId, mountpoint := Mountpoint} = ClientInfo} ) -> SubOpts = ?DEFAULT_OCPP_DN_SUBOPTS, Topic = dntopic(ClientId, Mountpoint), ok = emqx_broker:subscribe(Topic, ClientId, SubOpts), ok = emqx_hooks:run('session.subscribed', [ClientInfo, Topic, SubOpts]), Channel. dntopic(ClientId, Mountpoint) -> Topic0 = proc_tmpl( emqx_ocpp_conf:dntopic(), #{ clientid => ClientId, cid => ClientId } ), emqx_mountpoint:mount(Mountpoint, Topic0). %%-------------------------------------------------------------------- %% Helper functions %%-------------------------------------------------------------------- -compile({inline, [run_hooks/3]}). run_hooks(Name, Args) -> ok = emqx_metrics:inc(Name), emqx_hooks:run(Name, Args). run_hooks(Name, Args, Acc) -> ok = emqx_metrics:inc(Name), emqx_hooks:run_fold(Name, Args, Acc). -compile({inline, [reply/2, shutdown/2, shutdown/3]}). reply(Reply, Channel) -> {reply, Reply, Channel}. shutdown(success, Channel) -> shutdown(normal, Channel); shutdown(Reason, Channel) -> {shutdown, Reason, Channel}. shutdown(success, Reply, Channel) -> shutdown(normal, Reply, Channel); shutdown(Reason, Reply, Channel) -> {shutdown, Reason, Reply, Channel}. proc_tmpl(Tmpl, Vars) -> Tokens = emqx_placeholder:preproc_tmpl(Tmpl), emqx_placeholder:proc_tmpl(Tokens, Vars). %%-------------------------------------------------------------------- %% For CT tests %%-------------------------------------------------------------------- set_field(Name, Value, Channel) -> Pos = emqx_utils:index_of(Name, record_info(fields, channel)), setelement(Pos + 1, Channel, Value).