emqx/apps/emqx_gateway_ocpp/src/emqx_gateway_ocpp.erl

103 lines
2.7 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
%% @doc The OCPP Gateway implement
-module(emqx_gateway_ocpp).
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx_gateway/include/emqx_gateway.hrl").
%% define a gateway named ocpp
-gateway(#{
name => ocpp,
callback_module => ?MODULE,
config_schema_module => emqx_ocpp_schema,
edition => ee
}).
%% callback_module must implement the emqx_gateway_impl behaviour
-behaviour(emqx_gateway_impl).
%% callback for emqx_gateway_impl
-export([
on_gateway_load/2,
on_gateway_update/3,
on_gateway_unload/2
]).
-import(
emqx_gateway_utils,
[
normalize_config/1,
start_listeners/4,
stop_listeners/2
]
).
%%--------------------------------------------------------------------
%% emqx_gateway_impl callbacks
%%--------------------------------------------------------------------
on_gateway_load(
_Gateway = #{
name := GwName,
config := Config
},
Ctx
) ->
%% ensure json schema validator is loaded
emqx_ocpp_schemas:load(),
Listeners = normalize_config(Config),
ModCfg = #{
frame_mod => emqx_ocpp_frame,
chann_mod => emqx_ocpp_channel,
connection_mod => emqx_ocpp_connection
},
case
start_listeners(
Listeners, GwName, Ctx, ModCfg
)
of
{ok, ListenerPids} ->
%% FIXME: How to throw an exception to interrupt the restart logic ?
%% FIXME: Assign ctx to GwState
{ok, ListenerPids, _GwState = #{ctx => Ctx}};
{error, {Reason, Listener}} ->
throw(
{badconf, #{
key => listeners,
value => Listener,
reason => Reason
}}
)
end.
on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) ->
GwName = maps:get(name, Gateway),
try
%% XXX: 1. How hot-upgrade the changes ???
%% XXX: 2. Check the New confs first before destroy old state???
on_gateway_unload(Gateway, GwState),
on_gateway_load(Gateway#{config => Config}, Ctx)
catch
Class:Reason:Stk ->
logger:error(
"Failed to update ~ts; "
"reason: {~0p, ~0p} stacktrace: ~0p",
[GwName, Class, Reason, Stk]
),
{error, Reason}
end.
on_gateway_unload(
_Gateway = #{
name := GwName,
config := Config
},
_GwState
) ->
Listeners = normalize_config(Config),
stop_listeners(GwName, Listeners).