diff --git a/apps/emqx_gateway/src/emqx_gateway.app.src b/apps/emqx_gateway/src/emqx_gateway.app.src index 731a1807c..3c6634edc 100644 --- a/apps/emqx_gateway/src/emqx_gateway.app.src +++ b/apps/emqx_gateway/src/emqx_gateway.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_gateway, [ {description, "The Gateway management application"}, - {vsn, "0.1.31"}, + {vsn, "0.1.32"}, {registered, []}, {mod, {emqx_gateway_app, []}}, {applications, [kernel, stdlib, emqx, emqx_auth, emqx_ctl]}, diff --git a/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl b/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl index 22d76fe60..30e9762e4 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl @@ -247,9 +247,10 @@ page_params(Qs) -> get_cluster_listeners_info(GwName) -> Listeners = emqx_gateway_conf:listeners(GwName), ListenOns = lists:map( - fun(#{id := Id} = Conf) -> + fun(#{id := Id, type := Type0} = Conf) -> + Type = binary_to_existing_atom(Type0), ListenOn = emqx_gateway_conf:get_bind(Conf), - {Id, ListenOn} + {Type, Id, ListenOn} end, Listeners ), @@ -293,17 +294,11 @@ listeners_cluster_status(Listeners) -> do_listeners_cluster_status(Listeners) -> Node = node(), lists:foldl( - fun({Id, ListenOn}, Acc) -> - BinId = erlang:atom_to_binary(Id), - {ok, #{<<"max_connections">> := Max}} = emqx_gateway_conf:listener(BinId), - {Running, Curr} = - try esockd:get_current_connections({Id, ListenOn}) of - Int -> {true, Int} - catch - %% not started - error:not_found -> - {false, 0} - end, + fun({Type, Id, ListenOn}, Acc) -> + {Running, Curr} = current_listener_status(Type, Id, ListenOn), + {ok, #{<<"max_connections">> := Max}} = emqx_gateway_conf:listener( + erlang:atom_to_binary(Id) + ), Acc#{ Id => #{ node => Node, @@ -319,6 +314,24 @@ do_listeners_cluster_status(Listeners) -> Listeners ). +current_listener_status(Type, Id, _ListenOn) when Type =:= ws; Type =:= wss -> + Info = ranch:info(Id), + Conns = proplists:get_value(all_connections, Info, 0), + Running = + case proplists:get_value(status, Info) of + running -> true; + _ -> false + end, + {Running, Conns}; +current_listener_status(_Type, Id, ListenOn) -> + try esockd:get_current_connections({Id, ListenOn}) of + Int -> {true, Int} + catch + %% not started + error:not_found -> + {false, 0} + end. + ensure_integer_or_infinity(infinity) -> infinity; ensure_integer_or_infinity(<<"infinity">>) -> @@ -762,9 +775,9 @@ examples_listener() -> <<"tlsv1.1">>, <<"tlsv1">> ], - cacertfile => <<"/etc/emqx/certs/cacert.pem">>, - certfile => <<"/etc/emqx/certs/cert.pem">>, - keyfile => <<"/etc/emqx/certs/key.pem">>, + cacertfile => <<"${EMQX_ETC_DIR}/certs/cacert.pem">>, + certfile => <<"${EMQX_ETC_DIR}/certs/cert.pem">>, + keyfile => <<"${EMQX_ETC_DIR}/certs/key.pem">>, verify => <<"verify_none">>, fail_if_no_peer_cert => false }, @@ -808,9 +821,9 @@ examples_listener() -> dtls_options => #{ versions => [<<"dtlsv1.2">>, <<"dtlsv1">>], - cacertfile => <<"/etc/emqx/certs/cacert.pem">>, - certfile => <<"/etc/emqx/certs/cert.pem">>, - keyfile => <<"/etc/emqx/certs/key.pem">>, + cacertfile => <<"${EMQX_ETC_DIR}/certs/cacert.pem">>, + certfile => <<"${EMQX_ETC_DIR}/certs/cert.pem">>, + keyfile => <<"${EMQX_ETC_DIR}/certs/key.pem">>, verify => <<"verify_none">>, fail_if_no_peer_cert => false }, @@ -835,9 +848,9 @@ examples_listener() -> dtls_options => #{ versions => [<<"dtlsv1.2">>, <<"dtlsv1">>], - cacertfile => <<"/etc/emqx/certs/cacert.pem">>, - certfile => <<"/etc/emqx/certs/cert.pem">>, - keyfile => <<"/etc/emqx/certs/key.pem">>, + cacertfile => <<"${EMQX_ETC_DIR}/certs/cacert.pem">>, + certfile => <<"${EMQX_ETC_DIR}/certs/cert.pem">>, + keyfile => <<"${EMQX_ETC_DIR}/certs/key.pem">>, verify => <<"verify_none">>, user_lookup_fun => <<"emqx_tls_psk:lookup">>, ciphers => @@ -869,5 +882,95 @@ examples_listener() -> user_id_type => <<"username">> } } + }, + ws_listener => + #{ + summary => <<"A simple WebSocket listener example">>, + value => + #{ + name => <<"ws-def">>, + type => <<"ws">>, + bind => <<"33043">>, + acceptors => 16, + max_connections => 1024000, + max_conn_rate => 1000, + websocket => + #{ + path => <<"/ocpp">>, + fail_if_no_subprotocol => true, + supported_subprotocols => <<"ocpp1.6">>, + check_origin_enable => false, + check_origins => + <<"http://localhost:18083, http://127.0.0.1:18083">>, + compress => false, + piggyback => <<"single">> + }, + tcp_options => + #{ + active_n => 100, + backlog => 1024, + send_timeout => <<"15s">>, + send_timeout_close => true, + recbuf => <<"10KB">>, + sndbuf => <<"10KB">>, + buffer => <<"10KB">>, + high_watermark => <<"1MB">>, + nodelay => false, + reuseaddr => true, + keepalive => "none" + } + } + }, + wss_listener => + #{ + summary => <<"A simple WebSocket/TLS listener example">>, + value => + #{ + name => <<"ws-ssl-def">>, + type => <<"wss">>, + bind => <<"33053">>, + acceptors => 16, + max_connections => 1024000, + max_conn_rate => 1000, + websocket => + #{ + path => <<"/ocpp">>, + fail_if_no_subprotocol => true, + supported_subprotocols => <<"ocpp1.6">>, + check_origin_enable => false, + check_origins => + <<"http://localhost:18083, http://127.0.0.1:18083">>, + compress => false, + piggyback => <<"single">> + }, + ssl_options => + #{ + versions => [ + <<"tlsv1.3">>, + <<"tlsv1.2">>, + <<"tlsv1.1">>, + <<"tlsv1">> + ], + cacertfile => <<"${EMQX_ETC_DIR}/certs/cacert.pem">>, + certfile => <<"${EMQX_ETC_DIR}/certs/cert.pem">>, + keyfile => <<"${EMQX_ETC_DIR}/certs/key.pem">>, + verify => <<"verify_none">>, + fail_if_no_peer_cert => false + }, + tcp_options => + #{ + active_n => 100, + backlog => 1024, + send_timeout => <<"15s">>, + send_timeout_close => true, + recbuf => <<"10KB">>, + sndbuf => <<"10KB">>, + buffer => <<"10KB">>, + high_watermark => <<"1MB">>, + nodelay => false, + reuseaddr => true, + keepalive => "none" + } + } } }. diff --git a/apps/emqx_gateway_ocpp/include/emqx_ocpp.hrl b/apps/emqx_gateway_ocpp/include/emqx_ocpp.hrl index dc779dc76..805c5f6f4 100644 --- a/apps/emqx_gateway_ocpp/include/emqx_ocpp.hrl +++ b/apps/emqx_gateway_ocpp/include/emqx_ocpp.hrl @@ -86,10 +86,9 @@ -define(IS_ERROR(F), F = #{type := ?OCPP_MSG_TYPE_ID_CALLERROR}). -define(IS_ERROR(F, Id), F = #{type := ?OCPP_MSG_TYPE_ID_CALLERROR, id := Id}). --define(IS_BootNotification_RESP(Payload), #{ +-define(IS_BootNotification_RESP(Status, Interval), #{ type := ?OCPP_MSG_TYPE_ID_CALLRESULT, - action := ?OCPP_ACT_BootNotification, - payload := Payload + payload := #{<<"status">> := Status, <<"interval">> := Interval} }). -define(ERR_FRAME(Id, Code, Desc), #{ diff --git a/apps/emqx_gateway_ocpp/src/emqx_gateway_ocpp.app.src b/apps/emqx_gateway_ocpp/src/emqx_gateway_ocpp.app.src index c7981a033..8682c164c 100644 --- a/apps/emqx_gateway_ocpp/src/emqx_gateway_ocpp.app.src +++ b/apps/emqx_gateway_ocpp/src/emqx_gateway_ocpp.app.src @@ -1,6 +1,6 @@ {application, emqx_gateway_ocpp, [ {description, "OCPP-J 1.6 Gateway for EMQX"}, - {vsn, "0.1.3"}, + {vsn, "0.1.4"}, {registered, []}, {applications, [kernel, stdlib, jesse, emqx, emqx_gateway]}, {env, []}, diff --git a/apps/emqx_gateway_ocpp/src/emqx_ocpp_channel.erl b/apps/emqx_gateway_ocpp/src/emqx_ocpp_channel.erl index cb8ec7e91..d20b35d04 100644 --- a/apps/emqx_gateway_ocpp/src/emqx_ocpp_channel.erl +++ b/apps/emqx_gateway_ocpp/src/emqx_ocpp_channel.erl @@ -527,20 +527,19 @@ apply_frame(Frames, Channel) when is_list(Frames) -> {Outgoings, NChannel} = lists:foldl(fun do_apply_frame/2, {[], Channel}, Frames), {lists:reverse(Outgoings), NChannel}; apply_frame(Frames, Channel) -> - ?SLOG(error, #{msg => "unexpected_frame_list", frames => Frames, channel => Channel}), + ?SLOG(error, #{msg => "unexpected_frame_list", frames => Frames}), Channel. -do_apply_frame(?IS_BootNotification_RESP(Payload), {Outgoings, Channel}) -> - case maps:get(<<"status">>, Payload) of +do_apply_frame(?IS_BootNotification_RESP(Status, Interval), {Outgoings, Channel}) -> + case Status of <<"Accepted">> -> - Intv = maps:get(<<"interval">>, Payload), - ?SLOG(info, #{msg => "adjust_heartbeat_timer", new_interval_s => Intv}), - {[{event, updated} | Outgoings], reset_keepalive(Intv, Channel)}; + ?SLOG(info, #{msg => "adjust_heartbeat_timer", new_interval_s => Interval}), + {[{event, updated} | Outgoings], reset_keepalive(Interval, Channel)}; _ -> {Outgoings, Channel} end; -do_apply_frame(Frame, Acc = {_Outgoings, Channel}) -> - ?SLOG(error, #{msg => "unexpected_frame", frame => Frame, channel => Channel}), +do_apply_frame(Frame, Acc = {_Outgoings, _Channel}) -> + ?SLOG(info, #{msg => "skip_to_apply_frame", frame => Frame}), Acc. %%-------------------------------------------------------------------- @@ -762,19 +761,15 @@ payload2frame(#{ action => Action, payload => Payload }; -payload2frame( - MqttPayload = - #{ - <<"MessageTypeId">> := ?OCPP_MSG_TYPE_ID_CALLRESULT, - <<"UniqueId">> := Id, - <<"Payload">> := Payload - } -) -> - Action = maps:get(<<"Action">>, MqttPayload, undefined), +payload2frame(#{ + <<"MessageTypeId">> := ?OCPP_MSG_TYPE_ID_CALLRESULT, + <<"UniqueId">> := Id, + <<"Payload">> := Payload +}) -> #{ type => ?OCPP_MSG_TYPE_ID_CALLRESULT, id => Id, - action => Action, + action => undefined, payload => Payload }; payload2frame(#{ diff --git a/apps/emqx_gateway_ocpp/test/emqx_ocpp_SUITE.erl b/apps/emqx_gateway_ocpp/test/emqx_ocpp_SUITE.erl index 6d00726cf..e63f8891d 100644 --- a/apps/emqx_gateway_ocpp/test/emqx_ocpp_SUITE.erl +++ b/apps/emqx_gateway_ocpp/test/emqx_ocpp_SUITE.erl @@ -16,6 +16,7 @@ -module(emqx_ocpp_SUITE). +-include("emqx_ocpp.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). @@ -145,3 +146,136 @@ t_enable_disable_gw_ocpp(_Config) -> AssertEnabled(false), ?assertEqual({204, #{}}, request(put, "/gateways/ocpp/enable/true", <<>>)), AssertEnabled(true). + +t_adjust_keepalive_timer(_Config) -> + {ok, ClientPid} = connect("127.0.0.1", 33033, <<"client1">>), + UniqueId = <<"3335862321">>, + BootNotification = #{ + id => UniqueId, + type => ?OCPP_MSG_TYPE_ID_CALL, + action => <<"BootNotification">>, + payload => #{ + <<"chargePointVendor">> => <<"vendor1">>, + <<"chargePointModel">> => <<"model1">> + } + }, + ok = send_msg(ClientPid, BootNotification), + %% check the default keepalive timer + timer:sleep(1000), + ?assertMatch( + #{conninfo := #{keepalive := 60}}, emqx_gateway_cm:get_chan_info(ocpp, <<"client1">>) + ), + %% publish the BootNotification.ack + AckPayload = emqx_utils_json:encode(#{ + <<"MessageTypeId">> => ?OCPP_MSG_TYPE_ID_CALLRESULT, + <<"UniqueId">> => UniqueId, + <<"Payload">> => #{ + <<"currentTime">> => "2023-06-21T14:20:39+00:00", + <<"interval">> => 300, + <<"status">> => <<"Accepted">> + } + }), + _ = emqx:publish(emqx_message:make(<<"ocpp/cs/client1">>, AckPayload)), + {ok, _Resp} = receive_msg(ClientPid), + %% assert: check the keepalive timer is adjusted + ?assertMatch( + #{conninfo := #{keepalive := 300}}, emqx_gateway_cm:get_chan_info(ocpp, <<"client1">>) + ), + %% close conns + close(ClientPid), + timer:sleep(1000), + %% assert: + ?assertEqual(undefined, emqx_gateway_cm:get_chan_info(ocpp, <<"client1">>)), + ok. + +t_listeners_status(_Config) -> + {200, [Listener]} = request(get, "/gateways/ocpp/listeners"), + ?assertMatch( + #{ + status := #{running := true, current_connections := 0} + }, + Listener + ), + %% add a connection + {ok, ClientPid} = connect("127.0.0.1", 33033, <<"client1">>), + UniqueId = <<"3335862321">>, + BootNotification = #{ + id => UniqueId, + type => ?OCPP_MSG_TYPE_ID_CALL, + action => <<"BootNotification">>, + payload => #{ + <<"chargePointVendor">> => <<"vendor1">>, + <<"chargePointModel">> => <<"model1">> + } + }, + ok = send_msg(ClientPid, BootNotification), + timer:sleep(1000), + %% assert: the current_connections is 1 + {200, [Listener1]} = request(get, "/gateways/ocpp/listeners"), + ?assertMatch( + #{ + status := #{running := true, current_connections := 1} + }, + Listener1 + ), + %% close conns + close(ClientPid), + timer:sleep(1000), + %% assert: the current_connections is 0 + {200, [Listener2]} = request(get, "/gateways/ocpp/listeners"), + ?assertMatch( + #{ + status := #{running := true, current_connections := 0} + }, + Listener2 + ). + +%%-------------------------------------------------------------------- +%% ocpp simple client + +connect(Host, Port, ClientId) -> + Timeout = 5000, + ConnOpts = #{connect_timeout => 5000}, + case gun:open(Host, Port, ConnOpts) of + {ok, ConnPid} -> + {ok, _} = gun:await_up(ConnPid, Timeout), + case upgrade(ConnPid, ClientId, Timeout) of + {ok, _Headers} -> {ok, ConnPid}; + Error -> Error + end; + Error -> + Error + end. + +upgrade(ConnPid, ClientId, Timeout) -> + Path = binary_to_list(<<"/ocpp/", ClientId/binary>>), + WsHeaders = [{<<"cache-control">>, <<"no-cache">>}], + StreamRef = gun:ws_upgrade(ConnPid, Path, WsHeaders, #{protocols => [{<<"ocpp1.6">>, gun_ws_h}]}), + receive + {gun_upgrade, ConnPid, StreamRef, [<<"websocket">>], Headers} -> + {ok, Headers}; + {gun_response, ConnPid, _, _, Status, Headers} -> + {error, {ws_upgrade_failed, Status, Headers}}; + {gun_error, ConnPid, StreamRef, Reason} -> + {error, {ws_upgrade_failed, Reason}} + after Timeout -> + {error, timeout} + end. + +send_msg(ConnPid, Frame) when is_map(Frame) -> + Opts = emqx_ocpp_frame:serialize_opts(), + Msg = emqx_ocpp_frame:serialize_pkt(Frame, Opts), + gun:ws_send(ConnPid, {text, Msg}). + +receive_msg(ConnPid) -> + receive + {gun_ws, ConnPid, _Ref, {_Type, Msg}} -> + ParseState = emqx_ocpp_frame:initial_parse_state(#{}), + {ok, Frame, _Rest, _NewParseStaet} = emqx_ocpp_frame:parse(Msg, ParseState), + {ok, Frame} + after 5000 -> + {error, timeout} + end. + +close(ConnPid) -> + gun:shutdown(ConnPid). diff --git a/changes/ee/fix-12892.md b/changes/ee/fix-12892.md new file mode 100644 index 000000000..45fd5c825 --- /dev/null +++ b/changes/ee/fix-12892.md @@ -0,0 +1,3 @@ +Fix an error in OCPP gateway's handling of downstream BootNotification. + +Fix the `gateways/ocpp/listeners` endpoint to return the correct number of current connections.