From 1bfa6ead4218324f76d139956233540fb5877665 Mon Sep 17 00:00:00 2001 From: DDDHuang <904897578@qq.com> Date: Wed, 14 Jul 2021 11:46:04 +0800 Subject: [PATCH] refactor: publish api; add: batch schema util function --- .../src/emqx_mgmt_api_publish.erl | 161 ++++++++++++++++++ apps/emqx_management/src/emqx_mgmt_util.erl | 36 +++- .../test/emqx_mgmt_publish_api_SUITE.erl | 92 ++++++++++ 3 files changed, 287 insertions(+), 2 deletions(-) create mode 100644 apps/emqx_management/src/emqx_mgmt_api_publish.erl create mode 100644 apps/emqx_management/test/emqx_mgmt_publish_api_SUITE.erl diff --git a/apps/emqx_management/src/emqx_mgmt_api_publish.erl b/apps/emqx_management/src/emqx_mgmt_api_publish.erl new file mode 100644 index 000000000..a3305bc55 --- /dev/null +++ b/apps/emqx_management/src/emqx_mgmt_api_publish.erl @@ -0,0 +1,161 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_mgmt_api_publish). +%% API +-include_lib("emqx/include/emqx.hrl"). + +-behavior(minirest_api). + +-export([api_spec/0]). + +-export([ publish/2 + , publish_batch/2]). + +api_spec() -> + { + [publish_api(), publish_batch_api()], + [request_message_schema(), mqtt_message_schema()] + }. + +publish_api() -> + MeteData = #{ + post => #{ + description => "publish", + parameters => [#{ + name => message, + in => body, + required => true, + schema => minirest:ref(<<"request_message">>) + }], + responses => #{ + <<"200">> => #{ + description => <<"publish ok">>, + schema => minirest:ref(<<"message">>)}}}}, + {"/publish", MeteData, publish}. + +publish_batch_api() -> + MeteData = #{ + post => #{ + description => "publish", + parameters => [#{ + name => message, + in => body, + required => true, + schema =>#{ + type => array, + items => minirest:ref(<<"request_message">>)} + }], + responses => #{ + <<"200">> => #{ + description => <<"publish result">>, + schema => #{ + type => array, + items => minirest:ref(<<"message">>)}}}}}, + {"/publish_batch", MeteData, publish_batch}. + +request_message_schema() -> + {<<"request_message">>, maps:without([<<"id">>], message_def())}. + +mqtt_message_schema() -> + {<<"message">>, message_def()}. + +message_def() -> + #{ + <<"id">> => #{ + type => <<"string">>, + description => <<"Message ID">>}, + <<"topic">> => #{ + type => <<"string">>, + description => <<"Topic">>}, + <<"qos">> => #{ + type => <<"integer">>, + enum => [0, 1, 2], + description => <<"Qos">>}, + <<"payload">> => #{ + type => <<"string">>, + description => <<"Topic">>}, + <<"from">> => #{ + type => <<"string">>, + description => <<"Message from">>}, + <<"flag">> => #{ + type => <<"object">>, + description => <<"Message flag">>, + properties => #{ + <<"sys">> => #{ + type => <<"boolean">>, + default => false, + description => <<"System message flag, nullable, default false">>}, + <<"dup">> => #{ + type => <<"boolean">>, + default => false, + description => <<"Dup message flag, nullable, default false">>}, + <<"retain">> => #{ + type => <<"boolean">>, + default => false, + description => <<"Retain message flag, nullable, default false">>}}} + }. + +publish(post, Request) -> + {ok, Body, _} = cowboy_req:read_body(Request), + Message = message(emqx_json:decode(Body, [return_maps])), + _ = emqx_mgmt:publish(Message), + {200, emqx_json:encode(format_message(Message))}. + +publish_batch(post, Request) -> + {ok, Body, _} = cowboy_req:read_body(Request), + Messages = messages(emqx_json:decode(Body, [return_maps])), + _ = [emqx_mgmt:publish(Message) || Message <- Messages], + ResponseBody = emqx_json:encode(format_message(Messages)), + {200, ResponseBody}. + +message(Map) -> + From = maps:get(<<"from">>, Map, http_api), + QoS = maps:get(<<"qos">>, Map, 0), + Topic = maps:get(<<"topic">>, Map), + Payload = maps:get(<<"payload">>, Map), + Flags = flags(Map), + emqx_message:make(From, QoS, Topic, Payload, Flags, #{}). + +flags(Map) -> + Flags = maps:get(<<"flags">>, Map, #{}), + Retain = maps:get(<<"retain">>, Flags, false), + Sys = maps:get(<<"sys">>, Flags, false), + Dup = maps:get(<<"dup">>, Flags, false), + #{ + retain => Retain, + sys => Sys, + dup => Dup + }. + +messages(List) -> + [message(MessageMap) || MessageMap <- List]. + +format_message(Messages) when is_list(Messages)-> + [format_message(Message) || Message <- Messages]; +format_message(#message{id = ID, qos = Qos, from = From, topic = Topic, payload = Payload, flags = Flags}) -> + #{ + id => emqx_guid:to_hexstr(ID), + qos => Qos, + topic => Topic, + payload => Payload, + flag => Flags, + from => to_binary(From) + }. + +to_binary(Data) when is_binary(Data) -> + Data; +to_binary(Data) -> + list_to_binary(io_lib:format("~p", [Data])). diff --git a/apps/emqx_management/src/emqx_mgmt_util.erl b/apps/emqx_management/src/emqx_mgmt_util.erl index 4197973e7..6ba7cb93a 100644 --- a/apps/emqx_management/src/emqx_mgmt_util.erl +++ b/apps/emqx_management/src/emqx_mgmt_util.erl @@ -21,11 +21,13 @@ , kmg/1 , ntoa/1 , merge_maps/2 - , not_found_schema/1 - , not_found_schema/2 , batch_operation/3 ]). +-export([ not_found_schema/1 + , not_found_schema/2 + , batch_response_schema/1]). + -export([urldecode/1]). -define(KB, 1024). @@ -80,6 +82,8 @@ merge_maps(Default, New) -> urldecode(S) -> emqx_http_lib:uri_decode(S). +%%%============================================================================================== +%% schema util not_found_schema(Description) -> not_found_schema(Description, ["RESOURCE_NOT_FOUND"]). @@ -96,6 +100,34 @@ not_found_schema(Description, Enum) -> type => string}}} }. +batch_response_schema(DefName) -> + #{ + type => object, + properties => #{ + success => #{ + type => integer, + description => <<"Success count">>}, + failed => #{ + type => integer, + description => <<"Failed count">>}, + detail => #{ + type => array, + description => <<"Failed object & reason">>, + items => #{ + type => object, + properties => + #{ + data => minirest:ref(DefName), + reason => #{ + type => <<"string">> + } + } + } + } + } + }. + +%%%============================================================================================== batch_operation(Module, Function, ArgsList) -> Failed = batch_operation(Module, Function, ArgsList, []), Len = erlang:length(Failed), diff --git a/apps/emqx_management/test/emqx_mgmt_publish_api_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_publish_api_SUITE.erl new file mode 100644 index 000000000..9ecb1a11b --- /dev/null +++ b/apps/emqx_management/test/emqx_mgmt_publish_api_SUITE.erl @@ -0,0 +1,92 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_mgmt_publish_api_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). + +-define(CLIENTID, <<"api_clientid">>). +-define(USERNAME, <<"api_username">>). + +-define(TOPIC1, <<"api_topic1">>). +-define(TOPIC2, <<"api_topic2">>). + + +all() -> + emqx_ct:all(?MODULE). + +init_per_suite(Config) -> + ekka_mnesia:start(), + emqx_mgmt_auth:mnesia(boot), + emqx_ct_helpers:start_apps([emqx_management], fun set_special_configs/1), + Config. + + +end_per_suite(_) -> + emqx_ct_helpers:stop_apps([emqx_management]). + +set_special_configs(emqx_management) -> + emqx_config:put([emqx_management], #{listeners => [#{protocol => http, port => 8081}], + applications =>[#{id => "admin", secret => "public"}]}), + ok; +set_special_configs(_App) -> + ok. + +t_publish_api(_) -> + {ok, Client} = emqtt:start_link(#{username => <<"api_username">>, clientid => <<"api_clientid">>}), + {ok, _} = emqtt:connect(Client), + {ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC1), + {ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC2), + Payload = <<"hello">>, + Path = emqx_mgmt_api_test_util:api_path(["publish"]), + Auth = emqx_mgmt_api_test_util:auth_header_(), + Body = #{topic => ?TOPIC1, payload => Payload}, + {ok, _} = emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body), + ?assertEqual(receive_assert(?TOPIC1, 0, Payload), ok), + emqtt:disconnect(Client). + +t_publish_batch_api(_) -> + {ok, Client} = emqtt:start_link(#{username => <<"api_username">>, clientid => <<"api_clientid">>}), + {ok, _} = emqtt:connect(Client), + {ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC1), + {ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC2), + Payload = <<"hello">>, + Path = emqx_mgmt_api_test_util:api_path(["publish_batch"]), + Auth = emqx_mgmt_api_test_util:auth_header_(), + Body =[#{topic => ?TOPIC1, payload => Payload}, #{topic => ?TOPIC2, payload => Payload}], + {ok, Response} = emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body), + ResponseMap = emqx_json:decode(Response, [return_maps]), + ?assertEqual(2, erlang:length(ResponseMap)), + ?assertEqual(receive_assert(?TOPIC1, 0, Payload), ok), + ?assertEqual(receive_assert(?TOPIC2, 0, Payload), ok), + emqtt:disconnect(Client). + +receive_assert(Topic, Qos, Payload) -> + receive + {publish, Message} -> + ReceiveTopic = maps:get(topic, Message), + ReceiveQos = maps:get(qos, Message), + ReceivePayload = maps:get(payload, Message), + ?assertEqual(ReceiveTopic , Topic), + ?assertEqual(ReceiveQos , Qos), + ?assertEqual(ReceivePayload , Payload), + ok + after 5000 -> + timeout + end. +