refactor: publish api; add: batch schema util function

This commit is contained in:
DDDHuang 2021-07-14 11:46:04 +08:00 committed by turtleDeng
parent 187d200cb7
commit 1bfa6ead42
3 changed files with 287 additions and 2 deletions

View File

@ -0,0 +1,161 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mgmt_api_publish).
%% API
-include_lib("emqx/include/emqx.hrl").
-behavior(minirest_api).
-export([api_spec/0]).
-export([ publish/2
, publish_batch/2]).
api_spec() ->
{
[publish_api(), publish_batch_api()],
[request_message_schema(), mqtt_message_schema()]
}.
publish_api() ->
MeteData = #{
post => #{
description => "publish",
parameters => [#{
name => message,
in => body,
required => true,
schema => minirest:ref(<<"request_message">>)
}],
responses => #{
<<"200">> => #{
description => <<"publish ok">>,
schema => minirest:ref(<<"message">>)}}}},
{"/publish", MeteData, publish}.
publish_batch_api() ->
MeteData = #{
post => #{
description => "publish",
parameters => [#{
name => message,
in => body,
required => true,
schema =>#{
type => array,
items => minirest:ref(<<"request_message">>)}
}],
responses => #{
<<"200">> => #{
description => <<"publish result">>,
schema => #{
type => array,
items => minirest:ref(<<"message">>)}}}}},
{"/publish_batch", MeteData, publish_batch}.
request_message_schema() ->
{<<"request_message">>, maps:without([<<"id">>], message_def())}.
mqtt_message_schema() ->
{<<"message">>, message_def()}.
message_def() ->
#{
<<"id">> => #{
type => <<"string">>,
description => <<"Message ID">>},
<<"topic">> => #{
type => <<"string">>,
description => <<"Topic">>},
<<"qos">> => #{
type => <<"integer">>,
enum => [0, 1, 2],
description => <<"Qos">>},
<<"payload">> => #{
type => <<"string">>,
description => <<"Topic">>},
<<"from">> => #{
type => <<"string">>,
description => <<"Message from">>},
<<"flag">> => #{
type => <<"object">>,
description => <<"Message flag">>,
properties => #{
<<"sys">> => #{
type => <<"boolean">>,
default => false,
description => <<"System message flag, nullable, default false">>},
<<"dup">> => #{
type => <<"boolean">>,
default => false,
description => <<"Dup message flag, nullable, default false">>},
<<"retain">> => #{
type => <<"boolean">>,
default => false,
description => <<"Retain message flag, nullable, default false">>}}}
}.
publish(post, Request) ->
{ok, Body, _} = cowboy_req:read_body(Request),
Message = message(emqx_json:decode(Body, [return_maps])),
_ = emqx_mgmt:publish(Message),
{200, emqx_json:encode(format_message(Message))}.
publish_batch(post, Request) ->
{ok, Body, _} = cowboy_req:read_body(Request),
Messages = messages(emqx_json:decode(Body, [return_maps])),
_ = [emqx_mgmt:publish(Message) || Message <- Messages],
ResponseBody = emqx_json:encode(format_message(Messages)),
{200, ResponseBody}.
message(Map) ->
From = maps:get(<<"from">>, Map, http_api),
QoS = maps:get(<<"qos">>, Map, 0),
Topic = maps:get(<<"topic">>, Map),
Payload = maps:get(<<"payload">>, Map),
Flags = flags(Map),
emqx_message:make(From, QoS, Topic, Payload, Flags, #{}).
flags(Map) ->
Flags = maps:get(<<"flags">>, Map, #{}),
Retain = maps:get(<<"retain">>, Flags, false),
Sys = maps:get(<<"sys">>, Flags, false),
Dup = maps:get(<<"dup">>, Flags, false),
#{
retain => Retain,
sys => Sys,
dup => Dup
}.
messages(List) ->
[message(MessageMap) || MessageMap <- List].
format_message(Messages) when is_list(Messages)->
[format_message(Message) || Message <- Messages];
format_message(#message{id = ID, qos = Qos, from = From, topic = Topic, payload = Payload, flags = Flags}) ->
#{
id => emqx_guid:to_hexstr(ID),
qos => Qos,
topic => Topic,
payload => Payload,
flag => Flags,
from => to_binary(From)
}.
to_binary(Data) when is_binary(Data) ->
Data;
to_binary(Data) ->
list_to_binary(io_lib:format("~p", [Data])).

View File

@ -21,11 +21,13 @@
, kmg/1 , kmg/1
, ntoa/1 , ntoa/1
, merge_maps/2 , merge_maps/2
, not_found_schema/1
, not_found_schema/2
, batch_operation/3 , batch_operation/3
]). ]).
-export([ not_found_schema/1
, not_found_schema/2
, batch_response_schema/1]).
-export([urldecode/1]). -export([urldecode/1]).
-define(KB, 1024). -define(KB, 1024).
@ -80,6 +82,8 @@ merge_maps(Default, New) ->
urldecode(S) -> urldecode(S) ->
emqx_http_lib:uri_decode(S). emqx_http_lib:uri_decode(S).
%%%==============================================================================================
%% schema util
not_found_schema(Description) -> not_found_schema(Description) ->
not_found_schema(Description, ["RESOURCE_NOT_FOUND"]). not_found_schema(Description, ["RESOURCE_NOT_FOUND"]).
@ -96,6 +100,34 @@ not_found_schema(Description, Enum) ->
type => string}}} type => string}}}
}. }.
batch_response_schema(DefName) ->
#{
type => object,
properties => #{
success => #{
type => integer,
description => <<"Success count">>},
failed => #{
type => integer,
description => <<"Failed count">>},
detail => #{
type => array,
description => <<"Failed object & reason">>,
items => #{
type => object,
properties =>
#{
data => minirest:ref(DefName),
reason => #{
type => <<"string">>
}
}
}
}
}
}.
%%%==============================================================================================
batch_operation(Module, Function, ArgsList) -> batch_operation(Module, Function, ArgsList) ->
Failed = batch_operation(Module, Function, ArgsList, []), Failed = batch_operation(Module, Function, ArgsList, []),
Len = erlang:length(Failed), Len = erlang:length(Failed),

