From 18196ec19ce39139e00d7f73f6d5c77f1c94d74c Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 17 Apr 2024 17:48:19 +0800 Subject: [PATCH] fix(ocpp): avoid an error log in handling dnsteeam messages --- apps/emqx_gateway_ocpp/include/emqx_ocpp.hrl | 5 +- .../src/emqx_gateway_ocpp.app.src | 2 +- .../src/emqx_ocpp_channel.erl | 31 +++---- .../src/emqx_ocpp_connection.erl | 1 + .../test/emqx_ocpp_SUITE.erl | 84 +++++++++++++++++++ 5 files changed, 101 insertions(+), 22 deletions(-) diff --git a/apps/emqx_gateway_ocpp/include/emqx_ocpp.hrl b/apps/emqx_gateway_ocpp/include/emqx_ocpp.hrl index dc779dc76..805c5f6f4 100644 --- a/apps/emqx_gateway_ocpp/include/emqx_ocpp.hrl +++ b/apps/emqx_gateway_ocpp/include/emqx_ocpp.hrl @@ -86,10 +86,9 @@ -define(IS_ERROR(F), F = #{type := ?OCPP_MSG_TYPE_ID_CALLERROR}). -define(IS_ERROR(F, Id), F = #{type := ?OCPP_MSG_TYPE_ID_CALLERROR, id := Id}). --define(IS_BootNotification_RESP(Payload), #{ +-define(IS_BootNotification_RESP(Status, Interval), #{ type := ?OCPP_MSG_TYPE_ID_CALLRESULT, - action := ?OCPP_ACT_BootNotification, - payload := Payload + payload := #{<<"status">> := Status, <<"interval">> := Interval} }). -define(ERR_FRAME(Id, Code, Desc), #{ diff --git a/apps/emqx_gateway_ocpp/src/emqx_gateway_ocpp.app.src b/apps/emqx_gateway_ocpp/src/emqx_gateway_ocpp.app.src index c7981a033..8682c164c 100644 --- a/apps/emqx_gateway_ocpp/src/emqx_gateway_ocpp.app.src +++ b/apps/emqx_gateway_ocpp/src/emqx_gateway_ocpp.app.src @@ -1,6 +1,6 @@ {application, emqx_gateway_ocpp, [ {description, "OCPP-J 1.6 Gateway for EMQX"}, - {vsn, "0.1.3"}, + {vsn, "0.1.4"}, {registered, []}, {applications, [kernel, stdlib, jesse, emqx, emqx_gateway]}, {env, []}, diff --git a/apps/emqx_gateway_ocpp/src/emqx_ocpp_channel.erl b/apps/emqx_gateway_ocpp/src/emqx_ocpp_channel.erl index cb8ec7e91..d20b35d04 100644 --- a/apps/emqx_gateway_ocpp/src/emqx_ocpp_channel.erl +++ b/apps/emqx_gateway_ocpp/src/emqx_ocpp_channel.erl @@ -527,20 +527,19 @@ apply_frame(Frames, Channel) when is_list(Frames) -> {Outgoings, NChannel} = lists:foldl(fun do_apply_frame/2, {[], Channel}, Frames), {lists:reverse(Outgoings), NChannel}; apply_frame(Frames, Channel) -> - ?SLOG(error, #{msg => "unexpected_frame_list", frames => Frames, channel => Channel}), + ?SLOG(error, #{msg => "unexpected_frame_list", frames => Frames}), Channel. -do_apply_frame(?IS_BootNotification_RESP(Payload), {Outgoings, Channel}) -> - case maps:get(<<"status">>, Payload) of +do_apply_frame(?IS_BootNotification_RESP(Status, Interval), {Outgoings, Channel}) -> + case Status of <<"Accepted">> -> - Intv = maps:get(<<"interval">>, Payload), - ?SLOG(info, #{msg => "adjust_heartbeat_timer", new_interval_s => Intv}), - {[{event, updated} | Outgoings], reset_keepalive(Intv, Channel)}; + ?SLOG(info, #{msg => "adjust_heartbeat_timer", new_interval_s => Interval}), + {[{event, updated} | Outgoings], reset_keepalive(Interval, Channel)}; _ -> {Outgoings, Channel} end; -do_apply_frame(Frame, Acc = {_Outgoings, Channel}) -> - ?SLOG(error, #{msg => "unexpected_frame", frame => Frame, channel => Channel}), +do_apply_frame(Frame, Acc = {_Outgoings, _Channel}) -> + ?SLOG(info, #{msg => "skip_to_apply_frame", frame => Frame}), Acc. %%-------------------------------------------------------------------- @@ -762,19 +761,15 @@ payload2frame(#{ action => Action, payload => Payload }; -payload2frame( - MqttPayload = - #{ - <<"MessageTypeId">> := ?OCPP_MSG_TYPE_ID_CALLRESULT, - <<"UniqueId">> := Id, - <<"Payload">> := Payload - } -) -> - Action = maps:get(<<"Action">>, MqttPayload, undefined), +payload2frame(#{ + <<"MessageTypeId">> := ?OCPP_MSG_TYPE_ID_CALLRESULT, + <<"UniqueId">> := Id, + <<"Payload">> := Payload +}) -> #{ type => ?OCPP_MSG_TYPE_ID_CALLRESULT, id => Id, - action => Action, + action => undefined, payload => Payload }; payload2frame(#{ diff --git a/apps/emqx_gateway_ocpp/src/emqx_ocpp_connection.erl b/apps/emqx_gateway_ocpp/src/emqx_ocpp_connection.erl index 0932314fe..331b9d323 100644 --- a/apps/emqx_gateway_ocpp/src/emqx_ocpp_connection.erl +++ b/apps/emqx_gateway_ocpp/src/emqx_ocpp_connection.erl @@ -237,6 +237,7 @@ do_init(Req, Opts, WsOpts) -> ) of {error, Reason, _State} -> + 1 = Reason, {ok, cowboy_req:reply(400, #{}, to_bin(Reason), Req), WsOpts}; {ok, [Resp, Opts, WsOpts], NState} -> {cowboy_websocket, Resp, [Req, Opts, NState], WsOpts} diff --git a/apps/emqx_gateway_ocpp/test/emqx_ocpp_SUITE.erl b/apps/emqx_gateway_ocpp/test/emqx_ocpp_SUITE.erl index 6d00726cf..b72eb9e1d 100644 --- a/apps/emqx_gateway_ocpp/test/emqx_ocpp_SUITE.erl +++ b/apps/emqx_gateway_ocpp/test/emqx_ocpp_SUITE.erl @@ -16,6 +16,7 @@ -module(emqx_ocpp_SUITE). +-include("emqx_ocpp.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). @@ -145,3 +146,86 @@ t_enable_disable_gw_ocpp(_Config) -> AssertEnabled(false), ?assertEqual({204, #{}}, request(put, "/gateways/ocpp/enable/true", <<>>)), AssertEnabled(true). + +t_adjust_keepalive_timer(_Config) -> + {ok, ClientPid} = connect("127.0.0.1", 33033, <<"client1">>), + UniqueId = <<"3335862321">>, + BootNotification = #{ + id => UniqueId, + type => ?OCPP_MSG_TYPE_ID_CALL, + action => <<"BootNotification">>, + payload => #{ + <<"chargePointVendor">> => <<"vendor1">>, + <<"chargePointModel">> => <<"model1">> + } + }, + ok = send_msg(ClientPid, BootNotification), + %% check the default keepalive timer + timer:sleep(1000), + ?assertMatch( + #{conninfo := #{keepalive := 60}}, emqx_gateway_cm:get_chan_info(ocpp, <<"client1">>) + ), + %% publish the BootNotification.ack + AckPayload = emqx_utils_json:encode(#{ + <<"MessageTypeId">> => ?OCPP_MSG_TYPE_ID_CALLRESULT, + <<"UniqueId">> => UniqueId, + <<"Payload">> => #{ + <<"currentTime">> => "2023-06-21T14:20:39+00:00", + <<"interval">> => 300, + <<"status">> => <<"Accepted">> + } + }), + _ = emqx:publish(emqx_message:make(<<"ocpp/cs/client1">>, AckPayload)), + {ok, _Resp} = receive_msg(ClientPid), + %% assert: check the keepalive timer is adjusted + ?assertMatch( + #{conninfo := #{keepalive := 300}}, emqx_gateway_cm:get_chan_info(ocpp, <<"client1">>) + ), + ok. + +%%-------------------------------------------------------------------- +%% ocpp simple client + +connect(Host, Port, ClientId) -> + Timeout = 5000, + ConnOpts = #{connect_timeout => 5000}, + case gun:open(Host, Port, ConnOpts) of + {ok, ConnPid} -> + {ok, _} = gun:await_up(ConnPid, Timeout), + case upgrade(ConnPid, ClientId, Timeout) of + {ok, _Headers} -> {ok, ConnPid}; + Error -> Error + end; + Error -> + Error + end. + +upgrade(ConnPid, ClientId, Timeout) -> + Path = binary_to_list(<<"/ocpp/", ClientId/binary>>), + WsHeaders = [{<<"cache-control">>, <<"no-cache">>}], + StreamRef = gun:ws_upgrade(ConnPid, Path, WsHeaders, #{protocols => [{<<"ocpp1.6">>, gun_ws_h}]}), + receive + {gun_upgrade, ConnPid, StreamRef, [<<"websocket">>], Headers} -> + {ok, Headers}; + {gun_response, ConnPid, _, _, Status, Headers} -> + {error, {ws_upgrade_failed, Status, Headers}}; + {gun_error, ConnPid, StreamRef, Reason} -> + {error, {ws_upgrade_failed, Reason}} + after Timeout -> + {error, timeout} + end. + +send_msg(ConnPid, Frame) when is_map(Frame) -> + Opts = emqx_ocpp_frame:serialize_opts(), + Msg = emqx_ocpp_frame:serialize_pkt(Frame, Opts), + gun:ws_send(ConnPid, {text, Msg}). + +receive_msg(ConnPid) -> + receive + {gun_ws, ConnPid, _Ref, {_Type, Msg}} -> + ParseState = emqx_ocpp_frame:initial_parse_state(#{}), + {ok, Frame, _Rest, _NewParseStaet} = emqx_ocpp_frame:parse(Msg, ParseState), + {ok, Frame} + after 5000 -> + {error, timeout} + end.