fix(ocpp): fix 500 return for `PUT ...gateway/ocpp/<listener_id>`

This commit is contained in:
JianBo He 2023-12-15 11:06:14 +08:00
parent d5d05a1701
commit f0fa48ad28
3 changed files with 150 additions and 28 deletions

View File

@ -321,9 +321,48 @@ stop_listener(GwName, {Type, LisName, ListenOn, Cfg}) ->
end,
StopRet.
stop_listener(GwName, Type, LisName, ListenOn, _Cfg) ->
stop_listener(GwName, Type, LisName, ListenOn, _Cfg) when
Type == tcp;
Type == ssl;
Type == udp;
Type == dtls
->
Name = emqx_gateway_utils:listener_id(GwName, Type, LisName),
esockd:close(Name, ListenOn).
esockd:close(Name, ListenOn);
stop_listener(GwName, Type, LisName, ListenOn, _Cfg) when
Type == ws; Type == wss
->
Name = emqx_gateway_utils:listener_id(GwName, Type, LisName),
case cowboy:stop_listener(Name) of
ok ->
wait_listener_stopped(ListenOn);
Error ->
Error
end.
wait_listener_stopped(ListenOn) ->
% NOTE
% `cowboy:stop_listener/1` will not close the listening socket explicitly,
% it will be closed by the runtime system **only after** the process exits.
Endpoint = maps:from_list(ip_port(ListenOn)),
case
gen_tcp:connect(
maps:get(ip, Endpoint, loopback),
maps:get(port, Endpoint),
[{active, false}]
)
of
{error, _EConnrefused} ->
%% NOTE
%% We should get `econnrefused` here because acceptors are already dead
%% but don't want to crash if not, because this doesn't make any difference.
ok;
{ok, Socket} ->
%% NOTE
%% Tiny chance to get a connected socket here, when some other process
%% concurrently binds to the same port.
gen_tcp:close(Socket)
end.
-ifndef(TEST).
console_print(Fmt, Args) -> ?ULOG(Fmt, Args).

View File

@ -89,7 +89,7 @@ feedvar(Path) ->
binary_to_list(
emqx_placeholder:proc_tmpl(
emqx_placeholder:preproc_tmpl(Path),
#{application_priv => code:priv_dir(emqx_ocpp)}
#{application_priv => code:priv_dir(emqx_gateway_ocpp)}
)
).

View File

@ -16,36 +16,119 @@
-module(emqx_ocpp_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("emqx/include/emqx.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
all() ->
emqx_common_test_helpers:all(?MODULE).
-compile(export_all).
-compile(nowarn_export_all).
init_per_suite(Conf) ->
emqx_ct_helpers:start_apps([emqx_gateway_ocpp], fun set_special_cfg/1),
Conf.
-import(
emqx_gateway_test_utils,
[
assert_fields_exist/2,
request/2,
request/3
]
).
end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([emqx_gateway_ocpp]).
-define(HEARTBEAT, <<$\n>>).
set_special_cfg(emqx) ->
application:set_env(emqx, allow_anonymous, true),
application:set_env(emqx, enable_acl_cache, false),
LoadedPluginPath = filename:join(["test", "emqx_SUITE_data", "loaded_plugins"]),
application:set_env(
emqx,
plugins_loaded_file,
emqx_ct_helpers:deps_path(emqx, LoadedPluginPath)
);
set_special_cfg(_App) ->
ok.
-define(CONF_DEFAULT, <<
"\n"
"gateway.ocpp {\n"
" mountpoint = \"ocpp/\"\n"
" default_heartbeat_interval = \"60s\"\n"
" heartbeat_checking_times_backoff = 1\n"
" message_format_checking = disable\n"
" upstream {\n"
" topic = \"cp/${clientid}\"\n"
" reply_topic = \"cp/${clientid}/Reply\"\n"
" error_topic = \"cp/${clientid}/Reply\"\n"
" }\n"
" dnstream {\n"
" topic = \"cs/${clientid}\"\n"
" }\n"
" listeners.ws.default {\n"
" bind = \"0.0.0.0:33033\"\n"
" websocket.path = \"/ocpp\"\n"
" }\n"
"}\n"
>>).
all() -> emqx_common_test_helpers:all(?MODULE).
%%--------------------------------------------------------------------
%% Testcases
%%---------------------------------------------------------------------
%% setups
%%--------------------------------------------------------------------
init_per_suite(Config) ->
application:load(emqx_gateway_ocpp),
Apps = emqx_cth_suite:start(
[
{emqx_conf, ?CONF_DEFAULT},
emqx_gateway,
emqx_auth,
emqx_management,
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
emqx_common_test_http:create_default_app(),
[{suite_apps, Apps} | Config].
end_per_suite(Config) ->
emqx_common_test_http:delete_default_app(),
emqx_cth_suite:stop(?config(suite_apps, Config)),
ok.
default_config() ->
?CONF_DEFAULT.
%%--------------------------------------------------------------------
%% cases
%%--------------------------------------------------------------------
t_update_listeners(_Config) ->
{200, [DefaultListener]} = request(get, "/gateways/ocpp/listeners"),
ListenerConfKeys =
[
id,
type,
name,
enable,
enable_authn,
bind,
acceptors,
max_connections,
max_conn_rate,
proxy_protocol,
proxy_protocol_timeout,
websocket,
tcp_options
],
StatusKeys = [status, node_status],
assert_fields_exist(ListenerConfKeys ++ StatusKeys, DefaultListener),
?assertMatch(
#{
id := <<"ocpp:ws:default">>,
type := <<"ws">>,
name := <<"default">>,
enable := true,
enable_authn := true,
bind := <<"0.0.0.0:33033">>,
websocket := #{path := <<"/ocpp">>}
},
DefaultListener
),
UpdateBody = emqx_utils_maps:deep_put(
[websocket, path],
maps:with(ListenerConfKeys, DefaultListener),
<<"/ocpp2">>
),
{200, _} = request(put, "/gateways/ocpp/listeners/ocpp:ws:default", UpdateBody),
{200, [UpdatedListener]} = request(get, "/gateways/ocpp/listeners"),
?assertMatch(#{websocket := #{path := <<"/ocpp2">>}}, UpdatedListener).