fix(ocpp): ensure ocpp gateway options works

This commit is contained in:
JianBo He 2023-11-06 11:56:50 +08:00
parent 7cab269e0b
commit bea0acd929
5 changed files with 223 additions and 65 deletions

View File

@ -380,7 +380,8 @@ fields(Gw) when
Gw == coap;
Gw == lwm2m;
Gw == exproto;
Gw == gbt32960
Gw == gbt32960;
Gw == ocpp
->
[{name, mk(Gw, #{desc => ?DESC(gateway_name)})}] ++
convert_listener_struct(emqx_gateway_schema:gateway_schema(Gw));
@ -390,7 +391,8 @@ fields(Gw) when
Gw == update_coap;
Gw == update_lwm2m;
Gw == update_exproto;
Gw == update_gbt32960
Gw == update_gbt32960;
Gw == update_ocpp
->
"update_" ++ GwStr = atom_to_list(Gw),
Gw1 = list_to_existing_atom(GwStr),
@ -399,14 +401,18 @@ fields(Listener) when
Listener == tcp_listener;
Listener == ssl_listener;
Listener == udp_listener;
Listener == dtls_listener
Listener == dtls_listener;
Listener == ws_listener;
Listener == wss_listener
->
Type =
case Listener of
tcp_listener -> tcp;
ssl_listener -> ssl;
udp_listener -> udp;
dtls_listener -> dtls
dtls_listener -> dtls;
ws_listener -> ws;
wss_listener -> wss
end,
[
{id,
@ -492,14 +498,18 @@ listeners_schema(?R_REF(_Mod, tcp_udp_listeners)) ->
ref(udp_listener),
ref(dtls_listener)
])
).
);
listeners_schema(?R_REF(_Mod, ws_listeners)) ->
hoconsc:array(hoconsc:union([ref(ws_listener), ref(wss_listener)])).
listener_schema() ->
hoconsc:union([
ref(?MODULE, tcp_listener),
ref(?MODULE, ssl_listener),
ref(?MODULE, udp_listener),
ref(?MODULE, dtls_listener)
ref(?MODULE, dtls_listener),
ref(?MODULE, ws_listener),
ref(?MODULE, wss_listener)
]).
%%--------------------------------------------------------------------
@ -770,6 +780,35 @@ examples_gateway_confs() ->
}
]
}
},
ocpp_gateway =>
#{
summary => <<"A simple OCPP gateway config">>,
vaule =>
#{
enable => true,
name => <<"ocpp">>,
enable_stats => true,
mountpoint => <<"ocpp/">>,
default_heartbeat_interval => <<"60s">>,
upstream =>
#{
topic => <<"cp/${cid}">>,
reply_topic => <<"cp/${cid}/reply">>,
error_topic => <<"cp/${cid}/error">>
},
dnstream => #{topic => <<"cp/${cid}">>},
message_format_checking => disable,
listeners =>
[
#{
type => <<"ws">>,
name => <<"default">>,
bind => <<"33033">>,
max_connections => 1024000
}
]
}
}
}.
@ -881,5 +920,24 @@ examples_update_gateway_confs() ->
max_retry_times => 3,
message_queue_len => 10
}
},
ocpp_gateway =>
#{
summary => <<"A simple OCPP gateway config">>,
vaule =>
#{
enable => true,
enable_stats => true,
mountpoint => <<"ocpp/">>,
default_heartbeat_interval => <<"60s">>,
upstream =>
#{
topic => <<"cp/${cid}">>,
reply_topic => <<"cp/${cid}/reply">>,
error_topic => <<"cp/${cid}/error">>
},
dnstream => #{topic => <<"cp/${cid}">>},
message_format_checking => disable
}
}
}.

View File

