diff --git a/.gitignore b/.gitignore index bbb4edebb..13217dc00 100644 --- a/.gitignore +++ b/.gitignore @@ -59,3 +59,4 @@ erlang_ls.config *# # For direnv .envrc +mix.lock diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index 8f871ecfc..c4e5323f2 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -62,8 +62,8 @@ -export([ list_subscriptions/1 , list_subscriptions_via_topic/2 , list_subscriptions_via_topic/3 - , lookup_subscriptions/1 , lookup_subscriptions/2 + , lookup_subscriptions/3 ]). %% Routes @@ -331,18 +331,20 @@ list_subscriptions_via_topic(Node, Topic, {M,F}) when Node =:= node() -> list_subscriptions_via_topic(Node, Topic, FormatFun) -> rpc_call(Node, list_subscriptions_via_topic, [Node, Topic, FormatFun]). -lookup_subscriptions(ClientId) -> - lists:append([lookup_subscriptions(Node, ClientId) || Node <- ekka_mnesia:running_nodes()]). +lookup_subscriptions(ClientId, FormatFun) -> + lists:append([lookup_subscriptions(Node, ClientId, FormatFun) || Node <- ekka_mnesia:running_nodes()]). -lookup_subscriptions(Node, ClientId) when Node =:= node() -> - case ets:lookup(emqx_subid, ClientId) of - [] -> []; - [{_, Pid}] -> - ets:match_object(emqx_suboption, {{Pid, '_'}, '_'}) - end; +lookup_subscriptions(Node, ClientId, {M, F}) when Node =:= node() -> + Result = case ets:lookup(emqx_subid, ClientId) of + [] -> []; + [{_, Pid}] -> + ets:match_object(emqx_suboption, {{Pid, '_'}, '_'}) + end, + %% format at the called node + erlang:apply(M, F, [Result]); -lookup_subscriptions(Node, ClientId) -> - rpc_call(Node, lookup_subscriptions, [Node, ClientId]). +lookup_subscriptions(Node, ClientId, FormatFun) -> + rpc_call(Node, lookup_subscriptions, [Node, ClientId, FormatFun]). %%-------------------------------------------------------------------- %% Routes diff --git a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl index 4165ca51a..464df33c9 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl @@ -85,10 +85,10 @@ list(#{node := Node} = Bindings, Params) -> end. lookup(#{node := Node, clientid := ClientId}, _Params) -> - minirest:return({ok, format(emqx_mgmt:lookup_subscriptions(Node, emqx_mgmt_util:urldecode(ClientId)))}); + minirest:return({ok, emqx_mgmt:lookup_subscriptions(Node, emqx_mgmt_util:urldecode(ClientId), ?format_fun)}); lookup(#{clientid := ClientId}, _Params) -> - minirest:return({ok, format(emqx_mgmt:lookup_subscriptions(emqx_mgmt_util:urldecode(ClientId)))}). + minirest:return({ok, emqx_mgmt:lookup_subscriptions(emqx_mgmt_util:urldecode(ClientId), ?format_fun)}). format(Items) when is_list(Items) -> [format(Item) || Item <- Items]; diff --git a/apps/emqx_stomp/src/emqx_stomp.app.src b/apps/emqx_stomp/src/emqx_stomp.app.src index c2f4b57d3..a70826e34 100644 --- a/apps/emqx_stomp/src/emqx_stomp.app.src +++ b/apps/emqx_stomp/src/emqx_stomp.app.src @@ -1,6 +1,6 @@ {application, emqx_stomp, [{description, "EMQ X Stomp Protocol Plugin"}, - {vsn, "4.3.3"}, % strict semver, bump manually! + {vsn, "4.3.4"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_stomp_sup]}, {applications, [kernel,stdlib]}, diff --git a/apps/emqx_stomp/src/emqx_stomp.appup.src b/apps/emqx_stomp/src/emqx_stomp.appup.src index 0b5372dc9..8d61073d0 100644 --- a/apps/emqx_stomp/src/emqx_stomp.appup.src +++ b/apps/emqx_stomp/src/emqx_stomp.appup.src @@ -1,16 +1,24 @@ %% -*- mode: erlang -*- {VSN, - [{"4.3.2",[{load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]}]}, + [{"4.3.3",[{load_module,emqx_stomp_frame,brutal_purge,soft_purge,[]}]}, + {"4.3.2", + [{load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]}, + {load_module,emqx_stomp_frame,brutal_purge,soft_purge,[]}]}, {"4.3.1", [{load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]}, + {load_module,emqx_stomp_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]}]}, {"4.3.0", [{restart_application,emqx_stomp}, {apply,{emqx_stomp,force_clear_after_app_stoped,[]}}]}, {<<".*">>,[]}], - [{"4.3.2",[{load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]}]}, + [{"4.3.3",[{load_module,emqx_stomp_frame,brutal_purge,soft_purge,[]}]}, + {"4.3.2", + [{load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]}, + {load_module,emqx_stomp_frame,brutal_purge,soft_purge,[]}]}, {"4.3.1", [{load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]}, + {load_module,emqx_stomp_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]}]}, {"4.3.0", [{restart_application,emqx_stomp}]}, diff --git a/apps/emqx_stomp/src/emqx_stomp_frame.erl b/apps/emqx_stomp/src/emqx_stomp_frame.erl index c4d19ae3a..212242969 100644 --- a/apps/emqx_stomp/src/emqx_stomp_frame.erl +++ b/apps/emqx_stomp/src/emqx_stomp_frame.erl @@ -121,7 +121,7 @@ g(Key, Opts, Val) -> parse(<<>>, Parser) -> {more, Parser}; -parse(Bytes, #{phase := body, len := Len, state := State}) -> +parse(Bytes, #{phase := body, length := Len, state := State}) -> parse(body, Bytes, State, Len); parse(Bytes, Parser = #{pre := Pre}) ->