diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index 80782beca..18e6cc93e 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -13,6 +13,7 @@ {emqx_gateway_http,1}. {emqx_license,1}. {emqx_management,1}. +{emqx_management,2}. {emqx_mgmt_api_plugins,1}. {emqx_mgmt_cluster,1}. {emqx_mgmt_trace,1}. diff --git a/apps/emqx_management/src/emqx_management.appup.src b/apps/emqx_management/src/emqx_management.appup.src index 9c701cce4..d56a03fb2 100644 --- a/apps/emqx_management/src/emqx_management.appup.src +++ b/apps/emqx_management/src/emqx_management.appup.src @@ -1,7 +1,25 @@ %% -*- mode: erlang -*- %% Unless you know what you are doing, DO NOT edit manually!! {VSN, - [{"5.0.0",[{load_module,emqx_mgmt_cli,brutal_purge,soft_purge,[]}]}, + [{"5.0.0",[ + {load_module,emqx_mgmt_cli,brutal_purge,soft_purge,[]}, + {add_module,emqx_management_proto_v2}, + {load_module,emqx_mgmt_api_clients,brutal_purge,soft_purge,[]}, + {load_module,emqx_mgmt_api_publish,brutal_purge,soft_purge,[]}, + {load_module,emqx_mgmt_api_listeners,brutal_purge,soft_purge,[]}, + {load_module,emqx_mgmt_api_configs,brutal_purge,soft_purge,[]}, + {load_module,emqx_mgmt_util,brutal_purge,soft_purge,[]}, + {load_module,emqx_mgmt,brutal_purge,soft_purge,[]} + ]}, {<<".*">>,[]}], - [{"5.0.0",[{load_module,emqx_mgmt_cli,brutal_purge,soft_purge,[]}]}, + [{"5.0.0",[ + {load_module,emqx_mgmt_cli,brutal_purge,soft_purge,[]}, + {delete_module,emqx_management_proto_v2}, + {load_module,emqx_mgmt_api_clients,brutal_purge,soft_purge,[]}, + {load_module,emqx_mgmt_api_publish,brutal_purge,soft_purge,[]}, + {load_module,emqx_mgmt_api_listeners,brutal_purge,soft_purge,[]}, + {load_module,emqx_mgmt_api_configs,brutal_purge,soft_purge,[]}, + {load_module,emqx_mgmt_util,brutal_purge,soft_purge,[]}, + {load_module,emqx_mgmt,brutal_purge,soft_purge,[]} + ]}, {<<".*">>,[]}]}. diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index be7054df9..cdf3bf504 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -83,7 +83,9 @@ do_subscribe/2, publish/1, unsubscribe/2, - do_unsubscribe/2 + do_unsubscribe/2, + unsubscribe_batch/2, + do_unsubscribe_batch/2 ]). %% Alarms @@ -151,7 +153,7 @@ get_sys_memory() -> end. node_info(Node) -> - wrap_rpc(emqx_management_proto_v1:node_info(Node)). + wrap_rpc(emqx_management_proto_v2:node_info(Node)). stopped_node_info(Node) -> #{name => Node, node_status => 'Stopped'}. @@ -171,7 +173,7 @@ broker_info() -> Info#{node => node(), otp_release => otp_rel(), node_status => 'Running'}. broker_info(Node) -> - wrap_rpc(emqx_management_proto_v1:broker_info(Node)). + wrap_rpc(emqx_management_proto_v2:broker_info(Node)). %%-------------------------------------------------------------------- %% Metrics and Stats @@ -355,7 +357,7 @@ do_call_client(ClientId, Req) -> %% @private call_client(Node, ClientId, Req) -> - wrap_rpc(emqx_management_proto_v1:call_client(Node, ClientId, Req)). + wrap_rpc(emqx_management_proto_v2:call_client(Node, ClientId, Req)). %%-------------------------------------------------------------------- %% Subscriptions @@ -374,7 +376,7 @@ do_list_subscriptions() -> end. list_subscriptions(Node) -> - wrap_rpc(emqx_management_proto_v1:list_subscriptions(Node)). + wrap_rpc(emqx_management_proto_v2:list_subscriptions(Node)). list_subscriptions_via_topic(Topic, FormatFun) -> lists:append([ @@ -402,7 +404,7 @@ subscribe(ClientId, TopicTables) -> subscribe(mria_mnesia:running_nodes(), ClientId, TopicTables). subscribe([Node | Nodes], ClientId, TopicTables) -> - case wrap_rpc(emqx_management_proto_v1:subscribe(Node, ClientId, TopicTables)) of + case wrap_rpc(emqx_management_proto_v2:subscribe(Node, ClientId, TopicTables)) of {error, _} -> subscribe(Nodes, ClientId, TopicTables); {subscribe, Res} -> {subscribe, Res, Node} end; @@ -417,7 +419,6 @@ do_subscribe(ClientId, TopicTables) -> [{_, Pid}] -> Pid ! {subscribe, TopicTables} end. -%%TODO: ??? publish(Msg) -> emqx_metrics:inc_msg(Msg), emqx:publish(Msg). @@ -430,7 +431,7 @@ unsubscribe(ClientId, Topic) -> -spec unsubscribe([node()], emqx_types:clientid(), emqx_types:topic()) -> {unsubscribe, _} | {error, channel_not_found}. unsubscribe([Node | Nodes], ClientId, Topic) -> - case wrap_rpc(emqx_management_proto_v1:unsubscribe(Node, ClientId, Topic)) of + case wrap_rpc(emqx_management_proto_v2:unsubscribe(Node, ClientId, Topic)) of {error, _} -> unsubscribe(Nodes, ClientId, Topic); Re -> Re end; @@ -445,6 +446,29 @@ do_unsubscribe(ClientId, Topic) -> [{_, Pid}] -> Pid ! {unsubscribe, [emqx_topic:parse(Topic)]} end. +-spec unsubscribe_batch(emqx_types:clientid(), [emqx_types:topic()]) -> + {unsubscribe, _} | {error, channel_not_found}. +unsubscribe_batch(ClientId, Topics) -> + unsubscribe_batch(mria_mnesia:running_nodes(), ClientId, Topics). + +-spec unsubscribe_batch([node()], emqx_types:clientid(), [emqx_types:topic()]) -> + {unsubscribe_batch, _} | {error, channel_not_found}. +unsubscribe_batch([Node | Nodes], ClientId, Topics) -> + case wrap_rpc(emqx_management_proto_v2:unsubscribe_batch(Node, ClientId, Topics)) of + {error, _} -> unsubscribe_batch(Nodes, ClientId, Topics); + Re -> Re + end; +unsubscribe_batch([], _ClientId, _Topics) -> + {error, channel_not_found}. + +-spec do_unsubscribe_batch(emqx_types:clientid(), [emqx_types:topic()]) -> + {unsubscribe_batch, _} | {error, _}. +do_unsubscribe_batch(ClientId, Topics) -> + case ets:lookup(emqx_channel, ClientId) of + [] -> {error, channel_not_found}; + [{_, Pid}] -> Pid ! {unsubscribe, [emqx_topic:parse(Topic) || Topic <- Topics]} + end. + %%-------------------------------------------------------------------- %% Get Alarms %%-------------------------------------------------------------------- diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index bc27dc777..956aecf79 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -39,8 +39,9 @@ subscriptions/2, authz_cache/2, subscribe/2, - unsubscribe/2, subscribe_batch/2, + unsubscribe/2, + unsubscribe_batch/2, set_keepalive/2 ]). @@ -88,7 +89,9 @@ paths() -> "/clients/:clientid/authorization/cache", "/clients/:clientid/subscriptions", "/clients/:clientid/subscribe", + "/clients/:clientid/subscribe/bulk", "/clients/:clientid/unsubscribe", + "/clients/:clientid/unsubscribe/bulk", "/clients/:clientid/keepalive" ]. @@ -293,6 +296,21 @@ schema("/clients/:clientid/subscribe") -> } } }; +schema("/clients/:clientid/subscribe/bulk") -> + #{ + 'operationId' => subscribe_batch, + post => #{ + description => <<"Subscribe">>, + parameters => [{clientid, hoconsc:mk(binary(), #{in => path})}], + 'requestBody' => hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, subscribe))), + responses => #{ + 200 => hoconsc:array(hoconsc:ref(emqx_mgmt_api_subscriptions, subscription)), + 404 => emqx_dashboard_swagger:error_codes( + ['CLIENTID_NOT_FOUND'], <<"Client id not found">> + ) + } + } + }; schema("/clients/:clientid/unsubscribe") -> #{ 'operationId' => unsubscribe, @@ -308,6 +326,21 @@ schema("/clients/:clientid/unsubscribe") -> } } }; +schema("/clients/:clientid/unsubscribe/bulk") -> + #{ + 'operationId' => unsubscribe_batch, + post => #{ + description => <<"Unsubscribe">>, + parameters => [{clientid, hoconsc:mk(binary(), #{in => path})}], + 'requestBody' => hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, unsubscribe))), + responses => #{ + 204 => <<"Unsubscribe OK">>, + 404 => emqx_dashboard_swagger:error_codes( + ['CLIENTID_NOT_FOUND'], <<"Client id not found">> + ) + } + } + }; schema("/clients/:clientid/keepalive") -> #{ 'operationId' => set_keepalive, @@ -543,11 +576,6 @@ subscribe(post, #{bindings := #{clientid := ClientID}, body := TopicInfo}) -> Opts = emqx_map_lib:unsafe_atom_key_map(TopicInfo), subscribe(Opts#{clientid => ClientID}). -unsubscribe(post, #{bindings := #{clientid := ClientID}, body := TopicInfo}) -> - Topic = maps:get(<<"topic">>, TopicInfo), - unsubscribe(#{clientid => ClientID, topic => Topic}). - -%% TODO: batch subscribe_batch(post, #{bindings := #{clientid := ClientID}, body := TopicInfos}) -> Topics = [ @@ -556,6 +584,14 @@ subscribe_batch(post, #{bindings := #{clientid := ClientID}, body := TopicInfos} ], subscribe_batch(#{clientid => ClientID, topics => Topics}). +unsubscribe(post, #{bindings := #{clientid := ClientID}, body := TopicInfo}) -> + Topic = maps:get(<<"topic">>, TopicInfo), + unsubscribe(#{clientid => ClientID, topic => Topic}). + +unsubscribe_batch(post, #{bindings := #{clientid := ClientID}, body := TopicInfos}) -> + Topics = [Topic || #{<<"topic">> := Topic} <- TopicInfos], + unsubscribe_batch(#{clientid => ClientID, topics => Topics}). + subscriptions(get, #{bindings := #{clientid := ClientID}}) -> case emqx_mgmt:list_client_subscriptions(ClientID) of [] -> @@ -668,9 +704,20 @@ subscribe(#{clientid := ClientID, topic := Topic} = Sub) -> {error, Reason} -> Message = list_to_binary(io_lib:format("~p", [Reason])), {500, #{code => <<"UNKNOW_ERROR">>, message => Message}}; - {ok, Node} -> - Response = Sub#{node => Node}, - {200, Response} + {ok, SubInfo} -> + {200, SubInfo} + end. + +subscribe_batch(#{clientid := ClientID, topics := Topics}) -> + case lookup(#{clientid => ClientID}) of + {200, _} -> + ArgList = [ + [ClientID, Topic, maps:with([qos, nl, rap, rh], Sub)] + || #{topic := Topic} = Sub <- Topics + ], + {200, emqx_mgmt_util:batch_operation(?MODULE, do_subscribe, ArgList)}; + {404, ?CLIENT_ID_NOT_FOUND} -> + {404, ?CLIENT_ID_NOT_FOUND} end. unsubscribe(#{clientid := ClientID, topic := Topic}) -> @@ -681,12 +728,14 @@ unsubscribe(#{clientid := ClientID, topic := Topic}) -> {204} end. -subscribe_batch(#{clientid := ClientID, topics := Topics}) -> - ArgList = [ - [ClientID, Topic, maps:with([qos, nl, rap, rh], Sub)] - || #{topic := Topic} = Sub <- Topics - ], - emqx_mgmt_util:batch_operation(?MODULE, do_subscribe, ArgList). +unsubscribe_batch(#{clientid := ClientID, topics := Topics}) -> + case lookup(#{clientid => ClientID}) of + {200, _} -> + _ = emqx_mgmt:unsubscribe_batch(ClientID, Topics), + {204}; + {404, ?CLIENT_ID_NOT_FOUND} -> + {404, ?CLIENT_ID_NOT_FOUND} + end. %%-------------------------------------------------------------------- %% internal function @@ -700,7 +749,7 @@ do_subscribe(ClientID, Topic0, Options) -> {subscribe, Subscriptions, Node} -> case proplists:is_defined(Topic, Subscriptions) of true -> - {ok, Node}; + {ok, Options#{node => Node, clientid => ClientID, topic => Topic}}; false -> {error, unknow_error} end diff --git a/apps/emqx_management/src/emqx_mgmt_api_configs.erl b/apps/emqx_management/src/emqx_mgmt_api_configs.erl index 41a042cad..3354d50e6 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_configs.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_configs.erl @@ -305,7 +305,7 @@ configs(get, Params, _Req) -> Node = maps:get(node, Params, node()), case lists:member(Node, mria_mnesia:running_nodes()) andalso - emqx_management_proto_v1:get_full_config(Node) + emqx_management_proto_v2:get_full_config(Node) of false -> Message = list_to_binary(io_lib:format("Bad node ~p, reason not found", [Node])), diff --git a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl index fc01764aa..35fcd3bca 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl @@ -445,7 +445,7 @@ list_listeners() -> [list_listeners(Node) || Node <- mria_mnesia:running_nodes()]. list_listeners(Node) -> - wrap_rpc(emqx_management_proto_v1:list_listeners(Node)). + wrap_rpc(emqx_management_proto_v2:list_listeners(Node)). listener_status_by_id(NodeL) -> Listeners = maps:to_list(listener_status_by_id(NodeL, #{})), diff --git a/apps/emqx_management/src/emqx_mgmt_api_publish.erl b/apps/emqx_management/src/emqx_mgmt_api_publish.erl index 27280051c..b2b92d389 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_publish.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_publish.erl @@ -61,7 +61,7 @@ schema("/publish/bulk") -> } }. -fields(publish_message) -> +fields(message) -> [ {topic, hoconsc:mk(binary(), #{ @@ -75,7 +75,7 @@ fields(publish_message) -> required => false, default => 0 })}, - {from, + {clientid, hoconsc:mk(binary(), #{ desc => <<"From client ID">>, required => false, @@ -94,34 +94,76 @@ fields(publish_message) -> default => false })} ]; +fields(publish_message) -> + [ + {payload_encoding, + hoconsc:mk(hoconsc:enum([plain, base64]), #{ + desc => <<"MQTT Payload Encoding, base64 or plain">>, + required => false, + default => plain + })} + ] ++ fields(message); fields(publish_message_info) -> [ {id, hoconsc:mk(binary(), #{ desc => <<"Internal Message ID">> })} - ] ++ fields(publish_message). + ] ++ fields(message). publish(post, #{body := Body}) -> - Message = message(Body), - _ = emqx_mgmt:publish(Message), - {200, format_message(Message)}. + case message(Body) of + {ok, Message} -> + _ = emqx_mgmt:publish(Message), + {200, format_message(Message)}; + {error, R} -> + {400, 'BAD_REQUEST', to_binary(R)} + end. publish_batch(post, #{body := Body}) -> - Messages = messages(Body), - _ = [emqx_mgmt:publish(Message) || Message <- Messages], - {200, format_message(Messages)}. + case messages(Body) of + {ok, Messages} -> + _ = [emqx_mgmt:publish(Message) || Message <- Messages], + {200, format_message(Messages)}; + {error, R} -> + {400, 'BAD_REQUEST', to_binary(R)} + end. message(Map) -> - From = maps:get(<<"from">>, Map, http_api), - QoS = maps:get(<<"qos">>, Map, 0), - Topic = maps:get(<<"topic">>, Map), - Payload = maps:get(<<"payload">>, Map), - Retain = maps:get(<<"retain">>, Map, false), - emqx_message:make(From, QoS, Topic, Payload, #{retain => Retain}, #{}). + Encoding = maps:get(<<"payload_encoding">>, Map, plain), + case encode_payload(Encoding, maps:get(<<"payload">>, Map)) of + {ok, Payload} -> + From = maps:get(<<"clientid">>, Map, http_api), + QoS = maps:get(<<"qos">>, Map, 0), + Topic = maps:get(<<"topic">>, Map), + Retain = maps:get(<<"retain">>, Map, false), + {ok, emqx_message:make(From, QoS, Topic, Payload, #{retain => Retain}, #{})}; + {error, R} -> + {error, R} + end. + +encode_payload(plain, Payload) -> + {ok, Payload}; +encode_payload(base64, Payload) -> + try + {ok, base64:decode(Payload)} + catch + _:_ -> + {error, {decode_base64_payload_failed, Payload}} + end. messages(List) -> - [message(MessageMap) || MessageMap <- List]. + messages(List, []). + +messages([], Res) -> + {ok, lists:reverse(Res)}; +messages([MessageMap | List], Res) -> + case message(MessageMap) of + {ok, Message} -> + messages(List, [Message | Res]); + {error, R} -> + {error, R} + end. format_message(Messages) when is_list(Messages) -> [format_message(Message) || Message <- Messages]; @@ -134,7 +176,7 @@ format_message(#message{ topic => Topic, payload => Payload, retain => maps:get(retain, Flags, false), - from => to_binary(From) + clientid => to_binary(From) }. to_binary(Data) when is_binary(Data) -> diff --git a/apps/emqx_management/src/emqx_mgmt_util.erl b/apps/emqx_management/src/emqx_mgmt_util.erl index 61d12cf69..58fd6e952 100644 --- a/apps/emqx_management/src/emqx_mgmt_util.erl +++ b/apps/emqx_management/src/emqx_mgmt_util.erl @@ -202,23 +202,29 @@ json_content_schema(Schema, Desc) -> %%%============================================================================================== batch_operation(Module, Function, ArgsList) -> - Failed = batch_operation(Module, Function, ArgsList, []), - Len = erlang:length(Failed), - Success = erlang:length(ArgsList) - Len, - Fun = - fun({Args, Reason}, Detail) -> - [#{data => Args, reason => io_lib:format("~p", [Reason])} | Detail] - end, - #{success => Success, failed => Len, detail => lists:foldl(Fun, [], Failed)}. + {Succeed, Failed} = batch_operation(Module, Function, ArgsList, {[], []}), + case erlang:length(Failed) of + 0 -> + Succeed; + _FLen -> + Fun = + fun({Args, Reason}, Detail) -> + [ + #{data => Args, reason => list_to_binary(io_lib:format("~p", [Reason]))} + | Detail + ] + end, + #{succeed => Succeed, failed => lists:foldl(Fun, [], Failed)} + end. -batch_operation(_Module, _Function, [], Failed) -> - lists:reverse(Failed); -batch_operation(Module, Function, [Args | ArgsList], Failed) -> +batch_operation(_Module, _Function, [], {Succeed, Failed}) -> + {lists:reverse(Succeed), lists:reverse(Failed)}; +batch_operation(Module, Function, [Args | ArgsList], {Succeed, Failed}) -> case erlang:apply(Module, Function, Args) of - ok -> - batch_operation(Module, Function, ArgsList, Failed); + {ok, Res} -> + batch_operation(Module, Function, ArgsList, {[Res | Succeed], Failed}); {error, Reason} -> - batch_operation(Module, Function, ArgsList, [{Args, Reason} | Failed]) + batch_operation(Module, Function, ArgsList, {Succeed, [{Args, Reason} | Failed]}) end. properties(Props) -> diff --git a/apps/emqx_management/src/proto/emqx_management_proto_v2.erl b/apps/emqx_management/src/proto/emqx_management_proto_v2.erl new file mode 100644 index 000000000..34da390b5 --- /dev/null +++ b/apps/emqx_management/src/proto/emqx_management_proto_v2.erl @@ -0,0 +1,80 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_management_proto_v2). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + + node_info/1, + broker_info/1, + list_subscriptions/1, + + list_listeners/1, + subscribe/3, + unsubscribe/3, + unsubscribe_batch/3, + + call_client/3, + + get_full_config/1 +]). + +-include_lib("emqx/include/bpapi.hrl"). + +introduced_in() -> + "5.0.1". + +-spec unsubscribe_batch(node(), emqx_types:clientid(), [emqx_types:topic()]) -> + {unsubscribe, _} | {error, _} | {badrpc, _}. +unsubscribe_batch(Node, ClientId, Topics) -> + rpc:call(Node, emqx_mgmt, do_unsubscribe_batch, [ClientId, Topics]). + +-spec node_info(node()) -> map() | {badrpc, _}. +node_info(Node) -> + rpc:call(Node, emqx_mgmt, node_info, []). + +-spec broker_info(node()) -> map() | {badrpc, _}. +broker_info(Node) -> + rpc:call(Node, emqx_mgmt, broker_info, []). + +-spec list_subscriptions(node()) -> [map()] | {badrpc, _}. +list_subscriptions(Node) -> + rpc:call(Node, emqx_mgmt, do_list_subscriptions, []). + +-spec list_listeners(node()) -> map() | {badrpc, _}. +list_listeners(Node) -> + rpc:call(Node, emqx_mgmt_api_listeners, do_list_listeners, []). + +-spec subscribe(node(), emqx_types:clientid(), emqx_types:topic_filters()) -> + {subscribe, _} | {error, atom()} | {badrpc, _}. +subscribe(Node, ClientId, TopicTables) -> + rpc:call(Node, emqx_mgmt, do_subscribe, [ClientId, TopicTables]). + +-spec unsubscribe(node(), emqx_types:clientid(), emqx_types:topic()) -> + {unsubscribe, _} | {error, _} | {badrpc, _}. +unsubscribe(Node, ClientId, Topic) -> + rpc:call(Node, emqx_mgmt, do_unsubscribe, [ClientId, Topic]). + +-spec call_client(node(), emqx_types:clientid(), term()) -> term(). +call_client(Node, ClientId, Req) -> + rpc:call(Node, emqx_mgmt, do_call_client, [ClientId, Req]). + +-spec get_full_config(node()) -> map() | list() | {badrpc, _}. +get_full_config(Node) -> + rpc:call(Node, emqx_mgmt_api_configs, get_full_config, []).