@ -56,7 +56,7 @@
-export([mountpoint/0, mountpoint/1, gateway_common_options/0, gateway_schema/1, gateway_names/0]).
-export([ws_listener/2, wss_listener/2]).
-export([ws_listener/0, wss_listener/0, ws_opts/2]).
namespace() -> gateway.
@ -129,6 +129,10 @@ fields(ssl_listener) ->
}
)}
];
fields(ws_listener) ->
ws_listener() ++ ws_opts(<<>>, <<>>);
fields(wss_listener) ->
wss_listener() ++ ws_opts(<<>>, <<>>);
fields(udp_listener) ->
[
%% some special configs for udp listener
@ -252,21 +256,16 @@ mountpoint(Default) ->
}
).
ws_listener(DefaultPath, DefaultSubProtocols) when
is_binary(DefaultPath), is_binary(DefaultSubProtocols)
->
ws_listener() ->
[
{acceptors, sc(integer(), #{default => 16, desc => ?DESC(tcp_listener_acceptors)})}
] ++
ws_opts(DefaultPath, DefaultSubProtocols) ++
tcp_opts() ++
proxy_protocol_opts() ++
common_listener_opts().
wss_listener(DefaultPath, DefaultSubProtocols) when
is_binary(DefaultPath), is_binary(DefaultSubProtocols)
->
ws_listener(DefaultPath, DefaultSubProtocols) ++
wss_listener() ->
ws_listener() ++
[
{ssl_options,
sc(
@ -278,7 +277,9 @@ wss_listener(DefaultPath, DefaultSubProtocols) when
)}
].
ws_opts(DefaultPath, DefaultSubProtocols) ->
ws_opts(DefaultPath, DefaultSubProtocols) when
is_binary(DefaultPath), is_binary(DefaultSubProtocols)
->
[
{"path",
sc(
@ -378,7 +379,7 @@ ws_opts(DefaultPath, DefaultSubProtocols) ->
)},
{"deflate_opts",
sc(
ref("deflate_opts"),
ref(emqx_schema, "deflate_opts"),
#{}
)}
].

View File

@ -82,6 +82,11 @@
max_mailbox_size => 32000
}).
-define(IS_ESOCKD_LISTENER(T),
T == tcp orelse T == ssl orelse T == udp orelse T == dtls
).
-define(IS_COWBOY_LISTENER(T), T == ws orelse T == wss).
-elvis([{elvis_style, god_modules, disable}]).
-spec childspec(supervisor:worker(), Mod :: atom()) ->
@ -135,7 +140,7 @@ find_sup_child(Sup, ChildId) ->
{ok, [pid()]}
| {error, term()}
when
ModCfg :: #{frame_mod := atom(), chann_mod := atom()}.
ModCfg :: #{frame_mod := atom(), chann_mod := atom(), connection_mod => atom()}.
start_listeners(Listeners, GwName, Ctx, ModCfg) ->
start_listeners(Listeners, GwName, Ctx, ModCfg, []).
@ -167,13 +172,12 @@ start_listeners([L | Ls], GwName, Ctx, ModCfg, Acc) ->
start_listener(
GwName,
Ctx,
{Type, LisName, ListenOn, SocketOpts, Cfg},
{Type, LisName, ListenOn, Cfg},
ModCfg
) ->
ListenOnStr = emqx_listeners:format_bind(ListenOn),
ListenerId = emqx_gateway_utils:listener_id(GwName, Type, LisName),
NCfg = maps:merge(Cfg, ModCfg),
case
start_listener(
GwName,
@ -181,8 +185,8 @@ start_listener(
Type,
LisName,
ListenOn,
SocketOpts,
NCfg
Cfg,
ModCfg
)
of
{ok, Pid} ->
@ -199,15 +203,74 @@ start_listener(
emqx_gateway_utils:supervisor_ret({error, Reason})
end.
start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) ->
start_listener(GwName, Ctx, Type, LisName, ListenOn, Confs, ModCfg) when
?IS_ESOCKD_LISTENER(Type)
->
Name = emqx_gateway_utils:listener_id(GwName, Type, LisName),
NCfg = Cfg#{
ctx => Ctx,
listener => {GwName, Type, LisName}
},
NSocketOpts = merge_default(Type, SocketOpts),
MFA = {emqx_gateway_conn, start_link, [NCfg]},
do_start_listener(Type, Name, ListenOn, NSocketOpts, MFA).
SocketOpts = merge_default(Type, esockd_opts(Type, Confs)),
HighLevelCfgs0 = filter_out_low_level_opts(Type, Confs),
HighLevelCfgs = maps:merge(
HighLevelCfgs0,
ModCfg#{
ctx => Ctx,
listener => {GwName, Type, LisName}
}
),
ConnMod = maps:get(connection_mod, ModCfg, emqx_gateway_conn),
MFA = {ConnMod, start_link, [HighLevelCfgs]},
do_start_listener(Type, Name, ListenOn, SocketOpts, MFA);
start_listener(GwName, Ctx, Type, LisName, ListenOn, Confs, ModCfg) when
?IS_COWBOY_LISTENER(Type)
->
Name = emqx_gateway_utils:listener_id(GwName, Type, LisName),
RanchOpts = ranch_opts(Type, ListenOn, Confs),
HighLevelCfgs0 = filter_out_low_level_opts(Type, Confs),
HighLevelCfgs = maps:merge(
HighLevelCfgs0,
ModCfg#{
ctx => Ctx,
listener => {GwName, Type, LisName}
}
),
WsOpts = ws_opts(Confs, HighLevelCfgs),
case Type of
ws -> cowboy:start_clear(Name, RanchOpts, WsOpts);
wss -> cowboy:start_tls(Name, RanchOpts, WsOpts)
end.
filter_out_low_level_opts(Type, RawCfg = #{gw_conf := Conf0}) when ?IS_ESOCKD_LISTENER(Type) ->
EsockdKeys = [
gw_conf,
bind,
acceptors,
max_connections,
max_conn_rate,
proxy_protocol,
proxy_protocol_timeout,
tcp_options,
ssl_options,
udp_options,
dtls_options
],
Conf1 = maps:without(EsockdKeys, RawCfg),
maps:merge(Conf0, Conf1);
filter_out_low_level_opts(Type, RawCfg = #{gw_conf := Conf0}) when ?IS_COWBOY_LISTENER(Type) ->
CowboyKeys = [
gw_conf,
bind,
acceptors,
max_connections,
max_conn_rate,
proxy_protocol,
proxy_protocol_timeout,
tcp_options,
ssl_options,
udp_options,
dtls_options,
websocket
],
Conf1 = maps:without(CowboyKeys, RawCfg),
maps:merge(Conf0, Conf1).
merge_default(Udp, Options) ->
{Key, Default} =
@ -380,8 +443,8 @@ stringfy(T) ->
Type :: udp | tcp | ssl | dtls,
Name :: atom(),
ListenOn :: esockd:listen_on(),
SocketOpts :: esockd:option(),
Cfg :: map()
RawCfg :: map(),
ConnCfg :: map()
}).
normalize_config(RawConf) ->
LisMap = maps:get(listeners, RawConf, #{}),
@ -393,14 +456,7 @@ normalize_config(RawConf) ->
maps:fold(
fun(Name, Confs, AccIn2) ->
ListenOn = maps:get(bind, Confs),
SocketOpts = esockd_opts(Type, Confs),
RemainCfgs = maps:without(
[bind, tcp, ssl, udp, dtls] ++
proplists:get_keys(SocketOpts),
Confs
),
Cfg = maps:merge(Cfg0, RemainCfgs),
[{Type, Name, ListenOn, SocketOpts, Cfg} | AccIn2]
[{Type, Name, ListenOn, Confs#{gw_conf => Cfg0}} | AccIn2]
end,
[],
Liss
@ -412,7 +468,7 @@ normalize_config(RawConf) ->
)
).
esockd_opts(Type, Opts0) ->
esockd_opts(Type, Opts0) when ?IS_ESOCKD_LISTENER(Type) ->
Opts1 = maps:with(
[
acceptors,
@ -427,37 +483,70 @@ esockd_opts(Type, Opts0) ->
maps:to_list(
case Type of
tcp ->
Opts2#{tcp_options => sock_opts(tcp, Opts0)};
Opts2#{tcp_options => sock_opts(tcp_options, Opts0)};
ssl ->
Opts2#{
tcp_options => sock_opts(tcp, Opts0),
ssl_options => ssl_opts(ssl, Opts0)
tcp_options => sock_opts(tcp_options, Opts0),
ssl_options => ssl_opts(ssl_options, Opts0)
};
udp ->
Opts2#{udp_options => sock_opts(udp, Opts0)};
Opts2#{udp_options => sock_opts(udp_options, Opts0)};
dtls ->
Opts2#{
udp_options => sock_opts(udp, Opts0),
dtls_options => ssl_opts(dtls, Opts0)
udp_options => sock_opts(udp_options, Opts0),
dtls_options => ssl_opts(dtls_options, Opts0)
}
end
).
sock_opts(Name, Opts) ->
maps:to_list(
maps:without(
[active_n, keepalive],
maps:get(Name, Opts, #{})
)
).
ssl_opts(Name, Opts) ->
Type =
case Name of
ssl -> tls;
dtls -> dtls
ssl_options -> tls;
dtls_options -> dtls
end,
emqx_tls_lib:to_server_opts(Type, maps:get(Name, Opts, #{})).
sock_opts(Name, Opts) ->
maps:to_list(
maps:without(
[active_n],
maps:get(Name, Opts, #{})
)
).
ranch_opts(Type, ListenOn, Opts) ->
NumAcceptors = maps:get(acceptors, Opts, 4),
MaxConnections = maps:get(max_connections, Opts, 1024),
SocketOpts1 =
case Type of
wss ->
sock_opts(tcp_options, Opts) ++
proplists:delete(handshake_timeout, ssl_opts(ssl_options, Opts));
ws ->
sock_opts(tcp_options, Opts)
end,
SocketOpts = ip_port(ListenOn) ++ proplists:delete(reuseaddr, SocketOpts1),
#{
num_acceptors => NumAcceptors,
max_connections => MaxConnections,
handshake_timeout => maps:get(handshake_timeout, Opts, 15000),
socket_opts => SocketOpts
}.
ws_opts(Opts, Conf) ->
ConnMod = maps:get(connection_mod, Conf, emqx_gateway_conn),
WsPaths = [
{emqx_utils_maps:deep_get([websocket, path], Opts, "/"), ConnMod, Conf}
],
Dispatch = cowboy_router:compile([{'_', WsPaths}]),
ProxyProto = maps:get(proxy_protocol, Opts, false),
#{env => #{dispatch => Dispatch}, proxy_header => ProxyProto}.
ip_port(Port) when is_integer(Port) ->
[{port, Port}];
ip_port({Addr, Port}) ->
[{ip, Addr}, {port, Port}].
%%--------------------------------------------------------------------
%% Envs

View File

@ -52,7 +52,8 @@ on_gateway_load(
Listeners = normalize_config(Config),
ModCfg = #{
frame_mod => emqx_ocpp_frame,
chann_mod => emqx_ocpp_channel
chann_mod => emqx_ocpp_channel,
connection_mod => emqx_ocpp_connection
},
case
start_listeners(

View File

@ -29,7 +29,7 @@ fields(ocpp) ->
integer(),
#{
default => 1,
required => true,
required => false,
desc => ?DESC(heartbeat_checking_times_backoff)
}
)},
@ -39,7 +39,7 @@ fields(ocpp) ->
sc(
hoconsc:union([all, upstream_only, dnstream_only, disable]),
#{
default => all,
default => disable,
desc => ?DESC(message_format_checking)
}
)},
@ -59,15 +59,21 @@ fields(ocpp) ->
desc => ?DESC(json_schema_id_prefix)
}
)},
{listeners, sc(ref(listeners), #{desc => ?DESC(listeners)})}
{listeners, sc(ref(ws_listeners), #{desc => ?DESC(ws_listeners)})}
] ++ emqx_gateway_schema:gateway_common_options();
fields(listeners) ->
fields(ws_listeners) ->
[
{ws, sc(map(name, ref(ws_listener)), #{})},
{wss, sc(map(name, ref(wss_listener)), #{})}
];
fields(ws_listener) ->
emqx_gateway_schema:ws_listener() ++ [{websocket, sc(ref(websocket), #{})}];
fields(wss_listener) ->
emqx_gateway_schema:wss_listener() ++ [{websocket, sc(ref(websocket), #{})}];
fields(websocket) ->
DefaultPath = <<"/ocpp">>,
SubProtocols = <<"ocpp1.6, ocpp2.0">>,
[
{ws, emqx_gateway_schema:ws_listener(DefaultPath, SubProtocols)},
{wss, emqx_gateway_schema:wss_listener(DefaultPath, SubProtocols)}
];
emqx_gateway_schema:ws_opts(DefaultPath, SubProtocols);
fields(upstream) ->
[
{topic,
@ -168,5 +174,8 @@ desc(_) ->
sc(Type, Meta) ->
hoconsc:mk(Type, Meta).
map(Name, Type) ->
hoconsc:map(Name, Type).
ref(Field) ->
hoconsc:ref(?MODULE, Field).