From 4f95f097f7db6be21f56bb5bc237392ebfa815f4 Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Thu, 30 Jun 2022 17:48:07 +0800 Subject: [PATCH] fix: publish api rename param from - clientid --- .../src/emqx_mgmt_api_publish.erl | 74 ++++++++++++++----- 1 file changed, 57 insertions(+), 17 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt_api_publish.erl b/apps/emqx_management/src/emqx_mgmt_api_publish.erl index 27280051c..cd73e4f3e 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_publish.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_publish.erl @@ -61,7 +61,7 @@ schema("/publish/bulk") -> } }. -fields(publish_message) -> +fields(message) -> [ {topic, hoconsc:mk(binary(), #{ @@ -75,7 +75,7 @@ fields(publish_message) -> required => false, default => 0 })}, - {from, + {clientid, hoconsc:mk(binary(), #{ desc => <<"From client ID">>, required => false, @@ -94,34 +94,74 @@ fields(publish_message) -> default => false })} ]; +fields(publish_message) -> + [ + {payload_encoding, + hoconsc:mk(hoconsc:enum([plain, base64]), #{ + desc => <<"MQTT Payload Encoding, base64 or plain">>, + required => false, + default => plain + })} + ] ++ fields(message); fields(publish_message_info) -> [ {id, hoconsc:mk(binary(), #{ desc => <<"Internal Message ID">> })} - ] ++ fields(publish_message). + ] ++ fields(message). publish(post, #{body := Body}) -> - Message = message(Body), - _ = emqx_mgmt:publish(Message), - {200, format_message(Message)}. + case message(Body) of + {ok, Message} -> + _ = emqx_mgmt:publish(Message), + {200, format_message(Message)}; + {error, R} -> + {400, 'BAD_REQUEST', to_binary(R)} + end. publish_batch(post, #{body := Body}) -> - Messages = messages(Body), - _ = [emqx_mgmt:publish(Message) || Message <- Messages], - {200, format_message(Messages)}. + case messages(Body) of + {ok, Messages} -> + _ = [emqx_mgmt:publish(Message) || Message <- Messages], + {200, format_message(Messages)}; + {error, R} -> + {400, 'BAD_REQUEST', to_binary(R)} + end. message(Map) -> - From = maps:get(<<"from">>, Map, http_api), - QoS = maps:get(<<"qos">>, Map, 0), - Topic = maps:get(<<"topic">>, Map), - Payload = maps:get(<<"payload">>, Map), - Retain = maps:get(<<"retain">>, Map, false), - emqx_message:make(From, QoS, Topic, Payload, #{retain => Retain}, #{}). + Encoding = maps:get(<<"payload_encoding">>, Map, plain), + case encode_payload(Encoding, maps:get(<<"payload">>, Map)) of + {ok, Payload} -> + From = maps:get(<<"clientid">>, Map, http_api), + QoS = maps:get(<<"qos">>, Map, 0), + Topic = maps:get(<<"topic">>, Map), + Retain = maps:get(<<"retain">>, Map, false), + {ok, emqx_message:make(From, QoS, Topic, Payload, #{retain => Retain}, #{})}; + {error, R} -> + {error, R} + end. + +encode_payload(plain, Payload) -> {ok, Payload}; +encode_payload(base64, Payload) -> + try + {ok, base64:decode(Payload)} + catch _:_ -> + {error, {decode_base64_payload_failed, Payload}} + end. messages(List) -> - [message(MessageMap) || MessageMap <- List]. + messages(List, []). + +messages([], Res) -> + {ok, lists:reverse(Res)}; +messages([MessageMap | List], Res) -> + case message(MessageMap) of + {ok, Message} -> + messages(List, [Message | Res]); + {error, R} -> + {error, R} + end. format_message(Messages) when is_list(Messages) -> [format_message(Message) || Message <- Messages]; @@ -134,7 +174,7 @@ format_message(#message{ topic => Topic, payload => Payload, retain => maps:get(retain, Flags, false), - from => to_binary(From) + clientid => to_binary(From) }. to_binary(Data) when is_binary(Data) ->