diff --git a/apps/emqx/src/emqx_message.erl b/apps/emqx/src/emqx_message.erl index ae74a614b..03f7ca6a2 100644 --- a/apps/emqx/src/emqx_message.erl +++ b/apps/emqx/src/emqx_message.erl @@ -74,7 +74,8 @@ to_map/1, to_log_map/1, to_list/1, - from_map/1 + from_map/1, + estimate_size/1 ]). -export_type([message_map/0]). @@ -175,6 +176,18 @@ make(MsgId, From, QoS, Topic, Payload, Flags, Headers) when timestamp = Now }. +%% optimistic esitmation of a message size after serialization +%% not including MQTT v5 message headers/user properties etc. +-spec estimate_size(emqx_types:message()) -> non_neg_integer(). +estimate_size(#message{topic = Topic, payload = Payload}) -> + FixedHeaderSize = 1, + VarLenSize = 4, + TopicSize = iolist_size(Topic), + PayloadSize = iolist_size(Payload), + PacketIdSize = 2, + TopicLengthSize = 2, + FixedHeaderSize + VarLenSize + TopicLengthSize + TopicSize + PacketIdSize + PayloadSize. + -spec id(emqx_types:message()) -> maybe(binary()). id(#message{id = Id}) -> Id. diff --git a/apps/emqx_management/i18n/emqx_mgmt_api_publish_i18n.conf b/apps/emqx_management/i18n/emqx_mgmt_api_publish_i18n.conf new file mode 100644 index 000000000..2a7c9def8 --- /dev/null +++ b/apps/emqx_management/i18n/emqx_mgmt_api_publish_i18n.conf @@ -0,0 +1,127 @@ + +emqx_mgmt_api_publish { + publish_api { + desc { + en: """ +Publish one message.
+Possible HTTP status response codes are:
+200: The message is delivered to at least one subscriber;
+202: No matched subscribers;
+400: Message is invalid. for example bad topic name, or QoS is out of range;
+503: Failed to deliver the message to subscriber(s);
+""" + zh: """ +发布一个消息。
+可能的 HTTP 状态码如下:
+200: 消息被成功发送到至少一个订阅。
+202: 没有匹配到任何订阅。
+400: 消息编码错误,如非法主题,或 QoS 超出范围等。
+503: 服务重启等过程中导致转发失败。

+""" + } + } + publish_bulk_api { + desc { + en: """ +Publish a batch of messages.
+Possible HTTP response status code are:
+200: All messages are delivered to at least one subscriber;
+202: At least one message was not delivered to any subscriber;
+400: At least one message is invalid. For example bad topic name, or QoS is out of range;
+503: Failed to deliver at least one of the messages;
+ +In case there is at lest one invalid message in the batch, the HTTP response body +is the same as for /publish API.
+Otherwise the HTTP response body is an array of JSON objects indicating the publish +result of each individual message in the batch. +""" + zh: """ +批量发布一组消息。
+可能的 HTTP 状态码如下:
+200: 所有的消息都被成功发送到至少一个订阅。
+202: 至少有一个消息没有匹配到任何订阅。
+400: 至少有一个消息编码错误,如非法主题,或 QoS 超出范围等。
+503: 至少有一个小因为服务重启的原因导致转发失败。
+ +请求的 Body 或者 Body 中包含的某个消息无法通过 API 规范的类型检查时,HTTP 响应的消息与发布单个消息的 API + /publish 是一样的。 +如果所有的消息都是合法的,那么 HTTP 返回的内容是一个 JSON 数组,每个元素代表了该消息转发的状态。 + +""" + } + } + + topic_name { + desc { + en: "Topic Name" + zh: "主题名称" + } + } + qos { + desc { + en: "MQTT message QoS" + zh: "MQTT 消息的 QoS" + } + } + clientid { + desc { + en: "Each message can be published as if it is done on behalf of an MQTT client whos ID can be specified in this field." + zh: "每个消息都可以带上一个 MQTT 客户端 ID,用于模拟 MQTT 客户端的发布行为。" + } + } + payload { + desc { + en: "The MQTT message payload." + zh: "MQTT 消息体。" + } + } + retain { + desc { + en: "A boolean field to indicate if this message should be retained." + zh: "布尔型字段,用于表示该消息是否保留消息。" + } + } + payload_encoding { + desc { + en: "MQTT Payload Encoding, base64 or plain. When set to base64, the message is decoded before it is published." + zh: "MQTT 消息体的编码方式,可以是 base64plain。当设置为 base64 时,消息在发布前会先被解码。" + } + } + message_id { + desc { + en: "A globally unique message ID for correlation/tracing." + zh: "全局唯一的一个消息 ID,方便用于关联和追踪。" + } + } + reason_code { + desc { + en: """ +The MQTT reason code, as the same ones used in PUBACK packet.
+Currently supported codes are:
+ +16(0x10): No matching subscribers;
+131(0x81): Error happened when dispatching the message. e.g. during EMQX restart;
+144(0x90): Topic name invalid;
+151(0x97): Publish rate limited, or message size exceeded limit. The global size limit can be configured with mqtt.max_packet_size
+NOTE: The message size is estimated with the received topic and payload size, meaning the actual size of serialized bytes (when sent to MQTT subscriber) +might be slightly over the limit. +""" + zh: """ +MQTT 消息发布的错误码,这些错误码也是 MQTT 规范中 PUBACK 消息可能携带的错误码。
+当前支持如下错误码:
+ +16(0x10):没能匹配到任何订阅;
+131(0x81):消息转发时发生错误,例如 EMQX 服务重启;
+144(0x90):主题名称非法;
+151(0x97):受到了速率限制,或者消息尺寸过大。全局消息大小限制可以通过配置项 mqtt.max_packet_size 来进行修改。
+注意:消息尺寸的是通过主题和消息体的字节数进行估算的。具体发布时所占用的字节数可能会稍大于这个估算的值。 +""" + } + } + error_message { + desc { + en: "Describes the failure reason in detail." + zh: "失败的详细原因。" + } + } +} diff --git a/apps/emqx_management/src/emqx_mgmt_api_publish.erl b/apps/emqx_management/src/emqx_mgmt_api_publish.erl index 0e05a3875..1678c56e0 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_publish.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_publish.erl @@ -16,7 +16,15 @@ -module(emqx_mgmt_api_publish). -include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("typerefl/include/types.hrl"). +-include_lib("emqx/include/logger.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). + +-define(ALL_IS_WELL, 200). +-define(PARTIALLY_OK, 202). +-define(BAD_REQUEST, 400). +-define(DISPATCH_ERROR, 503). -behaviour(minirest_api). @@ -42,11 +50,14 @@ schema("/publish") -> #{ 'operationId' => publish, post => #{ - description => <<"Publish Message">>, + description => ?DESC(publish_api), tags => [<<"Publish">>], 'requestBody' => hoconsc:mk(hoconsc:ref(?MODULE, publish_message)), responses => #{ - 200 => hoconsc:mk(hoconsc:ref(?MODULE, publish_message_info)) + ?ALL_IS_WELL => hoconsc:mk(hoconsc:ref(?MODULE, publish_ok)), + ?PARTIALLY_OK => hoconsc:mk(hoconsc:ref(?MODULE, publish_error)), + ?BAD_REQUEST => bad_request_schema(), + ?DISPATCH_ERROR => hoconsc:mk(hoconsc:ref(?MODULE, publish_error)) } } }; @@ -54,44 +65,58 @@ schema("/publish/bulk") -> #{ 'operationId' => publish_batch, post => #{ - description => <<"Publish Messages">>, + description => ?DESC(publish_bulk_api), tags => [<<"Publish">>], 'requestBody' => hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, publish_message)), #{}), responses => #{ - 200 => hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, publish_message_info)), #{}) + ?ALL_IS_WELL => hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, publish_ok)), #{}), + ?PARTIALLY_OK => hoconsc:mk( + hoconsc:array(hoconsc:ref(?MODULE, publish_error)), #{} + ), + ?BAD_REQUEST => bad_request_schema(), + ?DISPATCH_ERROR => hoconsc:mk( + hoconsc:array(hoconsc:ref(?MODULE, publish_error)), #{} + ) } } }. +bad_request_schema() -> + Union = hoconsc:union([ + hoconsc:ref(?MODULE, bad_request), + hoconsc:array(hoconsc:ref(?MODULE, publish_error)) + ]), + hoconsc:mk(Union, #{}). + fields(message) -> [ {topic, hoconsc:mk(binary(), #{ - desc => <<"Topic Name">>, + desc => ?DESC(topic_name), required => true, example => <<"api/example/topic">> })}, {qos, hoconsc:mk(emqx_schema:qos(), #{ - desc => <<"MQTT QoS">>, + desc => ?DESC(qos), required => false, default => 0 })}, {clientid, hoconsc:mk(binary(), #{ - desc => <<"From client ID">>, + desc => ?DESC(clientid), required => false, example => <<"api_example_client">> })}, {payload, hoconsc:mk(binary(), #{ - desc => <<"MQTT Payload">>, + desc => ?DESC(payload), required => true, example => <<"hello emqx api">> })}, {retain, hoconsc:mk(boolean(), #{ - desc => <<"MQTT Retain Message">>, + desc => ?DESC(retain), required => false, default => false })} @@ -100,53 +125,196 @@ fields(publish_message) -> [ {payload_encoding, hoconsc:mk(hoconsc:enum([plain, base64]), #{ - desc => <<"MQTT Payload Encoding, base64 or plain">>, + desc => ?DESC(payload_encoding), required => false, default => plain })} ] ++ fields(message); -fields(publish_message_info) -> +fields(publish_ok) -> [ {id, hoconsc:mk(binary(), #{ - desc => <<"A globally unique message ID">> + desc => ?DESC(message_id) + })} + ]; +fields(publish_error) -> + [ + {reason_code, + hoconsc:mk(integer(), #{ + desc => ?DESC(reason_code), + example => 16 + })}, + {message, + hoconsc:mk(binary(), #{ + desc => ?DESC(error_message), + example => <<"no_matching_subscribers">> + })} + ]; +fields(bad_request) -> + [ + {code, + hoconsc:mk(string(), #{ + desc => <<"BAD_REQUEST">> + })}, + {message, + hoconsc:mk(binary(), #{ + desc => ?DESC(error_message) })} ]. publish(post, #{body := Body}) -> case message(Body) of {ok, Message} -> - _ = emqx_mgmt:publish(Message), - {200, format_message_response(Message)}; - {error, R} -> - {400, 'BAD_REQUEST', to_binary(R)} + Res = emqx_mgmt:publish(Message), + publish_result_to_http_reply(Message, Res); + {error, Reason} -> + {?BAD_REQUEST, make_bad_req_reply(Reason)} end. publish_batch(post, #{body := Body}) -> case messages(Body) of {ok, Messages} -> - _ = [emqx_mgmt:publish(Message) || Message <- Messages], - {200, format_message_response(Messages)}; - {error, R} -> - {400, 'BAD_REQUEST', to_binary(R)} + ResList = lists:map( + fun(Message) -> + Res = emqx_mgmt:publish(Message), + publish_result_to_http_reply(Message, Res) + end, + Messages + ), + publish_results_to_http_reply(ResList); + {error, Reason} -> + {?BAD_REQUEST, make_bad_req_reply(Reason)} end. +make_bad_req_reply(invalid_topic_name) -> + make_publish_error_response(?RC_TOPIC_NAME_INVALID); +make_bad_req_reply(packet_too_large) -> + %% 0x95 RC_PACKET_TOO_LARGE is not a PUBACK reason code + %% This is why we use RC_QUOTA_EXCEEDED instead + make_publish_error_response(?RC_QUOTA_EXCEEDED, packet_too_large); +make_bad_req_reply(Reason) -> + make_publish_error_response(?RC_IMPLEMENTATION_SPECIFIC_ERROR, to_binary(Reason)). + +-spec is_ok_deliver({_NodeOrShare, _MatchedTopic, emqx_types:deliver_result()}) -> boolean(). +is_ok_deliver({_NodeOrShare, _MatchedTopic, ok}) -> true; +is_ok_deliver({_NodeOrShare, _MatchedTopic, {ok, _}}) -> true; +is_ok_deliver({_NodeOrShare, _MatchedTopic, {error, _}}) -> false. + +%% @hidden Map MQTT publish result reason code to HTTP status code. +%% MQTT reason code | Description | HTTP status code +%% 0 Success 200 +%% 16 No matching subscribers 202 +%% 128 Unspecified error 406 +%% 131 Implementation specific error 406 +%% 144 Topic Name invalid 400 +%% 151 Quota exceeded 400 +%% +%% %%%%%% Below error codes are not implemented so far %%%% +%% +%% If HTTP request passes HTTP authentication, it is considered trusted. +%% In the future, we may choose to check ACL for the provided MQTT Client ID +%% 135 Not authorized 401 +%% +%% %%%%%% Below error codes are not applicable %%%%%%% +%% +%% No user specified packet ID, so there should be no packet ID error +%% 145 Packet identifier is in use 400 +%% +%% No preceding payload format indicator to compare against. +%% Content-Type check should be done at HTTP layer but not here. +%% 153 Payload format invalid 400 +publish_result_to_http_reply(_Message, []) -> + %% matched no subscriber + {?PARTIALLY_OK, make_publish_error_response(?RC_NO_MATCHING_SUBSCRIBERS)}; +publish_result_to_http_reply(Message, PublishResult) -> + case lists:any(fun is_ok_deliver/1, PublishResult) of + true -> + %% delivered to at least one subscriber + OkBody = make_publish_response(Message), + {?ALL_IS_WELL, OkBody}; + false -> + %% this is quite unusual, matched, but failed to deliver + %% if this happens, the publish result log can be helpful + %% to idnetify the reason why publish failed + %% e.g. during emqx restart + ReasonString = <<"failed_to_dispatch">>, + ErrorBody = make_publish_error_response( + ?RC_IMPLEMENTATION_SPECIFIC_ERROR, ReasonString + ), + ?SLOG(warning, #{ + msg => ReasonString, + message_id => emqx_message:id(Message), + results => PublishResult + }), + {?DISPATCH_ERROR, ErrorBody} + end. + +%% @hidden Reply batch publish result. +%% 200 if all published OK. +%% 202 if at least one message matched no subscribers. +%% 503 for temp errors duing EMQX restart +publish_results_to_http_reply([_ | _] = ResList) -> + {Codes0, BodyL} = lists:unzip(ResList), + Codes = lists:usort(Codes0), + HasFailure = lists:member(?DISPATCH_ERROR, Codes), + All200 = (Codes =:= [?ALL_IS_WELL]), + Code = + case All200 of + true -> + %% All OK + ?ALL_IS_WELL; + false when not HasFailure -> + %% Partially OK + ?PARTIALLY_OK; + false -> + %% At least one failed + ?DISPATCH_ERROR + end, + {Code, BodyL}. + message(Map) -> + try + make_message(Map) + catch + throw:Reason -> + {error, Reason} + end. + +make_message(Map) -> Encoding = maps:get(<<"payload_encoding">>, Map, plain), - case encode_payload(Encoding, maps:get(<<"payload">>, Map)) of + case decode_payload(Encoding, maps:get(<<"payload">>, Map)) of {ok, Payload} -> From = maps:get(<<"clientid">>, Map, http_api), QoS = maps:get(<<"qos">>, Map, 0), Topic = maps:get(<<"topic">>, Map), Retain = maps:get(<<"retain">>, Map, false), - {ok, emqx_message:make(From, QoS, Topic, Payload, #{retain => Retain}, #{})}; + try + _ = emqx_topic:validate(name, Topic) + catch + error:_Reason -> + throw(invalid_topic_name) + end, + Message = emqx_message:make(From, QoS, Topic, Payload, #{retain => Retain}, #{}), + Size = emqx_message:estimate_size(Message), + (Size > size_limit()) andalso throw(packet_too_large), + {ok, Message}; {error, R} -> {error, R} end. -encode_payload(plain, Payload) -> +%% get the global packet size limit since HTTP API does not belong to any zone. +size_limit() -> + try + emqx_config:get([mqtt, max_packet_size]) + catch + _:_ -> + %% leave 1000 bytes for topic name etc. + ?MAX_PACKET_SIZE + end. + +decode_payload(plain, Payload) -> {ok, Payload}; -encode_payload(base64, Payload) -> +decode_payload(base64, Payload) -> try {ok, base64:decode(Payload)} catch @@ -154,6 +322,8 @@ encode_payload(base64, Payload) -> {error, {decode_base64_payload_failed, Payload}} end. +messages([]) -> + {errror, <<"empty_batch">>}; messages(List) -> messages(List, []). @@ -167,10 +337,23 @@ messages([MessageMap | List], Res) -> {error, R} end. -format_message_response(Messages) when is_list(Messages) -> - [format_message_response(Message) || Message <- Messages]; -format_message_response(#message{id = ID}) -> - #{id => emqx_guid:to_hexstr(ID)}. +make_publish_response(#message{id = ID}) -> + #{ + id => emqx_guid:to_hexstr(ID) + }. +make_publish_error_response(ReasonCode) -> + make_publish_error_response(ReasonCode, emqx_reason_codes:name(ReasonCode)). + +make_publish_error_response(ReasonCode, Msg) -> + #{ + reason_code => ReasonCode, + message => to_binary(Msg) + }. + +to_binary(Atom) when is_atom(Atom) -> + atom_to_binary(Atom); +to_binary(Msg) when is_binary(Msg) -> + Msg; to_binary(Term) -> - list_to_binary(io_lib:format("~p", [Term])). + list_to_binary(io_lib:format("~0p", [Term])). diff --git a/apps/emqx_management/test/emqx_mgmt_api_publish_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_publish_SUITE.erl index b4b3aa902..0ebaf7195 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_publish_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_publish_SUITE.erl @@ -19,6 +19,7 @@ -compile(nowarn_export_all). -include_lib("eunit/include/eunit.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). -define(CLIENTID, <<"api_clientid">>). -define(USERNAME, <<"api_username">>). @@ -36,6 +37,16 @@ init_per_suite(Config) -> end_per_suite(_) -> emqx_mgmt_api_test_util:end_suite(). +init_per_testcase(Case, Config) -> + ?MODULE:Case({init, Config}). + +end_per_testcase(Case, Config) -> + ?MODULE:Case({'end', Config}). + +t_publish_api({init, Config}) -> + Config; +t_publish_api({'end', _Config}) -> + ok; t_publish_api(_) -> {ok, Client} = emqtt:start_link(#{ username => <<"api_username">>, clientid => <<"api_clientid">> @@ -48,11 +59,113 @@ t_publish_api(_) -> Auth = emqx_mgmt_api_test_util:auth_header_(), Body = #{topic => ?TOPIC1, payload => Payload}, {ok, Response} = emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body), - ResponseMap = emqx_json:decode(Response, [return_maps]), - ?assertEqual([<<"id">>], maps:keys(ResponseMap)), + ResponseMap = decode_json(Response), + ?assertEqual([<<"id">>], lists:sort(maps:keys(ResponseMap))), ?assertEqual(ok, receive_assert(?TOPIC1, 0, Payload)), - emqtt:disconnect(Client). + emqtt:stop(Client). +t_publish_no_subscriber({init, Config}) -> + Config; +t_publish_no_subscriber({'end', _Config}) -> + ok; +t_publish_no_subscriber(_) -> + Payload = <<"hello">>, + Path = emqx_mgmt_api_test_util:api_path(["publish"]), + Auth = emqx_mgmt_api_test_util:auth_header_(), + Body = #{topic => ?TOPIC1, payload => Payload}, + {ok, Response} = emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body), + ResponseMap = decode_json(Response), + ?assertEqual([<<"message">>, <<"reason_code">>], lists:sort(maps:keys(ResponseMap))), + ?assertMatch(#{<<"reason_code">> := ?RC_NO_MATCHING_SUBSCRIBERS}, ResponseMap), + ok. + +t_publish_bad_topic({init, Config}) -> + Config; +t_publish_bad_topic({'end', _Config}) -> + ok; +t_publish_bad_topic(_) -> + Payload = <<"hello">>, + Path = emqx_mgmt_api_test_util:api_path(["publish"]), + Auth = emqx_mgmt_api_test_util:auth_header_(), + Body = #{topic => <<"not/a+/valid/topic">>, payload => Payload}, + ?assertMatch( + {error, {_, 400, _}}, emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body) + ). + +t_publish_bad_base64({init, Config}) -> + Config; +t_publish_bad_base64({'end', _Config}) -> + ok; +t_publish_bad_base64(_) -> + %% not a valid base64 + Payload = <<"hello">>, + Path = emqx_mgmt_api_test_util:api_path(["publish"]), + Auth = emqx_mgmt_api_test_util:auth_header_(), + Body = #{ + topic => <<"not/a+/valid/topic">>, payload => Payload, payload_encoding => <<"base64">> + }, + ?assertMatch( + {error, {_, 400, _}}, emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body) + ). + +t_publish_too_large({init, Config}) -> + MaxPacketSize = 100, + meck:new(emqx_config, [no_link, passthrough, no_history]), + meck:expect(emqx_config, get, fun + ([mqtt, max_packet_size]) -> + MaxPacketSize; + (Other) -> + meck:passthrough(Other) + end), + [{max_packet_size, MaxPacketSize} | Config]; +t_publish_too_large({'end', _Config}) -> + meck:unload(emqx_config), + ok; +t_publish_too_large(Config) -> + MaxPacketSize = proplists:get_value(max_packet_size, Config), + Payload = lists:duplicate(MaxPacketSize, $0), + Path = emqx_mgmt_api_test_util:api_path(["publish"]), + Auth = emqx_mgmt_api_test_util:auth_header_(), + Body = #{topic => <<"random/topic">>, payload => Payload}, + {error, {Summary, _Headers, ResponseBody}} = + emqx_mgmt_api_test_util:request_api( + post, + Path, + "", + Auth, + Body, + #{return_body => true} + ), + ?assertMatch({_, 400, _}, Summary), + ?assertMatch( + #{ + <<"reason_code">> := ?RC_QUOTA_EXCEEDED, + <<"message">> := <<"packet_too_large">> + }, + decode_json(ResponseBody) + ), + ok. + +t_publish_bad_topic_bulk({init, Config}) -> + Config; +t_publish_bad_topic_bulk({'end', _Config}) -> + ok; +t_publish_bad_topic_bulk(_Config) -> + Payload = <<"hello">>, + Path = emqx_mgmt_api_test_util:api_path(["publish", "bulk"]), + Auth = emqx_mgmt_api_test_util:auth_header_(), + Body = [ + #{topic => <<"not/a+/valid/topic">>, payload => Payload}, + #{topic => <<"good/topic">>, payload => Payload} + ], + ?assertMatch( + {error, {_, 400, _}}, emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body) + ). + +t_publish_bulk_api({init, Config}) -> + Config; +t_publish_bulk_api({'end', _Config}) -> + ok; t_publish_bulk_api(_) -> {ok, Client} = emqtt:start_link(#{ username => <<"api_username">>, clientid => <<"api_clientid">> @@ -63,19 +176,135 @@ t_publish_bulk_api(_) -> Payload = <<"hello">>, Path = emqx_mgmt_api_test_util:api_path(["publish", "bulk"]), Auth = emqx_mgmt_api_test_util:auth_header_(), - Body = [#{topic => ?TOPIC1, payload => Payload}, #{topic => ?TOPIC2, payload => Payload}], + Body = [ + #{ + topic => ?TOPIC1, + payload => Payload, + payload_encoding => plain + }, + #{ + topic => ?TOPIC2, + payload => base64:encode(Payload), + payload_encoding => base64 + } + ], {ok, Response} = emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body), - ResponseList = emqx_json:decode(Response, [return_maps]), + ResponseList = decode_json(Response), ?assertEqual(2, erlang:length(ResponseList)), lists:foreach( fun(ResponseMap) -> - ?assertEqual([<<"id">>], maps:keys(ResponseMap)) + ?assertMatch( + [<<"id">>], lists:sort(maps:keys(ResponseMap)) + ) end, ResponseList ), ?assertEqual(ok, receive_assert(?TOPIC1, 0, Payload)), ?assertEqual(ok, receive_assert(?TOPIC2, 0, Payload)), - emqtt:disconnect(Client). + emqtt:stop(Client). + +t_publish_no_subscriber_bulk({init, Config}) -> + Config; +t_publish_no_subscriber_bulk({'end', _Config}) -> + ok; +t_publish_no_subscriber_bulk(_) -> + {ok, Client} = emqtt:start_link(#{ + username => <<"api_username">>, clientid => <<"api_clientid">> + }), + {ok, _} = emqtt:connect(Client), + {ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC1), + {ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC2), + Payload = <<"hello">>, + Path = emqx_mgmt_api_test_util:api_path(["publish", "bulk"]), + Auth = emqx_mgmt_api_test_util:auth_header_(), + Body = [ + #{topic => ?TOPIC1, payload => Payload}, + #{topic => ?TOPIC2, payload => Payload}, + #{topic => <<"no/subscrbier/topic">>, payload => Payload} + ], + {ok, Response} = emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body), + ResponseList = decode_json(Response), + ?assertMatch( + [ + #{<<"id">> := _}, + #{<<"id">> := _}, + #{<<"message">> := <<"no_matching_subscribers">>} + ], + ResponseList + ), + ?assertEqual(ok, receive_assert(?TOPIC1, 0, Payload)), + ?assertEqual(ok, receive_assert(?TOPIC2, 0, Payload)), + emqtt:stop(Client). + +t_publish_bulk_dispatch_one_message_invalid_topic({init, Config}) -> + Config; +t_publish_bulk_dispatch_one_message_invalid_topic({'end', _Config}) -> + ok; +t_publish_bulk_dispatch_one_message_invalid_topic(Config) when is_list(Config) -> + Payload = <<"hello">>, + Path = emqx_mgmt_api_test_util:api_path(["publish", "bulk"]), + Auth = emqx_mgmt_api_test_util:auth_header_(), + Body = [ + #{topic => ?TOPIC1, payload => Payload}, + #{topic => ?TOPIC2, payload => Payload}, + #{topic => <<"bad/#/topic">>, payload => Payload} + ], + {error, {Summary, _Headers, ResponseBody}} = + emqx_mgmt_api_test_util:request_api( + post, + Path, + "", + Auth, + Body, + #{return_body => true} + ), + ?assertMatch({_, 400, _}, Summary), + ?assertMatch( + #{<<"reason_code">> := ?RC_TOPIC_NAME_INVALID}, + decode_json(ResponseBody) + ). + +t_publish_bulk_dispatch_failure({init, Config}) -> + meck:new(emqx, [no_link, passthrough, no_history]), + meck:expect(emqx, is_running, fun() -> false end), + Config; +t_publish_bulk_dispatch_failure({'end', _Config}) -> + meck:unload(emqx), + ok; +t_publish_bulk_dispatch_failure(Config) when is_list(Config) -> + {ok, Client} = emqtt:start_link(#{ + username => <<"api_username">>, clientid => <<"api_clientid">> + }), + {ok, _} = emqtt:connect(Client), + {ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC1), + {ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC2), + Payload = <<"hello">>, + Path = emqx_mgmt_api_test_util:api_path(["publish", "bulk"]), + Auth = emqx_mgmt_api_test_util:auth_header_(), + Body = [ + #{topic => ?TOPIC1, payload => Payload}, + #{topic => ?TOPIC2, payload => Payload}, + #{topic => <<"no/subscrbier/topic">>, payload => Payload} + ], + {error, {Summary, _Headers, ResponseBody}} = + emqx_mgmt_api_test_util:request_api( + post, + Path, + "", + Auth, + Body, + #{return_body => true} + ), + ?assertMatch({_, 503, _}, Summary), + ?assertMatch( + [ + #{<<"reason_code">> := ?RC_IMPLEMENTATION_SPECIFIC_ERROR}, + #{<<"reason_code">> := ?RC_IMPLEMENTATION_SPECIFIC_ERROR}, + #{<<"reason_code">> := ?RC_NO_MATCHING_SUBSCRIBERS} + ], + decode_json(ResponseBody) + ), + emqtt:stop(Client). receive_assert(Topic, Qos, Payload) -> receive @@ -90,3 +319,6 @@ receive_assert(Topic, Qos, Payload) -> after 5000 -> timeout end. + +decode_json(In) -> + emqx_json:decode(In, [return_maps]). diff --git a/apps/emqx_management/test/emqx_mgmt_api_test_util.erl b/apps/emqx_management/test/emqx_mgmt_api_test_util.erl index a8b04dc80..1bc29dfee 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_test_util.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_test_util.erl @@ -65,8 +65,11 @@ request_api(Method, Url, QueryParams, AuthOrHeaders, []) when "" -> Url; _ -> Url ++ "?" ++ QueryParams end, - do_request_api(Method, {NewUrl, build_http_header(AuthOrHeaders)}); -request_api(Method, Url, QueryParams, AuthOrHeaders, Body) when + do_request_api(Method, {NewUrl, build_http_header(AuthOrHeaders)}, #{}); +request_api(Method, Url, QueryParams, AuthOrHeaders, Body) -> + request_api(Method, Url, QueryParams, AuthOrHeaders, Body, #{}). + +request_api(Method, Url, QueryParams, AuthOrHeaders, Body, Opts) when (Method =:= post) orelse (Method =:= patch) orelse (Method =:= put) orelse @@ -79,10 +82,12 @@ request_api(Method, Url, QueryParams, AuthOrHeaders, Body) when end, do_request_api( Method, - {NewUrl, build_http_header(AuthOrHeaders), "application/json", emqx_json:encode(Body)} + {NewUrl, build_http_header(AuthOrHeaders), "application/json", emqx_json:encode(Body)}, + Opts ). -do_request_api(Method, Request) -> +do_request_api(Method, Request, Opts) -> + ReturnBody = maps:get(return_body, Opts, false), ct:pal("Method: ~p, Request: ~p", [Method, Request]), case httpc:request(Method, Request, [], []) of {error, socket_closed_remotely} -> @@ -91,8 +96,9 @@ do_request_api(Method, Request) -> Code >= 200 andalso Code =< 299 -> {ok, Return}; - {ok, {Reason, _, _} = Error} -> - ct:pal("error: ~p~n", [Error]), + {ok, {Reason, Headers, Body}} when ReturnBody -> + {error, {Reason, Headers, Body}}; + {ok, {Reason, _Headers, _Body}} -> {error, Reason} end.