View File

@ -0,0 +1,92 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mgmt_publish_api_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-define(CLIENTID, <<"api_clientid">>).
-define(USERNAME, <<"api_username">>).
-define(TOPIC1, <<"api_topic1">>).
-define(TOPIC2, <<"api_topic2">>).
all() ->
emqx_ct:all(?MODULE).
init_per_suite(Config) ->
ekka_mnesia:start(),
emqx_mgmt_auth:mnesia(boot),
emqx_ct_helpers:start_apps([emqx_management], fun set_special_configs/1),
Config.
end_per_suite(_) ->
emqx_ct_helpers:stop_apps([emqx_management]).
set_special_configs(emqx_management) ->
emqx_config:put([emqx_management], #{listeners => [#{protocol => http, port => 8081}],
applications =>[#{id => "admin", secret => "public"}]}),
ok;
set_special_configs(_App) ->
ok.
t_publish_api(_) ->
{ok, Client} = emqtt:start_link(#{username => <<"api_username">>, clientid => <<"api_clientid">>}),
{ok, _} = emqtt:connect(Client),
{ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC1),
{ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC2),
Payload = <<"hello">>,
Path = emqx_mgmt_api_test_util:api_path(["publish"]),
Auth = emqx_mgmt_api_test_util:auth_header_(),
Body = #{topic => ?TOPIC1, payload => Payload},
{ok, _} = emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body),
?assertEqual(receive_assert(?TOPIC1, 0, Payload), ok),
emqtt:disconnect(Client).
t_publish_batch_api(_) ->
{ok, Client} = emqtt:start_link(#{username => <<"api_username">>, clientid => <<"api_clientid">>}),
{ok, _} = emqtt:connect(Client),
{ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC1),
{ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC2),
Payload = <<"hello">>,
Path = emqx_mgmt_api_test_util:api_path(["publish_batch"]),
Auth = emqx_mgmt_api_test_util:auth_header_(),
Body =[#{topic => ?TOPIC1, payload => Payload}, #{topic => ?TOPIC2, payload => Payload}],
{ok, Response} = emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body),
ResponseMap = emqx_json:decode(Response, [return_maps]),
?assertEqual(2, erlang:length(ResponseMap)),
?assertEqual(receive_assert(?TOPIC1, 0, Payload), ok),
?assertEqual(receive_assert(?TOPIC2, 0, Payload), ok),
emqtt:disconnect(Client).
receive_assert(Topic, Qos, Payload) ->
receive
{publish, Message} ->
ReceiveTopic = maps:get(topic, Message),
ReceiveQos = maps:get(qos, Message),
ReceivePayload = maps:get(payload, Message),
?assertEqual(ReceiveTopic , Topic),
?assertEqual(ReceiveQos , Qos),
?assertEqual(ReceivePayload , Payload),
ok
after 5000 ->
timeout
end.