diff --git a/apps/emqx_gateway/src/emqx_gateway_utils.erl b/apps/emqx_gateway/src/emqx_gateway_utils.erl index eef1a42fb..c8cf979e3 100644 --- a/apps/emqx_gateway/src/emqx_gateway_utils.erl +++ b/apps/emqx_gateway/src/emqx_gateway_utils.erl @@ -321,9 +321,48 @@ stop_listener(GwName, {Type, LisName, ListenOn, Cfg}) -> end, StopRet. -stop_listener(GwName, Type, LisName, ListenOn, _Cfg) -> +stop_listener(GwName, Type, LisName, ListenOn, _Cfg) when + Type == tcp; + Type == ssl; + Type == udp; + Type == dtls +-> Name = emqx_gateway_utils:listener_id(GwName, Type, LisName), - esockd:close(Name, ListenOn). + esockd:close(Name, ListenOn); +stop_listener(GwName, Type, LisName, ListenOn, _Cfg) when + Type == ws; Type == wss +-> + Name = emqx_gateway_utils:listener_id(GwName, Type, LisName), + case cowboy:stop_listener(Name) of + ok -> + wait_listener_stopped(ListenOn); + Error -> + Error + end. + +wait_listener_stopped(ListenOn) -> + % NOTE + % `cowboy:stop_listener/1` will not close the listening socket explicitly, + % it will be closed by the runtime system **only after** the process exits. + Endpoint = maps:from_list(ip_port(ListenOn)), + case + gen_tcp:connect( + maps:get(ip, Endpoint, loopback), + maps:get(port, Endpoint), + [{active, false}] + ) + of + {error, _EConnrefused} -> + %% NOTE + %% We should get `econnrefused` here because acceptors are already dead + %% but don't want to crash if not, because this doesn't make any difference. + ok; + {ok, Socket} -> + %% NOTE + %% Tiny chance to get a connected socket here, when some other process + %% concurrently binds to the same port. + gen_tcp:close(Socket) + end. -ifndef(TEST). console_print(Fmt, Args) -> ?ULOG(Fmt, Args). diff --git a/apps/emqx_gateway_ocpp/src/emqx_ocpp_schemas.erl b/apps/emqx_gateway_ocpp/src/emqx_ocpp_schemas.erl index 99003763f..15cca6c14 100644 --- a/apps/emqx_gateway_ocpp/src/emqx_ocpp_schemas.erl +++ b/apps/emqx_gateway_ocpp/src/emqx_ocpp_schemas.erl @@ -89,7 +89,7 @@ feedvar(Path) -> binary_to_list( emqx_placeholder:proc_tmpl( emqx_placeholder:preproc_tmpl(Path), - #{application_priv => code:priv_dir(emqx_ocpp)} + #{application_priv => code:priv_dir(emqx_gateway_ocpp)} ) ). diff --git a/apps/emqx_gateway_ocpp/test/emqx_ocpp_SUITE.erl b/apps/emqx_gateway_ocpp/test/emqx_ocpp_SUITE.erl index 7c25ac5b3..1add0053c 100644 --- a/apps/emqx_gateway_ocpp/test/emqx_ocpp_SUITE.erl +++ b/apps/emqx_gateway_ocpp/test/emqx_ocpp_SUITE.erl @@ -16,36 +16,119 @@ -module(emqx_ocpp_SUITE). --compile(export_all). --compile(nowarn_export_all). - --include_lib("emqx/include/emqx.hrl"). - -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -all() -> - emqx_common_test_helpers:all(?MODULE). +-compile(export_all). +-compile(nowarn_export_all). -init_per_suite(Conf) -> - emqx_ct_helpers:start_apps([emqx_gateway_ocpp], fun set_special_cfg/1), - Conf. +-import( + emqx_gateway_test_utils, + [ + assert_fields_exist/2, + request/2, + request/3 + ] +). -end_per_suite(_Config) -> - emqx_ct_helpers:stop_apps([emqx_gateway_ocpp]). +-define(HEARTBEAT, <<$\n>>). -set_special_cfg(emqx) -> - application:set_env(emqx, allow_anonymous, true), - application:set_env(emqx, enable_acl_cache, false), - LoadedPluginPath = filename:join(["test", "emqx_SUITE_data", "loaded_plugins"]), - application:set_env( - emqx, - plugins_loaded_file, - emqx_ct_helpers:deps_path(emqx, LoadedPluginPath) - ); -set_special_cfg(_App) -> - ok. +-define(CONF_DEFAULT, << + "\n" + "gateway.ocpp {\n" + " mountpoint = \"ocpp/\"\n" + " default_heartbeat_interval = \"60s\"\n" + " heartbeat_checking_times_backoff = 1\n" + " message_format_checking = disable\n" + " upstream {\n" + " topic = \"cp/${clientid}\"\n" + " reply_topic = \"cp/${clientid}/Reply\"\n" + " error_topic = \"cp/${clientid}/Reply\"\n" + " }\n" + " dnstream {\n" + " topic = \"cs/${clientid}\"\n" + " }\n" + " listeners.ws.default {\n" + " bind = \"0.0.0.0:33033\"\n" + " websocket.path = \"/ocpp\"\n" + " }\n" + "}\n" +>>). + +all() -> emqx_common_test_helpers:all(?MODULE). %%-------------------------------------------------------------------- -%% Testcases -%%--------------------------------------------------------------------- +%% setups +%%-------------------------------------------------------------------- + +init_per_suite(Config) -> + application:load(emqx_gateway_ocpp), + Apps = emqx_cth_suite:start( + [ + {emqx_conf, ?CONF_DEFAULT}, + emqx_gateway, + emqx_auth, + emqx_management, + {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"} + ], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + emqx_common_test_http:create_default_app(), + [{suite_apps, Apps} | Config]. + +end_per_suite(Config) -> + emqx_common_test_http:delete_default_app(), + emqx_cth_suite:stop(?config(suite_apps, Config)), + ok. + +default_config() -> + ?CONF_DEFAULT. + +%%-------------------------------------------------------------------- +%% cases +%%-------------------------------------------------------------------- + +t_update_listeners(_Config) -> + {200, [DefaultListener]} = request(get, "/gateways/ocpp/listeners"), + + ListenerConfKeys = + [ + id, + type, + name, + enable, + enable_authn, + bind, + acceptors, + max_connections, + max_conn_rate, + proxy_protocol, + proxy_protocol_timeout, + websocket, + tcp_options + ], + StatusKeys = [status, node_status], + + assert_fields_exist(ListenerConfKeys ++ StatusKeys, DefaultListener), + ?assertMatch( + #{ + id := <<"ocpp:ws:default">>, + type := <<"ws">>, + name := <<"default">>, + enable := true, + enable_authn := true, + bind := <<"0.0.0.0:33033">>, + websocket := #{path := <<"/ocpp">>} + }, + DefaultListener + ), + + UpdateBody = emqx_utils_maps:deep_put( + [websocket, path], + maps:with(ListenerConfKeys, DefaultListener), + <<"/ocpp2">> + ), + {200, _} = request(put, "/gateways/ocpp/listeners/ocpp:ws:default", UpdateBody), + + {200, [UpdatedListener]} = request(get, "/gateways/ocpp/listeners"), + ?assertMatch(#{websocket := #{path := <<"/ocpp2">>}}, UpdatedListener).