fix(ocpp): avoid an error log in handling dnsteeam messages
This commit is contained in:
parent
1dfd9115cd
commit
18196ec19c
|
@ -86,10 +86,9 @@
|
|||
-define(IS_ERROR(F), F = #{type := ?OCPP_MSG_TYPE_ID_CALLERROR}).
|
||||
-define(IS_ERROR(F, Id), F = #{type := ?OCPP_MSG_TYPE_ID_CALLERROR, id := Id}).
|
||||
|
||||
-define(IS_BootNotification_RESP(Payload), #{
|
||||
-define(IS_BootNotification_RESP(Status, Interval), #{
|
||||
type := ?OCPP_MSG_TYPE_ID_CALLRESULT,
|
||||
action := ?OCPP_ACT_BootNotification,
|
||||
payload := Payload
|
||||
payload := #{<<"status">> := Status, <<"interval">> := Interval}
|
||||
}).
|
||||
|
||||
-define(ERR_FRAME(Id, Code, Desc), #{
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{application, emqx_gateway_ocpp, [
|
||||
{description, "OCPP-J 1.6 Gateway for EMQX"},
|
||||
{vsn, "0.1.3"},
|
||||
{vsn, "0.1.4"},
|
||||
{registered, []},
|
||||
{applications, [kernel, stdlib, jesse, emqx, emqx_gateway]},
|
||||
{env, []},
|
||||
|
|
|
@ -527,20 +527,19 @@ apply_frame(Frames, Channel) when is_list(Frames) ->
|
|||
{Outgoings, NChannel} = lists:foldl(fun do_apply_frame/2, {[], Channel}, Frames),
|
||||
{lists:reverse(Outgoings), NChannel};
|
||||
apply_frame(Frames, Channel) ->
|
||||
?SLOG(error, #{msg => "unexpected_frame_list", frames => Frames, channel => Channel}),
|
||||
?SLOG(error, #{msg => "unexpected_frame_list", frames => Frames}),
|
||||
Channel.
|
||||
|
||||
do_apply_frame(?IS_BootNotification_RESP(Payload), {Outgoings, Channel}) ->
|
||||
case maps:get(<<"status">>, Payload) of
|
||||
do_apply_frame(?IS_BootNotification_RESP(Status, Interval), {Outgoings, Channel}) ->
|
||||
case Status of
|
||||
<<"Accepted">> ->
|
||||
Intv = maps:get(<<"interval">>, Payload),
|
||||
?SLOG(info, #{msg => "adjust_heartbeat_timer", new_interval_s => Intv}),
|
||||
{[{event, updated} | Outgoings], reset_keepalive(Intv, Channel)};
|
||||
?SLOG(info, #{msg => "adjust_heartbeat_timer", new_interval_s => Interval}),
|
||||
{[{event, updated} | Outgoings], reset_keepalive(Interval, Channel)};
|
||||
_ ->
|
||||
{Outgoings, Channel}
|
||||
end;
|
||||
do_apply_frame(Frame, Acc = {_Outgoings, Channel}) ->
|
||||
?SLOG(error, #{msg => "unexpected_frame", frame => Frame, channel => Channel}),
|
||||
do_apply_frame(Frame, Acc = {_Outgoings, _Channel}) ->
|
||||
?SLOG(info, #{msg => "skip_to_apply_frame", frame => Frame}),
|
||||
Acc.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -762,19 +761,15 @@ payload2frame(#{
|
|||
action => Action,
|
||||
payload => Payload
|
||||
};
|
||||
payload2frame(
|
||||
MqttPayload =
|
||||
#{
|
||||
<<"MessageTypeId">> := ?OCPP_MSG_TYPE_ID_CALLRESULT,
|
||||
<<"UniqueId">> := Id,
|
||||
<<"Payload">> := Payload
|
||||
}
|
||||
) ->
|
||||
Action = maps:get(<<"Action">>, MqttPayload, undefined),
|
||||
payload2frame(#{
|
||||
<<"MessageTypeId">> := ?OCPP_MSG_TYPE_ID_CALLRESULT,
|
||||
<<"UniqueId">> := Id,
|
||||
<<"Payload">> := Payload
|
||||
}) ->
|
||||
#{
|
||||
type => ?OCPP_MSG_TYPE_ID_CALLRESULT,
|
||||
id => Id,
|
||||
action => Action,
|
||||
action => undefined,
|
||||
payload => Payload
|
||||
};
|
||||
payload2frame(#{
|
||||
|
|
|
@ -237,6 +237,7 @@ do_init(Req, Opts, WsOpts) ->
|
|||
)
|
||||
of
|
||||
{error, Reason, _State} ->
|
||||
1 = Reason,
|
||||
{ok, cowboy_req:reply(400, #{}, to_bin(Reason), Req), WsOpts};
|
||||
{ok, [Resp, Opts, WsOpts], NState} ->
|
||||
{cowboy_websocket, Resp, [Req, Opts, NState], WsOpts}
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
|
||||
-module(emqx_ocpp_SUITE).
|
||||
|
||||
-include("emqx_ocpp.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
|
||||
|
@ -145,3 +146,86 @@ t_enable_disable_gw_ocpp(_Config) ->
|
|||
AssertEnabled(false),
|
||||
?assertEqual({204, #{}}, request(put, "/gateways/ocpp/enable/true", <<>>)),
|
||||
AssertEnabled(true).
|
||||
|
||||
t_adjust_keepalive_timer(_Config) ->
|
||||
{ok, ClientPid} = connect("127.0.0.1", 33033, <<"client1">>),
|
||||
UniqueId = <<"3335862321">>,
|
||||
BootNotification = #{
|
||||
id => UniqueId,
|
||||
type => ?OCPP_MSG_TYPE_ID_CALL,
|
||||
action => <<"BootNotification">>,
|
||||
payload => #{
|
||||
<<"chargePointVendor">> => <<"vendor1">>,
|
||||
<<"chargePointModel">> => <<"model1">>
|
||||
}
|
||||
},
|
||||
ok = send_msg(ClientPid, BootNotification),
|
||||
%% check the default keepalive timer
|
||||
timer:sleep(1000),
|
||||
?assertMatch(
|
||||
#{conninfo := #{keepalive := 60}}, emqx_gateway_cm:get_chan_info(ocpp, <<"client1">>)
|
||||
),
|
||||
%% publish the BootNotification.ack
|
||||
AckPayload = emqx_utils_json:encode(#{
|
||||
<<"MessageTypeId">> => ?OCPP_MSG_TYPE_ID_CALLRESULT,
|
||||
<<"UniqueId">> => UniqueId,
|
||||
<<"Payload">> => #{
|
||||
<<"currentTime">> => "2023-06-21T14:20:39+00:00",
|
||||
<<"interval">> => 300,
|
||||
<<"status">> => <<"Accepted">>
|
||||
}
|
||||
}),
|
||||
_ = emqx:publish(emqx_message:make(<<"ocpp/cs/client1">>, AckPayload)),
|
||||
{ok, _Resp} = receive_msg(ClientPid),
|
||||
%% assert: check the keepalive timer is adjusted
|
||||
?assertMatch(
|
||||
#{conninfo := #{keepalive := 300}}, emqx_gateway_cm:get_chan_info(ocpp, <<"client1">>)
|
||||
),
|
||||
ok.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% ocpp simple client
|
||||
|
||||
connect(Host, Port, ClientId) ->
|
||||
Timeout = 5000,
|
||||
ConnOpts = #{connect_timeout => 5000},
|
||||
case gun:open(Host, Port, ConnOpts) of
|
||||
{ok, ConnPid} ->
|
||||
{ok, _} = gun:await_up(ConnPid, Timeout),
|
||||
case upgrade(ConnPid, ClientId, Timeout) of
|
||||
{ok, _Headers} -> {ok, ConnPid};
|
||||
Error -> Error
|
||||
end;
|
||||
Error ->
|
||||
Error
|
||||
end.
|
||||
|
||||
upgrade(ConnPid, ClientId, Timeout) ->
|
||||
Path = binary_to_list(<<"/ocpp/", ClientId/binary>>),
|
||||
WsHeaders = [{<<"cache-control">>, <<"no-cache">>}],
|
||||
StreamRef = gun:ws_upgrade(ConnPid, Path, WsHeaders, #{protocols => [{<<"ocpp1.6">>, gun_ws_h}]}),
|
||||
receive
|
||||
{gun_upgrade, ConnPid, StreamRef, [<<"websocket">>], Headers} ->
|
||||
{ok, Headers};
|
||||
{gun_response, ConnPid, _, _, Status, Headers} ->
|
||||
{error, {ws_upgrade_failed, Status, Headers}};
|
||||
{gun_error, ConnPid, StreamRef, Reason} ->
|
||||
{error, {ws_upgrade_failed, Reason}}
|
||||
after Timeout ->
|
||||
{error, timeout}
|
||||
end.
|
||||
|
||||
send_msg(ConnPid, Frame) when is_map(Frame) ->
|
||||
Opts = emqx_ocpp_frame:serialize_opts(),
|
||||
Msg = emqx_ocpp_frame:serialize_pkt(Frame, Opts),
|
||||
gun:ws_send(ConnPid, {text, Msg}).
|
||||
|
||||
receive_msg(ConnPid) ->
|
||||
receive
|
||||
{gun_ws, ConnPid, _Ref, {_Type, Msg}} ->
|
||||
ParseState = emqx_ocpp_frame:initial_parse_state(#{}),
|
||||
{ok, Frame, _Rest, _NewParseStaet} = emqx_ocpp_frame:parse(Msg, ParseState),
|
||||
{ok, Frame}
|
||||
after 5000 ->
|
||||
{error, timeout}
|
||||
end.
|
||||
|
|
Loading…
Reference in New Issue