emqx/apps/emqx_gateway_ocpp/src/emqx_ocpp_channel.erl

914 lines
28 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ocpp_channel).
-behaviour(emqx_gateway_channel).
-include("emqx_ocpp.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/types.hrl").
-include_lib("emqx/include/logger.hrl").
-logger_header("[OCPP-Chann]").
-ifdef(TEST).
-compile(export_all).
-compile(nowarn_export_all).
-endif.
-export([
info/1,
info/2,
stats/1
]).
-export([
init/2,
authenticate/2,
handle_in/2,
handle_deliver/2,
handle_out/3,
handle_timeout/3,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2
]).
%% Exports for CT
-export([set_field/3]).
-export_type([channel/0]).
-record(channel, {
%% Context
ctx :: emqx_gateway_ctx:context(),
%% ConnInfo
conninfo :: emqx_types:conninfo(),
%% ClientInfo
clientinfo :: emqx_types:clientinfo(),
%% Session
session :: option(map()),
%% ClientInfo override specs
clientinfo_override :: map(),
%% Keepalive
keepalive :: option(emqx_ocpp_keepalive:keepalive()),
%% Stores all unsent messages.
mqueue :: queue:queue(),
%% Timers
timers :: #{atom() => disabled | option(reference())},
%% Conn State
conn_state :: conn_state()
}).
-type channel() :: #channel{}.
-type conn_state() :: idle | connecting | connected | disconnected.
-type reply() ::
{outgoing, emqx_ocpp_frame:frame()}
| {outgoing, [emqx_ocpp_frame:frame()]}
| {event, conn_state() | updated}
| {close, Reason :: atom()}.
-type replies() :: reply() | [reply()].
-define(TIMER_TABLE, #{
alive_timer => keepalive,
connection_expire_timer => connection_expire
}).
-define(INFO_KEYS, [
conninfo,
conn_state,
clientinfo,
session
]).
-define(CHANNEL_METRICS, [
recv_pkt,
recv_msg,
'recv_msg.qos0',
'recv_msg.qos1',
'recv_msg.qos2',
'recv_msg.dropped',
'recv_msg.dropped.await_pubrel_timeout',
send_pkt,
send_msg,
'send_msg.qos0',
'send_msg.qos1',
'send_msg.qos2',
'send_msg.dropped',
'send_msg.dropped.expired',
'send_msg.dropped.queue_full',
'send_msg.dropped.too_large'
]).
-define(DEFAULT_OVERRIDE,
%% Generate clientid by default
#{
clientid => <<"">>,
username => <<"">>,
password => <<"">>
}
).
-define(DEFAULT_OCPP_DN_SUBOPTS, #{rh => 0, rap => 0, nl => 0, qos => ?QOS_1}).
-dialyzer(no_match).
%%--------------------------------------------------------------------
%% Info, Attrs and Caps
%%--------------------------------------------------------------------
%% @doc Get infos of the channel.
-spec info(channel()) -> emqx_types:infos().
info(Channel) ->
maps:from_list(info(?INFO_KEYS, Channel)).
-spec info(list(atom()) | atom(), channel()) -> term().
info(Keys, Channel) when is_list(Keys) ->
[{Key, info(Key, Channel)} || Key <- Keys];
info(conninfo, #channel{conninfo = ConnInfo}) ->
ConnInfo;
info(socktype, #channel{conninfo = ConnInfo}) ->
maps:get(socktype, ConnInfo, undefined);
info(peername, #channel{conninfo = ConnInfo}) ->
maps:get(peername, ConnInfo, undefined);
info(sockname, #channel{conninfo = ConnInfo}) ->
maps:get(sockname, ConnInfo, undefined);
info(proto_name, #channel{conninfo = ConnInfo}) ->
maps:get(proto_name, ConnInfo, undefined);
info(proto_ver, #channel{conninfo = ConnInfo}) ->
maps:get(proto_ver, ConnInfo, undefined);
info(connected_at, #channel{conninfo = ConnInfo}) ->
maps:get(connected_at, ConnInfo, undefined);
info(clientinfo, #channel{clientinfo = ClientInfo}) ->
ClientInfo;
info(zone, #channel{clientinfo = ClientInfo}) ->
maps:get(zone, ClientInfo, undefined);
info(clientid, #channel{clientinfo = ClientInfo}) ->
maps:get(clientid, ClientInfo, undefined);
info(username, #channel{clientinfo = ClientInfo}) ->
maps:get(username, ClientInfo, undefined);
info(session, #channel{conninfo = ConnInfo}) ->
%% XXX:
#{
created_at => maps:get(connected_at, ConnInfo, undefined),
is_persistent => false,
subscriptions => #{},
upgrade_qos => false,
retry_interval => 0,
await_rel_timeout => 0
};
info(conn_state, #channel{conn_state = ConnState}) ->
ConnState;
info(keepalive, #channel{keepalive = Keepalive}) ->
emqx_utils:maybe_apply(fun emqx_ocpp_keepalive:info/1, Keepalive);
info(ctx, #channel{ctx = Ctx}) ->
Ctx;
info(timers, #channel{timers = Timers}) ->
Timers.
-spec stats(channel()) -> emqx_types:stats().
stats(#channel{mqueue = MQueue}) ->
%% XXX: A fake stats for managed by emqx_management
SessionStats = [
{subscriptions_cnt, 1},
{subscriptions_max, 1},
{inflight_cnt, 0},
{inflight_max, 0},
{mqueue_len, queue:len(MQueue)},
{mqueue_max, queue:len(MQueue)},
{mqueue_dropped, 0},
{next_pkt_id, 0},
{awaiting_rel_cnt, 0},
{awaiting_rel_max, 0}
],
lists:append(SessionStats, emqx_pd:get_counters(?CHANNEL_METRICS)).
%%--------------------------------------------------------------------
%% Init the channel
%%--------------------------------------------------------------------
-spec init(emqx_types:conninfo(), map()) -> channel().
init(
ConnInfo = #{
peername := {PeerHost, _Port},
sockname := {_Host, SockPort}
},
Options
) ->
Peercert = maps:get(peercert, ConnInfo, undefined),
Mountpoint = maps:get(mountpoint, Options, undefined),
ListenerId =
case maps:get(listener, Options, undefined) of
undefined -> undefined;
{GwName, Type, LisName} -> emqx_gateway_utils:listener_id(GwName, Type, LisName)
end,
EnableAuthn = maps:get(enable_authn, Options, true),
ClientInfo = setting_peercert_infos(
Peercert,
#{
zone => default,
listener => ListenerId,
protocol => ocpp,
peerhost => PeerHost,
sockport => SockPort,
clientid => undefined,
username => undefined,
is_bridge => false,
is_superuser => false,
enalbe_authn => EnableAuthn,
mountpoint => Mountpoint
}
),
ConnInfo1 = ConnInfo#{
keepalive => emqx_ocpp_conf:default_heartbeat_interval()
},
{NClientInfo, NConnInfo} = take_ws_cookie(ClientInfo, ConnInfo1),
Ctx = maps:get(ctx, Options),
Override = maps:merge(
?DEFAULT_OVERRIDE,
maps:get(clientinfo_override, Options, #{})
),
#channel{
ctx = Ctx,
conninfo = NConnInfo,
clientinfo = NClientInfo,
clientinfo_override = Override,
mqueue = queue:new(),
timers = #{},
conn_state = idle
}.
setting_peercert_infos(NoSSL, ClientInfo) when
NoSSL =:= nossl;
NoSSL =:= undefined
->
ClientInfo;
setting_peercert_infos(Peercert, ClientInfo) ->
{DN, CN} = {esockd_peercert:subject(Peercert), esockd_peercert:common_name(Peercert)},
ClientInfo#{dn => DN, cn => CN}.
take_ws_cookie(ClientInfo, ConnInfo) ->
case maps:take(ws_cookie, ConnInfo) of
{WsCookie, NConnInfo} ->
{ClientInfo#{ws_cookie => WsCookie}, NConnInfo};
_ ->
{ClientInfo, ConnInfo}
end.
authenticate(UserInfo, Channel) ->
case
emqx_utils:pipeline(
[
fun enrich_client/2,
fun run_conn_hooks/2,
fun check_banned/2,
fun auth_connect/2
],
UserInfo,
Channel#channel{conn_state = connecting}
)
of
{ok, _, NChannel} ->
{ok, NChannel};
{error, Reason, _NChannel} ->
{error, Reason}
end.
enrich_client(
#{
clientid := ClientId,
username := Username,
proto_name := ProtoName,
proto_ver := ProtoVer
},
Channel = #channel{
conninfo = ConnInfo,
clientinfo = ClientInfo
}
) ->
NConnInfo = ConnInfo#{
clientid => ClientId,
username => Username,
proto_name => ProtoName,
proto_ver => ProtoVer,
clean_start => true,
conn_props => #{},
expiry_interval => 0,
receive_maximum => 1
},
NClientInfo =
ClientInfo#{
clientid => ClientId,
username => Username
},
{ok, Channel#channel{conninfo = NConnInfo, clientinfo = NClientInfo}}.
set_log_meta(#channel{
clientinfo = #{clientid := ClientId},
conninfo = #{peername := Peername}
}) ->
emqx_logger:set_metadata_peername(esockd:format(Peername)),
emqx_logger:set_metadata_clientid(ClientId).
run_conn_hooks(_UserInfo, Channel = #channel{conninfo = ConnInfo}) ->
case run_hooks('client.connect', [ConnInfo], #{}) of
Error = {error, _Reason} -> Error;
_NConnProps -> {ok, Channel}
end.
check_banned(_UserInfo, #channel{clientinfo = ClientInfo}) ->
case emqx_banned:check(ClientInfo) of
true -> {error, banned};
false -> ok
end.
auth_connect(
#{password := Password},
#channel{ctx = Ctx, clientinfo = ClientInfo} = Channel
) ->
#{
clientid := ClientId,
username := Username
} = ClientInfo,
case emqx_gateway_ctx:authenticate(Ctx, ClientInfo#{password => Password}) of
{ok, NClientInfo} ->
{ok, Channel#channel{clientinfo = NClientInfo}};
{error, Reason} ->
?SLOG(warning, #{
msg => "client_login_failed",
clientid => ClientId,
username => Username,
reason => Reason
}),
{error, Reason}
end.
publish(
Frame,
Channel = #channel{
clientinfo =
#{
clientid := ClientId,
username := Username,
protocol := Protocol,
peerhost := PeerHost,
mountpoint := Mountpoint
},
conninfo = #{proto_ver := ProtoVer}
}
) when
is_map(Frame)
->
Topic0 = upstream_topic(Frame, Channel),
Topic = emqx_mountpoint:mount(Mountpoint, Topic0),
Payload = frame2payload(Frame),
emqx_broker:publish(
emqx_message:make(
ClientId,
?QOS_2,
Topic,
Payload,
#{},
#{
protocol => Protocol,
proto_ver => ProtoVer,
username => Username,
peerhost => PeerHost
}
)
).
upstream_topic(
Frame = #{id := Id, type := Type},
#channel{clientinfo = #{clientid := ClientId}}
) ->
Vars = #{id => Id, type => Type, clientid => ClientId, cid => ClientId},
case Type of
?OCPP_MSG_TYPE_ID_CALL ->
Action = maps:get(action, Frame),
proc_tmpl(
emqx_ocpp_conf:uptopic(Action),
Vars#{action => Action}
);
?OCPP_MSG_TYPE_ID_CALLRESULT ->
proc_tmpl(emqx_ocpp_conf:up_reply_topic(), Vars);
?OCPP_MSG_TYPE_ID_CALLERROR ->
proc_tmpl(emqx_ocpp_conf:up_error_topic(), Vars)
end.
%%--------------------------------------------------------------------
%% Handle incoming packet
%%--------------------------------------------------------------------
-spec handle_in(emqx_ocpp_frame:frame(), channel()) ->
{ok, channel()}
| {ok, replies(), channel()}
| {shutdown, Reason :: term(), channel()}
| {shutdown, Reason :: term(), replies(), channel()}.
handle_in(?IS_REQ(Frame), Channel) ->
%% TODO: strit mode
_ = publish(Frame, Channel),
{ok, Channel};
handle_in(Frame = #{type := Type}, Channel) when
Type == ?OCPP_MSG_TYPE_ID_CALLRESULT;
Type == ?OCPP_MSG_TYPE_ID_CALLERROR
->
_ = publish(Frame, Channel),
try_deliver(Channel);
handle_in({frame_error, {badjson, ReasonStr}}, Channel) ->
shutdown({frame_error, {badjson, iolist_to_binary(ReasonStr)}}, Channel);
handle_in({frame_error, {validation_faliure, Id, ReasonStr}}, Channel) ->
handle_out(
dnstream,
?ERR_FRAME(Id, ?OCPP_ERR_FormationViolation, iolist_to_binary(ReasonStr)),
Channel
);
handle_in(Frame, Channel) ->
?SLOG(error, #{msg => "unexpected_incoming", frame => Frame}),
{ok, Channel}.
%%--------------------------------------------------------------------
%% Process Disconnect
%%--------------------------------------------------------------------
%%--------------------------------------------------------------------
%% Handle Delivers from broker to client
%%--------------------------------------------------------------------
-spec handle_deliver(list(emqx_types:deliver()), channel()) ->
{ok, channel()} | {ok, replies(), channel()}.
handle_deliver(Delivers, Channel) ->
NChannel =
lists:foldl(
fun({deliver, _, Msg}, Acc) ->
enqueue(Msg, Acc)
end,
Channel,
Delivers
),
try_deliver(NChannel).
enqueue(Msg, Channel = #channel{mqueue = MQueue}) ->
case queue:len(MQueue) > emqx_ocpp_conf:max_mqueue_len() of
false ->
try payload2frame(Msg#message.payload) of
Frame ->
Channel#channel{mqueue = queue:in(Frame, MQueue)}
catch
_:_ ->
?SLOG(error, #{msg => "drop_invalid_message", message => Msg}),
Channel
end;
true ->
?SLOG(error, #{msg => "drop_message", message => Msg, reason => message_queue_full}),
Channel
end.
try_deliver(Channel = #channel{mqueue = MQueue}) ->
case queue:is_empty(MQueue) of
false ->
%% TODO: strit_mode
Frames = queue:to_list(MQueue),
handle_out(dnstream, Frames, Channel#channel{mqueue = queue:new()});
true ->
{ok, Channel}
end.
%%--------------------------------------------------------------------
%% Handle outgoing packet
%%--------------------------------------------------------------------
-spec handle_out(atom(), term(), channel()) ->
{ok, channel()}
| {ok, replies(), channel()}
| {shutdown, Reason :: term(), channel()}
| {shutdown, Reason :: term(), replies(), channel()}.
handle_out(dnstream, Frames, Channel) ->
{Outgoings, NChannel} = apply_frame(Frames, Channel),
{ok, [{outgoing, Frames} | Outgoings], NChannel};
handle_out(disconnect, keepalive_timeout, Channel) ->
{shutdown, keepalive_timeout, Channel};
handle_out(Type, Data, Channel) ->
?SLOG(error, #{msg => "unexpected_outgoing", type => Type, data => Data}),
{ok, Channel}.
%%--------------------------------------------------------------------
%% Apply Response frame to channel state machine
%%--------------------------------------------------------------------
apply_frame(Frames, Channel) when is_list(Frames) ->
{Outgoings, NChannel} = lists:foldl(fun do_apply_frame/2, {[], Channel}, Frames),
{lists:reverse(Outgoings), NChannel};
apply_frame(Frames, Channel) ->
?SLOG(error, #{msg => "unexpected_frame_list", frames => Frames}),
Channel.
do_apply_frame(?IS_BootNotification_RESP(Status, Interval), {Outgoings, Channel}) ->
case Status of
<<"Accepted">> ->
?SLOG(info, #{msg => "adjust_heartbeat_timer", new_interval_s => Interval}),
{[{event, updated} | Outgoings], reset_keepalive(Interval, Channel)};
_ ->
{Outgoings, Channel}
end;
do_apply_frame(Frame, Acc = {_Outgoings, _Channel}) ->
?SLOG(info, #{msg => "skip_to_apply_frame", frame => Frame}),
Acc.
%%--------------------------------------------------------------------
%% Handle call
%%--------------------------------------------------------------------
-spec handle_call(Req :: any(), From :: emqx_gateway_channel:gen_server_from(), channel()) ->
{reply, Reply :: any(), channel()}
| {shutdown, Reason :: any(), Reply :: any(), channel()}.
handle_call(kick, _From, Channel) ->
shutdown(kicked, ok, Channel);
handle_call(discard, _From, Channel) ->
shutdown(discarded, ok, Channel);
handle_call(
subscriptions,
_From,
Channel = #channel{clientinfo = #{clientid := ClientId, mountpoint := Mountpoint}}
) ->
Subs = [{dntopic(ClientId, Mountpoint), ?DEFAULT_OCPP_DN_SUBOPTS}],
reply({ok, Subs}, Channel);
handle_call(Req, From, Channel) ->
?SLOG(error, #{msg => "unexpected_call", req => Req, from => From}),
reply(ignored, Channel).
%%--------------------------------------------------------------------
%% Handle Cast
%%--------------------------------------------------------------------
-spec handle_cast(Req :: any(), channel()) ->
ok
| {ok, channel()}
| {shutdown, Reason :: term(), channel()}.
handle_cast(Req, Channel) ->
?SLOG(error, #{msg => "unexpected_cast", req => Req}),
{ok, Channel}.
%%--------------------------------------------------------------------
%% Handle Info
%%--------------------------------------------------------------------
-spec handle_info(Info :: term(), channel()) ->
ok | {ok, channel()} | {shutdown, Reason :: term(), channel()}.
handle_info(after_init, Channel0) ->
set_log_meta(Channel0),
case process_connect(Channel0) of
{ok, Channel} ->
NChannel = ensure_keepalive(
ensure_connected(
ensure_subscribe_dn_topics(Channel)
)
),
{ok, [{event, connected}], NChannel};
{error, Reason} ->
shutdown(Reason, Channel0)
end;
handle_info({sock_closed, Reason}, Channel) ->
NChannel = ensure_disconnected({sock_closed, Reason}, Channel),
shutdown(Reason, NChannel);
handle_info(Info, Channel) ->
?SLOG(error, #{msg => "unexpected_info", info => Info}),
{ok, Channel}.
process_connect(
Channel = #channel{
ctx = Ctx,
conninfo = ConnInfo,
clientinfo = ClientInfo
}
) ->
SessFun = fun(_, _) -> #{} end,
case
emqx_gateway_ctx:open_session(
Ctx,
true,
ClientInfo,
ConnInfo,
SessFun
)
of
{ok, #{session := Session}} ->
NChannel = Channel#channel{session = Session},
{ok, NChannel};
{error, Reason} ->
?SLOG(error, #{msg => "failed_to_open_session", reason => Reason}),
{error, Reason}
end.
%%--------------------------------------------------------------------
%% Handle timeout
%%--------------------------------------------------------------------
-spec handle_timeout(reference(), Msg :: term(), channel()) ->
{ok, channel()}
| {ok, replies(), channel()}
| {shutdown, Reason :: term(), channel()}.
handle_timeout(
_TRef,
{keepalive, _StatVal},
Channel = #channel{keepalive = undefined}
) ->
{ok, Channel};
handle_timeout(
_TRef,
{keepalive, _StatVal},
Channel = #channel{conn_state = disconnected}
) ->
{ok, Channel};
handle_timeout(
_TRef,
{keepalive, StatVal},
Channel = #channel{keepalive = Keepalive}
) ->
case emqx_ocpp_keepalive:check(StatVal, Keepalive) of
{ok, NKeepalive} ->
NChannel = Channel#channel{keepalive = NKeepalive},
{ok, reset_timer(alive_timer, NChannel)};
{error, timeout} ->
handle_out(disconnect, keepalive_timeout, Channel)
end;
handle_timeout(_TRef, connection_expire, Channel) ->
%% No take over implemented, so just shutdown
shutdown(expired, Channel);
handle_timeout(_TRef, Msg, Channel) ->
?SLOG(error, #{msg => "unexpected_timeout", timeout_msg => Msg}),
{ok, Channel}.
%%--------------------------------------------------------------------
%% Ensure timers
%%--------------------------------------------------------------------
ensure_timer([Name], Channel) ->
ensure_timer(Name, Channel);
ensure_timer([Name | Rest], Channel) ->
ensure_timer(Rest, ensure_timer(Name, Channel));
ensure_timer(Name, Channel = #channel{timers = Timers}) ->
TRef = maps:get(Name, Timers, undefined),
Time = interval(Name, Channel),
case TRef == undefined andalso Time > 0 of
true -> ensure_timer(Name, Time, Channel);
%% Timer disabled or exists
false -> Channel
end.
ensure_timer(Name, Time, Channel = #channel{timers = Timers}) ->
Msg = maps:get(Name, ?TIMER_TABLE),
TRef = emqx_utils:start_timer(Time, Msg),
Channel#channel{timers = Timers#{Name => TRef}}.
reset_timer(Name, Channel) ->
ensure_timer(Name, clean_timer(Name, Channel)).
clean_timer(Name, Channel = #channel{timers = Timers}) ->
Channel#channel{timers = maps:remove(Name, Timers)}.
interval(alive_timer, #channel{keepalive = KeepAlive}) ->
emqx_ocpp_keepalive:info(interval, KeepAlive).
%%--------------------------------------------------------------------
%% Terminate
%%--------------------------------------------------------------------
-spec terminate(any(), channel()) -> ok.
terminate(_, #channel{conn_state = idle}) ->
ok;
terminate(normal, Channel) ->
run_terminate_hook(normal, Channel);
terminate({shutdown, Reason}, Channel) when
Reason =:= kicked; Reason =:= discarded
->
run_terminate_hook(Reason, Channel);
terminate(Reason, Channel) ->
run_terminate_hook(Reason, Channel).
run_terminate_hook(Reason, Channel = #channel{clientinfo = ClientInfo}) ->
emqx_hooks:run('session.terminated', [ClientInfo, Reason, info(session, Channel)]).
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
%%--------------------------------------------------------------------
%% Frame
frame2payload(Frame = #{type := ?OCPP_MSG_TYPE_ID_CALL}) ->
emqx_utils_json:encode(
#{
<<"MessageTypeId">> => ?OCPP_MSG_TYPE_ID_CALL,
<<"UniqueId">> => maps:get(id, Frame),
<<"Action">> => maps:get(action, Frame),
<<"Payload">> => maps:get(payload, Frame)
}
);
frame2payload(Frame = #{type := ?OCPP_MSG_TYPE_ID_CALLRESULT}) ->
emqx_utils_json:encode(
#{
<<"MessageTypeId">> => ?OCPP_MSG_TYPE_ID_CALLRESULT,
<<"UniqueId">> => maps:get(id, Frame),
<<"Payload">> => maps:get(payload, Frame)
}
);
frame2payload(Frame = #{type := ?OCPP_MSG_TYPE_ID_CALLERROR}) ->
emqx_utils_json:encode(
#{
<<"MessageTypeId">> => maps:get(type, Frame),
<<"UniqueId">> => maps:get(id, Frame),
<<"ErrorCode">> => maps:get(error_code, Frame),
<<"ErrorDescription">> => maps:get(error_desc, Frame)
}
).
payload2frame(Payload) when is_binary(Payload) ->
payload2frame(emqx_utils_json:decode(Payload, [return_maps]));
payload2frame(#{
<<"MessageTypeId">> := ?OCPP_MSG_TYPE_ID_CALL,
<<"UniqueId">> := Id,
<<"Action">> := Action,
<<"Payload">> := Payload
}) ->
#{
type => ?OCPP_MSG_TYPE_ID_CALL,
id => Id,
action => Action,
payload => Payload
};
payload2frame(#{
<<"MessageTypeId">> := ?OCPP_MSG_TYPE_ID_CALLRESULT,
<<"UniqueId">> := Id,
<<"Payload">> := Payload
}) ->
#{
type => ?OCPP_MSG_TYPE_ID_CALLRESULT,
id => Id,
action => undefined,
payload => Payload
};
payload2frame(#{
<<"MessageTypeId">> := ?OCPP_MSG_TYPE_ID_CALLERROR,
<<"UniqueId">> := Id,
<<"ErrorCode">> := ErrorCode,
<<"ErrorDescription">> := ErrorDescription
}) ->
#{
type => ?OCPP_MSG_TYPE_ID_CALLERROR,
id => Id,
error_code => ErrorCode,
error_desc => ErrorDescription
}.
%%--------------------------------------------------------------------
%% Ensure connected
ensure_connected(
Channel = #channel{
conninfo = ConnInfo,
clientinfo = ClientInfo
}
) ->
NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)},
ok = run_hooks('client.connected', [ClientInfo, NConnInfo]),
schedule_connection_expire(Channel#channel{
conninfo = NConnInfo,
conn_state = connected
}).
schedule_connection_expire(Channel = #channel{ctx = Ctx, clientinfo = ClientInfo}) ->
case emqx_gateway_ctx:connection_expire_interval(Ctx, ClientInfo) of
undefined ->
Channel;
Interval ->
ensure_timer(connection_expire_timer, Interval, Channel)
end.
ensure_disconnected(
Reason,
Channel = #channel{
conninfo = ConnInfo,
clientinfo = ClientInfo
}
) ->
NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(millisecond)},
ok = run_hooks('client.disconnected', [ClientInfo, Reason, NConnInfo]),
Channel#channel{conninfo = NConnInfo, conn_state = disconnected}.
%%--------------------------------------------------------------------
%% Ensure Keepalive
ensure_keepalive(Channel = #channel{conninfo = ConnInfo}) ->
ensure_keepalive_timer(maps:get(keepalive, ConnInfo), Channel).
ensure_keepalive_timer(0, Channel) ->
Channel;
ensure_keepalive_timer(Interval, Channel) ->
Keepalive = emqx_ocpp_keepalive:init(
timer:seconds(Interval),
heartbeat_checking_times_backoff()
),
ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}).
reset_keepalive(Interval, Channel = #channel{conninfo = ConnInfo, timers = Timers}) ->
case maps:get(alive_timer, Timers, undefined) of
undefined ->
Channel;
TRef ->
NConnInfo = ConnInfo#{keepalive => Interval},
emqx_utils:cancel_timer(TRef),
ensure_keepalive_timer(
Interval,
Channel#channel{
conninfo = NConnInfo,
timers = maps:without([alive_timer], Timers)
}
)
end.
heartbeat_checking_times_backoff() ->
max(0, emqx_ocpp_conf:heartbeat_checking_times_backoff() - 1).
%%--------------------------------------------------------------------
%% Ensure Subscriptions
ensure_subscribe_dn_topics(
Channel = #channel{clientinfo = #{clientid := ClientId, mountpoint := Mountpoint} = ClientInfo}
) ->
SubOpts = ?DEFAULT_OCPP_DN_SUBOPTS,
Topic = dntopic(ClientId, Mountpoint),
ok = emqx_broker:subscribe(Topic, ClientId, SubOpts),
ok = emqx_hooks:run('session.subscribed', [ClientInfo, Topic, SubOpts]),
Channel.
dntopic(ClientId, Mountpoint) ->
Topic0 = proc_tmpl(
emqx_ocpp_conf:dntopic(),
#{
clientid => ClientId,
cid => ClientId
}
),
emqx_mountpoint:mount(Mountpoint, Topic0).
%%--------------------------------------------------------------------
%% Helper functions
%%--------------------------------------------------------------------
-compile({inline, [run_hooks/3]}).
run_hooks(Name, Args) ->
ok = emqx_metrics:inc(Name),
emqx_hooks:run(Name, Args).
run_hooks(Name, Args, Acc) ->
ok = emqx_metrics:inc(Name),
emqx_hooks:run_fold(Name, Args, Acc).
-compile({inline, [reply/2, shutdown/2, shutdown/3]}).
reply(Reply, Channel) ->
{reply, Reply, Channel}.
shutdown(success, Channel) ->
shutdown(normal, Channel);
shutdown(Reason, Channel) ->
{shutdown, Reason, Channel}.
shutdown(success, Reply, Channel) ->
shutdown(normal, Reply, Channel);
shutdown(Reason, Reply, Channel) ->
{shutdown, Reason, Reply, Channel}.
proc_tmpl(Tmpl, Vars) ->
Tokens = emqx_placeholder:preproc_tmpl(Tmpl),
emqx_placeholder:proc_tmpl(Tokens, Vars).
%%--------------------------------------------------------------------
%% For CT tests
%%--------------------------------------------------------------------
set_field(Name, Value, Channel) ->
Pos = emqx_utils:index_of(Name, record_info(fields, channel)),
setelement(Pos + 1, Channel, Value).