Merge remote-tracking branch 'ce/main-v4.3' into main-v4.4-merged-main-v4.3

This commit is contained in:
JianBo He 2022-01-12 13:59:07 +08:00
commit 492f4dceff
6 changed files with 28 additions and 17 deletions

1
.gitignore vendored
View File

@ -59,3 +59,4 @@ erlang_ls.config
*#
# For direnv
.envrc
mix.lock

View File

@ -62,8 +62,8 @@
-export([ list_subscriptions/1
, list_subscriptions_via_topic/2
, list_subscriptions_via_topic/3
, lookup_subscriptions/1
, lookup_subscriptions/2
, lookup_subscriptions/3
]).
%% Routes
@ -331,18 +331,20 @@ list_subscriptions_via_topic(Node, Topic, {M,F}) when Node =:= node() ->
list_subscriptions_via_topic(Node, Topic, FormatFun) ->
rpc_call(Node, list_subscriptions_via_topic, [Node, Topic, FormatFun]).
lookup_subscriptions(ClientId) ->
lists:append([lookup_subscriptions(Node, ClientId) || Node <- ekka_mnesia:running_nodes()]).
lookup_subscriptions(ClientId, FormatFun) ->
lists:append([lookup_subscriptions(Node, ClientId, FormatFun) || Node <- ekka_mnesia:running_nodes()]).
lookup_subscriptions(Node, ClientId) when Node =:= node() ->
case ets:lookup(emqx_subid, ClientId) of
[] -> [];
[{_, Pid}] ->
ets:match_object(emqx_suboption, {{Pid, '_'}, '_'})
end;
lookup_subscriptions(Node, ClientId, {M, F}) when Node =:= node() ->
Result = case ets:lookup(emqx_subid, ClientId) of
[] -> [];
[{_, Pid}] ->
ets:match_object(emqx_suboption, {{Pid, '_'}, '_'})
end,
%% format at the called node
erlang:apply(M, F, [Result]);
lookup_subscriptions(Node, ClientId) ->
rpc_call(Node, lookup_subscriptions, [Node, ClientId]).
lookup_subscriptions(Node, ClientId, FormatFun) ->
rpc_call(Node, lookup_subscriptions, [Node, ClientId, FormatFun]).
%%--------------------------------------------------------------------
%% Routes

View File

@ -85,10 +85,10 @@ list(#{node := Node} = Bindings, Params) ->
end.
lookup(#{node := Node, clientid := ClientId}, _Params) ->
minirest:return({ok, format(emqx_mgmt:lookup_subscriptions(Node, emqx_mgmt_util:urldecode(ClientId)))});
minirest:return({ok, emqx_mgmt:lookup_subscriptions(Node, emqx_mgmt_util:urldecode(ClientId), ?format_fun)});
lookup(#{clientid := ClientId}, _Params) ->
minirest:return({ok, format(emqx_mgmt:lookup_subscriptions(emqx_mgmt_util:urldecode(ClientId)))}).
minirest:return({ok, emqx_mgmt:lookup_subscriptions(emqx_mgmt_util:urldecode(ClientId), ?format_fun)}).
format(Items) when is_list(Items) ->
[format(Item) || Item <- Items];

View File

@ -1,6 +1,6 @@
{application, emqx_stomp,
[{description, "EMQ X Stomp Protocol Plugin"},
{vsn, "4.3.3"}, % strict semver, bump manually!
{vsn, "4.3.4"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_stomp_sup]},
{applications, [kernel,stdlib]},

View File

@ -1,16 +1,24 @@
%% -*- mode: erlang -*-
{VSN,
[{"4.3.2",[{load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]}]},
[{"4.3.3",[{load_module,emqx_stomp_frame,brutal_purge,soft_purge,[]}]},
{"4.3.2",
[{load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]},
{load_module,emqx_stomp_frame,brutal_purge,soft_purge,[]}]},
{"4.3.1",
[{load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]},
{load_module,emqx_stomp_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]}]},
{"4.3.0",
[{restart_application,emqx_stomp},
{apply,{emqx_stomp,force_clear_after_app_stoped,[]}}]},
{<<".*">>,[]}],
[{"4.3.2",[{load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]}]},
[{"4.3.3",[{load_module,emqx_stomp_frame,brutal_purge,soft_purge,[]}]},
{"4.3.2",
[{load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]},
{load_module,emqx_stomp_frame,brutal_purge,soft_purge,[]}]},
{"4.3.1",
[{load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]},
{load_module,emqx_stomp_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]}]},
{"4.3.0",
[{restart_application,emqx_stomp}]},

View File

@ -121,7 +121,7 @@ g(Key, Opts, Val) ->
parse(<<>>, Parser) ->
{more, Parser};
parse(Bytes, #{phase := body, len := Len, state := State}) ->
parse(Bytes, #{phase := body, length := Len, state := State}) ->
parse(body, Bytes, State, Len);
parse(Bytes, Parser = #{pre := Pre}) ->