Merge pull request #12203 from JimMoen/EMQX-11610-gw-OCPP-subscription-cnt

EMQX 11610 gw ocpp subscription cnt
This commit is contained in:
JianBo He 2023-12-20 09:31:08 +08:00 committed by GitHub
commit 1e288de1d9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 54 additions and 31 deletions

View File

@ -197,6 +197,10 @@ subscriptions(get, #{
case emqx_gateway_http:list_client_subscriptions(GwName, ClientId) of case emqx_gateway_http:list_client_subscriptions(GwName, ClientId) of
{error, not_found} -> {error, not_found} ->
return_http_error(404, "client process not found"); return_http_error(404, "client process not found");
{error, ignored} ->
return_http_error(
400, "get subscriptions failed: unsupported"
);
{error, Reason} -> {error, Reason} ->
return_http_error(400, Reason); return_http_error(400, Reason);
{ok, Subs} -> {ok, Subs} ->
@ -222,7 +226,13 @@ subscriptions(post, #{
) )
of of
{error, not_found} -> {error, not_found} ->
return_http_error(404, "client process not found"); return_http_error(
404, "client process not found"
);
{error, ignored} ->
return_http_error(
400, "subscribe failed: unsupported"
);
{error, Reason} -> {error, Reason} ->
return_http_error(400, Reason); return_http_error(400, Reason);
{ok, {NTopic, NSubOpts}} -> {ok, {NTopic, NSubOpts}} ->
@ -241,8 +251,14 @@ subscriptions(delete, #{
with_gateway(Name0, fun(GwName, _) -> with_gateway(Name0, fun(GwName, _) ->
case lookup_topic(GwName, ClientId, Topic) of case lookup_topic(GwName, ClientId, Topic) of
{ok, _} -> {ok, _} ->
_ = emqx_gateway_http:client_unsubscribe(GwName, ClientId, Topic), case emqx_gateway_http:client_unsubscribe(GwName, ClientId, Topic) of
{204}; {error, ignored} ->
return_http_error(
400, "unsubscribe failed: unsupported"
);
_ ->
{204}
end;
{error, not_found} -> {error, not_found} ->
return_http_error(404, "Resource not found") return_http_error(404, "Resource not found")
end end

View File

@ -378,6 +378,8 @@ client_call(GwName, ClientId, Req) ->
of of
undefined -> undefined ->
{error, not_found}; {error, not_found};
ignored ->
{error, ignored};
Res -> Res ->
Res Res
catch catch

View File

@ -186,10 +186,10 @@ info(timers, #channel{timers = Timers}) ->
-spec stats(channel()) -> emqx_types:stats(). -spec stats(channel()) -> emqx_types:stats().
stats(#channel{mqueue = MQueue}) -> stats(#channel{mqueue = MQueue}) ->
%% XXX: %% XXX: A fake stats for managed by emqx_management
SessionStats = [ SessionStats = [
{subscriptions_cnt, 0}, {subscriptions_cnt, 1},
{subscriptions_max, 0}, {subscriptions_max, 1},
{inflight_cnt, 0}, {inflight_cnt, 0},
{inflight_max, 0}, {inflight_max, 0},
{mqueue_len, queue:len(MQueue)}, {mqueue_len, queue:len(MQueue)},
@ -524,9 +524,13 @@ handle_out(Type, Data, Channel) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
apply_frame(Frames, Channel) when is_list(Frames) -> apply_frame(Frames, Channel) when is_list(Frames) ->
{Outgoings, NChannel} = lists:foldl(fun apply_frame/2, {[], Channel}, Frames), {Outgoings, NChannel} = lists:foldl(fun do_apply_frame/2, {[], Channel}, Frames),
{lists:reverse(Outgoings), NChannel}; {lists:reverse(Outgoings), NChannel};
apply_frame(?IS_BootNotification_RESP(Payload), {Outgoings, Channel}) -> apply_frame(Frames, Channel) ->
?SLOG(error, #{msg => "unexpected_frame_list", frames => Frames, channel => Channel}),
Channel.
do_apply_frame(?IS_BootNotification_RESP(Payload), {Outgoings, Channel}) ->
case maps:get(<<"status">>, Payload) of case maps:get(<<"status">>, Payload) of
<<"Accepted">> -> <<"Accepted">> ->
Intv = maps:get(<<"interval">>, Payload), Intv = maps:get(<<"interval">>, Payload),
@ -535,8 +539,9 @@ apply_frame(?IS_BootNotification_RESP(Payload), {Outgoings, Channel}) ->
_ -> _ ->
{Outgoings, Channel} {Outgoings, Channel}
end; end;
apply_frame(_, Channel) -> do_apply_frame(Frame, Acc = {_Outgoings, Channel}) ->
Channel. ?SLOG(error, #{msg => "unexpected_frame", frame => Frame, channel => Channel}),
Acc.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Handle call %% Handle call

View File

@ -33,27 +33,27 @@
-define(HEARTBEAT, <<$\n>>). -define(HEARTBEAT, <<$\n>>).
-define(CONF_DEFAULT, << %% erlfmt-ignore
"\n" -define(CONF_DEFAULT, <<"
"gateway.ocpp {\n" gateway.ocpp {
" mountpoint = \"ocpp/\"\n" mountpoint = \"ocpp/\"
" default_heartbeat_interval = \"60s\"\n" default_heartbeat_interval = \"60s\"
" heartbeat_checking_times_backoff = 1\n" heartbeat_checking_times_backoff = 1
" message_format_checking = disable\n" message_format_checking = disable
" upstream {\n" upstream {
" topic = \"cp/${clientid}\"\n" topic = \"cp/${clientid}\"
" reply_topic = \"cp/${clientid}/Reply\"\n" reply_topic = \"cp/${clientid}/Reply\"
" error_topic = \"cp/${clientid}/Reply\"\n" error_topic = \"cp/${clientid}/Reply\"
" }\n" }
" dnstream {\n" dnstream {
" topic = \"cs/${clientid}\"\n" topic = \"cs/${clientid}\"
" }\n" }
" listeners.ws.default {\n" listeners.ws.default {
" bind = \"0.0.0.0:33033\"\n" bind = \"0.0.0.0:33033\"
" websocket.path = \"/ocpp\"\n" websocket.path = \"/ocpp\"
" }\n" }
"}\n" }
>>). ">>).
all() -> emqx_common_test_helpers:all(?MODULE). all() -> emqx_common_test_helpers:all(?MODULE).