diff --git a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl index aedb4b0fa..121cb4064 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl @@ -197,6 +197,10 @@ subscriptions(get, #{ case emqx_gateway_http:list_client_subscriptions(GwName, ClientId) of {error, not_found} -> return_http_error(404, "client process not found"); + {error, ignored} -> + return_http_error( + 400, "get subscriptions failed: unsupported" + ); {error, Reason} -> return_http_error(400, Reason); {ok, Subs} -> @@ -222,7 +226,13 @@ subscriptions(post, #{ ) of {error, not_found} -> - return_http_error(404, "client process not found"); + return_http_error( + 404, "client process not found" + ); + {error, ignored} -> + return_http_error( + 400, "subscribe failed: unsupported" + ); {error, Reason} -> return_http_error(400, Reason); {ok, {NTopic, NSubOpts}} -> @@ -241,8 +251,14 @@ subscriptions(delete, #{ with_gateway(Name0, fun(GwName, _) -> case lookup_topic(GwName, ClientId, Topic) of {ok, _} -> - _ = emqx_gateway_http:client_unsubscribe(GwName, ClientId, Topic), - {204}; + case emqx_gateway_http:client_unsubscribe(GwName, ClientId, Topic) of + {error, ignored} -> + return_http_error( + 400, "unsubscribe failed: unsupported" + ); + _ -> + {204} + end; {error, not_found} -> return_http_error(404, "Resource not found") end diff --git a/apps/emqx_gateway/src/emqx_gateway_http.erl b/apps/emqx_gateway/src/emqx_gateway_http.erl index 677176acc..802bbb689 100644 --- a/apps/emqx_gateway/src/emqx_gateway_http.erl +++ b/apps/emqx_gateway/src/emqx_gateway_http.erl @@ -378,6 +378,8 @@ client_call(GwName, ClientId, Req) -> of undefined -> {error, not_found}; + ignored -> + {error, ignored}; Res -> Res catch diff --git a/apps/emqx_gateway_ocpp/src/emqx_ocpp_channel.erl b/apps/emqx_gateway_ocpp/src/emqx_ocpp_channel.erl index 0b9f864a3..3f5fe8f70 100644 --- a/apps/emqx_gateway_ocpp/src/emqx_ocpp_channel.erl +++ b/apps/emqx_gateway_ocpp/src/emqx_ocpp_channel.erl @@ -186,10 +186,10 @@ info(timers, #channel{timers = Timers}) -> -spec stats(channel()) -> emqx_types:stats(). stats(#channel{mqueue = MQueue}) -> - %% XXX: + %% XXX: A fake stats for managed by emqx_management SessionStats = [ - {subscriptions_cnt, 0}, - {subscriptions_max, 0}, + {subscriptions_cnt, 1}, + {subscriptions_max, 1}, {inflight_cnt, 0}, {inflight_max, 0}, {mqueue_len, queue:len(MQueue)}, @@ -524,9 +524,13 @@ handle_out(Type, Data, Channel) -> %%-------------------------------------------------------------------- apply_frame(Frames, Channel) when is_list(Frames) -> - {Outgoings, NChannel} = lists:foldl(fun apply_frame/2, {[], Channel}, Frames), + {Outgoings, NChannel} = lists:foldl(fun do_apply_frame/2, {[], Channel}, Frames), {lists:reverse(Outgoings), NChannel}; -apply_frame(?IS_BootNotification_RESP(Payload), {Outgoings, Channel}) -> +apply_frame(Frames, Channel) -> + ?SLOG(error, #{msg => "unexpected_frame_list", frames => Frames, channel => Channel}), + Channel. + +do_apply_frame(?IS_BootNotification_RESP(Payload), {Outgoings, Channel}) -> case maps:get(<<"status">>, Payload) of <<"Accepted">> -> Intv = maps:get(<<"interval">>, Payload), @@ -535,8 +539,9 @@ apply_frame(?IS_BootNotification_RESP(Payload), {Outgoings, Channel}) -> _ -> {Outgoings, Channel} end; -apply_frame(_, Channel) -> - Channel. +do_apply_frame(Frame, Acc = {_Outgoings, Channel}) -> + ?SLOG(error, #{msg => "unexpected_frame", frame => Frame, channel => Channel}), + Acc. %%-------------------------------------------------------------------- %% Handle call diff --git a/apps/emqx_gateway_ocpp/test/emqx_ocpp_SUITE.erl b/apps/emqx_gateway_ocpp/test/emqx_ocpp_SUITE.erl index f1198fe1f..29f08f78e 100644 --- a/apps/emqx_gateway_ocpp/test/emqx_ocpp_SUITE.erl +++ b/apps/emqx_gateway_ocpp/test/emqx_ocpp_SUITE.erl @@ -33,27 +33,27 @@ -define(HEARTBEAT, <<$\n>>). --define(CONF_DEFAULT, << - "\n" - "gateway.ocpp {\n" - " mountpoint = \"ocpp/\"\n" - " default_heartbeat_interval = \"60s\"\n" - " heartbeat_checking_times_backoff = 1\n" - " message_format_checking = disable\n" - " upstream {\n" - " topic = \"cp/${clientid}\"\n" - " reply_topic = \"cp/${clientid}/Reply\"\n" - " error_topic = \"cp/${clientid}/Reply\"\n" - " }\n" - " dnstream {\n" - " topic = \"cs/${clientid}\"\n" - " }\n" - " listeners.ws.default {\n" - " bind = \"0.0.0.0:33033\"\n" - " websocket.path = \"/ocpp\"\n" - " }\n" - "}\n" ->>). +%% erlfmt-ignore +-define(CONF_DEFAULT, <<" + gateway.ocpp { + mountpoint = \"ocpp/\" + default_heartbeat_interval = \"60s\" + heartbeat_checking_times_backoff = 1 + message_format_checking = disable + upstream { + topic = \"cp/${clientid}\" + reply_topic = \"cp/${clientid}/Reply\" + error_topic = \"cp/${clientid}/Reply\" + } + dnstream { + topic = \"cs/${clientid}\" + } + listeners.ws.default { + bind = \"0.0.0.0:33033\" + websocket.path = \"/ocpp\" + } + } +">>). all() -> emqx_common_test_helpers:all(?MODULE).