diff --git a/apps/emqx_gateway/src/emqx_gateway_api.erl b/apps/emqx_gateway/src/emqx_gateway_api.erl index bb80eac73..7195943a3 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api.erl @@ -380,7 +380,8 @@ fields(Gw) when Gw == coap; Gw == lwm2m; Gw == exproto; - Gw == gbt32960 + Gw == gbt32960; + Gw == ocpp -> [{name, mk(Gw, #{desc => ?DESC(gateway_name)})}] ++ convert_listener_struct(emqx_gateway_schema:gateway_schema(Gw)); @@ -390,7 +391,8 @@ fields(Gw) when Gw == update_coap; Gw == update_lwm2m; Gw == update_exproto; - Gw == update_gbt32960 + Gw == update_gbt32960; + Gw == update_ocpp -> "update_" ++ GwStr = atom_to_list(Gw), Gw1 = list_to_existing_atom(GwStr), @@ -399,14 +401,18 @@ fields(Listener) when Listener == tcp_listener; Listener == ssl_listener; Listener == udp_listener; - Listener == dtls_listener + Listener == dtls_listener; + Listener == ws_listener; + Listener == wss_listener -> Type = case Listener of tcp_listener -> tcp; ssl_listener -> ssl; udp_listener -> udp; - dtls_listener -> dtls + dtls_listener -> dtls; + ws_listener -> ws; + wss_listener -> wss end, [ {id, @@ -492,14 +498,18 @@ listeners_schema(?R_REF(_Mod, tcp_udp_listeners)) -> ref(udp_listener), ref(dtls_listener) ]) - ). + ); +listeners_schema(?R_REF(_Mod, ws_listeners)) -> + hoconsc:array(hoconsc:union([ref(ws_listener), ref(wss_listener)])). listener_schema() -> hoconsc:union([ ref(?MODULE, tcp_listener), ref(?MODULE, ssl_listener), ref(?MODULE, udp_listener), - ref(?MODULE, dtls_listener) + ref(?MODULE, dtls_listener), + ref(?MODULE, ws_listener), + ref(?MODULE, wss_listener) ]). %%-------------------------------------------------------------------- @@ -770,6 +780,35 @@ examples_gateway_confs() -> } ] } + }, + ocpp_gateway => + #{ + summary => <<"A simple OCPP gateway config">>, + vaule => + #{ + enable => true, + name => <<"ocpp">>, + enable_stats => true, + mountpoint => <<"ocpp/">>, + default_heartbeat_interval => <<"60s">>, + upstream => + #{ + topic => <<"cp/${cid}">>, + reply_topic => <<"cp/${cid}/reply">>, + error_topic => <<"cp/${cid}/error">> + }, + dnstream => #{topic => <<"cp/${cid}">>}, + message_format_checking => disable, + listeners => + [ + #{ + type => <<"ws">>, + name => <<"default">>, + bind => <<"33033">>, + max_connections => 1024000 + } + ] + } } }. @@ -881,5 +920,24 @@ examples_update_gateway_confs() -> max_retry_times => 3, message_queue_len => 10 } + }, + ocpp_gateway => + #{ + summary => <<"A simple OCPP gateway config">>, + vaule => + #{ + enable => true, + enable_stats => true, + mountpoint => <<"ocpp/">>, + default_heartbeat_interval => <<"60s">>, + upstream => + #{ + topic => <<"cp/${cid}">>, + reply_topic => <<"cp/${cid}/reply">>, + error_topic => <<"cp/${cid}/error">> + }, + dnstream => #{topic => <<"cp/${cid}">>}, + message_format_checking => disable + } } }. diff --git a/apps/emqx_gateway/src/emqx_gateway_schema.erl b/apps/emqx_gateway/src/emqx_gateway_schema.erl index 607cac27d..5d8ac23d9 100644 --- a/apps/emqx_gateway/src/emqx_gateway_schema.erl +++ b/apps/emqx_gateway/src/emqx_gateway_schema.erl @@ -56,7 +56,7 @@ -export([mountpoint/0, mountpoint/1, gateway_common_options/0, gateway_schema/1, gateway_names/0]). --export([ws_listener/2, wss_listener/2]). +-export([ws_listener/0, wss_listener/0, ws_opts/2]). namespace() -> gateway. @@ -129,6 +129,10 @@ fields(ssl_listener) -> } )} ]; +fields(ws_listener) -> + ws_listener() ++ ws_opts(<<>>, <<>>); +fields(wss_listener) -> + wss_listener() ++ ws_opts(<<>>, <<>>); fields(udp_listener) -> [ %% some special configs for udp listener @@ -252,21 +256,16 @@ mountpoint(Default) -> } ). -ws_listener(DefaultPath, DefaultSubProtocols) when - is_binary(DefaultPath), is_binary(DefaultSubProtocols) --> +ws_listener() -> [ {acceptors, sc(integer(), #{default => 16, desc => ?DESC(tcp_listener_acceptors)})} ] ++ - ws_opts(DefaultPath, DefaultSubProtocols) ++ tcp_opts() ++ proxy_protocol_opts() ++ common_listener_opts(). -wss_listener(DefaultPath, DefaultSubProtocols) when - is_binary(DefaultPath), is_binary(DefaultSubProtocols) --> - ws_listener(DefaultPath, DefaultSubProtocols) ++ +wss_listener() -> + ws_listener() ++ [ {ssl_options, sc( @@ -278,7 +277,9 @@ wss_listener(DefaultPath, DefaultSubProtocols) when )} ]. -ws_opts(DefaultPath, DefaultSubProtocols) -> +ws_opts(DefaultPath, DefaultSubProtocols) when + is_binary(DefaultPath), is_binary(DefaultSubProtocols) +-> [ {"path", sc( @@ -378,7 +379,7 @@ ws_opts(DefaultPath, DefaultSubProtocols) -> )}, {"deflate_opts", sc( - ref("deflate_opts"), + ref(emqx_schema, "deflate_opts"), #{} )} ]. diff --git a/apps/emqx_gateway/src/emqx_gateway_utils.erl b/apps/emqx_gateway/src/emqx_gateway_utils.erl index 8cc1396b4..57e7998f4 100644 --- a/apps/emqx_gateway/src/emqx_gateway_utils.erl +++ b/apps/emqx_gateway/src/emqx_gateway_utils.erl @@ -82,6 +82,11 @@ max_mailbox_size => 32000 }). +-define(IS_ESOCKD_LISTENER(T), + T == tcp orelse T == ssl orelse T == udp orelse T == dtls +). +-define(IS_COWBOY_LISTENER(T), T == ws orelse T == wss). + -elvis([{elvis_style, god_modules, disable}]). -spec childspec(supervisor:worker(), Mod :: atom()) -> @@ -135,7 +140,7 @@ find_sup_child(Sup, ChildId) -> {ok, [pid()]} | {error, term()} when - ModCfg :: #{frame_mod := atom(), chann_mod := atom()}. + ModCfg :: #{frame_mod := atom(), chann_mod := atom(), connection_mod => atom()}. start_listeners(Listeners, GwName, Ctx, ModCfg) -> start_listeners(Listeners, GwName, Ctx, ModCfg, []). @@ -167,13 +172,12 @@ start_listeners([L | Ls], GwName, Ctx, ModCfg, Acc) -> start_listener( GwName, Ctx, - {Type, LisName, ListenOn, SocketOpts, Cfg}, + {Type, LisName, ListenOn, Cfg}, ModCfg ) -> ListenOnStr = emqx_listeners:format_bind(ListenOn), ListenerId = emqx_gateway_utils:listener_id(GwName, Type, LisName), - NCfg = maps:merge(Cfg, ModCfg), case start_listener( GwName, @@ -181,8 +185,8 @@ start_listener( Type, LisName, ListenOn, - SocketOpts, - NCfg + Cfg, + ModCfg ) of {ok, Pid} -> @@ -199,15 +203,74 @@ start_listener( emqx_gateway_utils:supervisor_ret({error, Reason}) end. -start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) -> +start_listener(GwName, Ctx, Type, LisName, ListenOn, Confs, ModCfg) when + ?IS_ESOCKD_LISTENER(Type) +-> Name = emqx_gateway_utils:listener_id(GwName, Type, LisName), - NCfg = Cfg#{ - ctx => Ctx, - listener => {GwName, Type, LisName} - }, - NSocketOpts = merge_default(Type, SocketOpts), - MFA = {emqx_gateway_conn, start_link, [NCfg]}, - do_start_listener(Type, Name, ListenOn, NSocketOpts, MFA). + SocketOpts = merge_default(Type, esockd_opts(Type, Confs)), + HighLevelCfgs0 = filter_out_low_level_opts(Type, Confs), + HighLevelCfgs = maps:merge( + HighLevelCfgs0, + ModCfg#{ + ctx => Ctx, + listener => {GwName, Type, LisName} + } + ), + ConnMod = maps:get(connection_mod, ModCfg, emqx_gateway_conn), + MFA = {ConnMod, start_link, [HighLevelCfgs]}, + do_start_listener(Type, Name, ListenOn, SocketOpts, MFA); +start_listener(GwName, Ctx, Type, LisName, ListenOn, Confs, ModCfg) when + ?IS_COWBOY_LISTENER(Type) +-> + Name = emqx_gateway_utils:listener_id(GwName, Type, LisName), + RanchOpts = ranch_opts(Type, ListenOn, Confs), + HighLevelCfgs0 = filter_out_low_level_opts(Type, Confs), + HighLevelCfgs = maps:merge( + HighLevelCfgs0, + ModCfg#{ + ctx => Ctx, + listener => {GwName, Type, LisName} + } + ), + WsOpts = ws_opts(Confs, HighLevelCfgs), + case Type of + ws -> cowboy:start_clear(Name, RanchOpts, WsOpts); + wss -> cowboy:start_tls(Name, RanchOpts, WsOpts) + end. + +filter_out_low_level_opts(Type, RawCfg = #{gw_conf := Conf0}) when ?IS_ESOCKD_LISTENER(Type) -> + EsockdKeys = [ + gw_conf, + bind, + acceptors, + max_connections, + max_conn_rate, + proxy_protocol, + proxy_protocol_timeout, + tcp_options, + ssl_options, + udp_options, + dtls_options + ], + Conf1 = maps:without(EsockdKeys, RawCfg), + maps:merge(Conf0, Conf1); +filter_out_low_level_opts(Type, RawCfg = #{gw_conf := Conf0}) when ?IS_COWBOY_LISTENER(Type) -> + CowboyKeys = [ + gw_conf, + bind, + acceptors, + max_connections, + max_conn_rate, + proxy_protocol, + proxy_protocol_timeout, + tcp_options, + ssl_options, + udp_options, + dtls_options, + websocket + ], + Conf1 = maps:without(CowboyKeys, RawCfg), + maps:merge(Conf0, Conf1). merge_default(Udp, Options) -> {Key, Default} = @@ -380,8 +443,8 @@ stringfy(T) -> Type :: udp | tcp | ssl | dtls, Name :: atom(), ListenOn :: esockd:listen_on(), - SocketOpts :: esockd:option(), - Cfg :: map() + RawCfg :: map(), + ConnCfg :: map() }). normalize_config(RawConf) -> LisMap = maps:get(listeners, RawConf, #{}), @@ -393,14 +456,7 @@ normalize_config(RawConf) -> maps:fold( fun(Name, Confs, AccIn2) -> ListenOn = maps:get(bind, Confs), - SocketOpts = esockd_opts(Type, Confs), - RemainCfgs = maps:without( - [bind, tcp, ssl, udp, dtls] ++ - proplists:get_keys(SocketOpts), - Confs - ), - Cfg = maps:merge(Cfg0, RemainCfgs), - [{Type, Name, ListenOn, SocketOpts, Cfg} | AccIn2] + [{Type, Name, ListenOn, Confs#{gw_conf => Cfg0}} | AccIn2] end, [], Liss @@ -412,7 +468,7 @@ normalize_config(RawConf) -> ) ). -esockd_opts(Type, Opts0) -> +esockd_opts(Type, Opts0) when ?IS_ESOCKD_LISTENER(Type) -> Opts1 = maps:with( [ acceptors, @@ -427,37 +483,70 @@ esockd_opts(Type, Opts0) -> maps:to_list( case Type of tcp -> - Opts2#{tcp_options => sock_opts(tcp, Opts0)}; + Opts2#{tcp_options => sock_opts(tcp_options, Opts0)}; ssl -> Opts2#{ - tcp_options => sock_opts(tcp, Opts0), - ssl_options => ssl_opts(ssl, Opts0) + tcp_options => sock_opts(tcp_options, Opts0), + ssl_options => ssl_opts(ssl_options, Opts0) }; udp -> - Opts2#{udp_options => sock_opts(udp, Opts0)}; + Opts2#{udp_options => sock_opts(udp_options, Opts0)}; dtls -> Opts2#{ - udp_options => sock_opts(udp, Opts0), - dtls_options => ssl_opts(dtls, Opts0) + udp_options => sock_opts(udp_options, Opts0), + dtls_options => ssl_opts(dtls_options, Opts0) } end ). +sock_opts(Name, Opts) -> + maps:to_list( + maps:without( + [active_n, keepalive], + maps:get(Name, Opts, #{}) + ) + ). + ssl_opts(Name, Opts) -> Type = case Name of - ssl -> tls; - dtls -> dtls + ssl_options -> tls; + dtls_options -> dtls end, emqx_tls_lib:to_server_opts(Type, maps:get(Name, Opts, #{})). -sock_opts(Name, Opts) -> - maps:to_list( - maps:without( - [active_n], - maps:get(Name, Opts, #{}) - ) - ). +ranch_opts(Type, ListenOn, Opts) -> + NumAcceptors = maps:get(acceptors, Opts, 4), + MaxConnections = maps:get(max_connections, Opts, 1024), + SocketOpts1 = + case Type of + wss -> + sock_opts(tcp_options, Opts) ++ + proplists:delete(handshake_timeout, ssl_opts(ssl_options, Opts)); + ws -> + sock_opts(tcp_options, Opts) + end, + SocketOpts = ip_port(ListenOn) ++ proplists:delete(reuseaddr, SocketOpts1), + #{ + num_acceptors => NumAcceptors, + max_connections => MaxConnections, + handshake_timeout => maps:get(handshake_timeout, Opts, 15000), + socket_opts => SocketOpts + }. + +ws_opts(Opts, Conf) -> + ConnMod = maps:get(connection_mod, Conf, emqx_gateway_conn), + WsPaths = [ + {emqx_utils_maps:deep_get([websocket, path], Opts, "/"), ConnMod, Conf} + ], + Dispatch = cowboy_router:compile([{'_', WsPaths}]), + ProxyProto = maps:get(proxy_protocol, Opts, false), + #{env => #{dispatch => Dispatch}, proxy_header => ProxyProto}. + +ip_port(Port) when is_integer(Port) -> + [{port, Port}]; +ip_port({Addr, Port}) -> + [{ip, Addr}, {port, Port}]. %%-------------------------------------------------------------------- %% Envs diff --git a/apps/emqx_gateway_ocpp/src/emqx_gateway_ocpp.erl b/apps/emqx_gateway_ocpp/src/emqx_gateway_ocpp.erl index 68a374d9d..df04b3750 100644 --- a/apps/emqx_gateway_ocpp/src/emqx_gateway_ocpp.erl +++ b/apps/emqx_gateway_ocpp/src/emqx_gateway_ocpp.erl @@ -52,7 +52,8 @@ on_gateway_load( Listeners = normalize_config(Config), ModCfg = #{ frame_mod => emqx_ocpp_frame, - chann_mod => emqx_ocpp_channel + chann_mod => emqx_ocpp_channel, + connection_mod => emqx_ocpp_connection }, case start_listeners( diff --git a/apps/emqx_gateway_ocpp/src/emqx_ocpp_schema.erl b/apps/emqx_gateway_ocpp/src/emqx_ocpp_schema.erl index a30f17b5d..61747404a 100644 --- a/apps/emqx_gateway_ocpp/src/emqx_ocpp_schema.erl +++ b/apps/emqx_gateway_ocpp/src/emqx_ocpp_schema.erl @@ -29,7 +29,7 @@ fields(ocpp) -> integer(), #{ default => 1, - required => true, + required => false, desc => ?DESC(heartbeat_checking_times_backoff) } )}, @@ -39,7 +39,7 @@ fields(ocpp) -> sc( hoconsc:union([all, upstream_only, dnstream_only, disable]), #{ - default => all, + default => disable, desc => ?DESC(message_format_checking) } )}, @@ -59,15 +59,21 @@ fields(ocpp) -> desc => ?DESC(json_schema_id_prefix) } )}, - {listeners, sc(ref(listeners), #{desc => ?DESC(listeners)})} + {listeners, sc(ref(ws_listeners), #{desc => ?DESC(ws_listeners)})} ] ++ emqx_gateway_schema:gateway_common_options(); -fields(listeners) -> +fields(ws_listeners) -> + [ + {ws, sc(map(name, ref(ws_listener)), #{})}, + {wss, sc(map(name, ref(wss_listener)), #{})} + ]; +fields(ws_listener) -> + emqx_gateway_schema:ws_listener() ++ [{websocket, sc(ref(websocket), #{})}]; +fields(wss_listener) -> + emqx_gateway_schema:wss_listener() ++ [{websocket, sc(ref(websocket), #{})}]; +fields(websocket) -> DefaultPath = <<"/ocpp">>, SubProtocols = <<"ocpp1.6, ocpp2.0">>, - [ - {ws, emqx_gateway_schema:ws_listener(DefaultPath, SubProtocols)}, - {wss, emqx_gateway_schema:wss_listener(DefaultPath, SubProtocols)} - ]; + emqx_gateway_schema:ws_opts(DefaultPath, SubProtocols); fields(upstream) -> [ {topic, @@ -168,5 +174,8 @@ desc(_) -> sc(Type, Meta) -> hoconsc:mk(Type, Meta). +map(Name, Type) -> + hoconsc:map(Name, Type). + ref(Field) -> hoconsc:ref(?MODULE, Field).