feat(publish-api): Publish api supports user-properties parameters

This commit is contained in:
Turtle 2021-11-17 11:17:10 +08:00
parent 8b0478e663
commit ca1ece3db0
2 changed files with 55 additions and 23 deletions

View File

@ -71,8 +71,8 @@ subscribe(_Bindings, Params) ->
publish(_Bindings, Params) ->
logger:debug("API publish Params:~p", [Params]),
{ClientId, Topic, Qos, Retain, Payload} = parse_publish_params(Params),
case do_publish(ClientId, Topic, Qos, Retain, Payload) of
{ClientId, Topic, Qos, Retain, Payload, UserProps} = parse_publish_params(Params),
case do_publish(ClientId, Topic, Qos, Retain, Payload, UserProps) of
{ok, MsgIds} ->
case proplists:get_value(<<"return">>, Params, undefined) of
undefined -> minirest:return(ok);
@ -114,7 +114,8 @@ loop_subscribe([Params | ParamsN], Acc) ->
{_, Code0, _Reason} -> Code0
end,
Result = #{clientid => ClientId,
topic => resp_topic(proplists:get_value(<<"topic">>, Params), proplists:get_value(<<"topics">>, Params, <<"">>)),
topic => resp_topic(proplists:get_value(<<"topic">>, Params),
proplists:get_value(<<"topics">>, Params, <<"">>)),
code => Code},
loop_subscribe(ParamsN, [Result | Acc]).
@ -123,12 +124,13 @@ loop_publish(Params) ->
loop_publish([], Result) ->
lists:reverse(Result);
loop_publish([Params | ParamsN], Acc) ->
{ClientId, Topic, Qos, Retain, Payload} = parse_publish_params(Params),
Code = case do_publish(ClientId, Topic, Qos, Retain, Payload) of
{ClientId, Topic, Qos, Retain, Payload, UserProps} = parse_publish_params(Params),
Code = case do_publish(ClientId, Topic, Qos, Retain, Payload, UserProps) of
{ok, _} -> 0;
{_, Code0, _} -> Code0
end,
Result = #{topic => resp_topic(proplists:get_value(<<"topic">>, Params), proplists:get_value(<<"topics">>, Params, <<"">>)),
Result = #{topic => resp_topic(proplists:get_value(<<"topic">>, Params),
proplists:get_value(<<"topics">>, Params, <<"">>)),
code => Code},
loop_publish(ParamsN, [Result | Acc]).
@ -143,7 +145,8 @@ loop_unsubscribe([Params | ParamsN], Acc) ->
{_, Code0, _} -> Code0
end,
Result = #{clientid => ClientId,
topic => resp_topic(proplists:get_value(<<"topic">>, Params), proplists:get_value(<<"topics">>, Params, <<"">>)),
topic => resp_topic(proplists:get_value(<<"topic">>, Params),
proplists:get_value(<<"topics">>, Params, <<"">>)),
code => Code},
loop_unsubscribe(ParamsN, [Result | Acc]).
@ -158,14 +161,17 @@ do_subscribe(ClientId, Topics, QoS) ->
_ -> ok
end.
do_publish(ClientId, _Topics, _Qos, _Retain, _Payload) when not (is_binary(ClientId) or (ClientId =:= undefined)) ->
do_publish(ClientId, _Topics, _Qos, _Retain, _Payload, _UserProps)
when not (is_binary(ClientId) or (ClientId =:= undefined)) ->
{ok, ?ERROR8, <<"bad clientid: must be string">>};
do_publish(_ClientId, [], _Qos, _Retain, _Payload) ->
do_publish(_ClientId, [], _Qos, _Retain, _Payload, _UserProps) ->
{ok, ?ERROR15, bad_topic};
do_publish(ClientId, Topics, Qos, Retain, Payload) ->
do_publish(ClientId, Topics, Qos, Retain, Payload, UserProps) ->
MsgIds = lists:map(fun(Topic) ->
Msg = emqx_message:make(ClientId, Qos, Topic, Payload),
_ = emqx_mgmt:publish(Msg#message{flags = #{retain => Retain}}),
UserProps1 = #{'User-Property' => UserProps},
_ = emqx_mgmt:publish(Msg#message{flags = #{retain => Retain},
headers = #{properties => UserProps1}}),
emqx_guid:to_hexstr(Msg#message.id)
end, Topics),
{ok, MsgIds}.
@ -185,19 +191,22 @@ do_unsubscribe(ClientId, Topic) ->
parse_subscribe_params(Params) ->
ClientId = proplists:get_value(<<"clientid">>, Params),
Topics = topics(filter, proplists:get_value(<<"topic">>, Params), proplists:get_value(<<"topics">>, Params, <<"">>)),
Topics = topics(filter, proplists:get_value(<<"topic">>, Params),
proplists:get_value(<<"topics">>, Params, <<"">>)),
QoS = proplists:get_value(<<"qos">>, Params, 0),
{ClientId, Topics, QoS}.
parse_publish_params(Params) ->
Topics = topics(name, proplists:get_value(<<"topic">>, Params), proplists:get_value(<<"topics">>, Params, <<"">>)),
Topics = topics(name, proplists:get_value(<<"topic">>, Params),
proplists:get_value(<<"topics">>, Params, <<"">>)),
ClientId = proplists:get_value(<<"clientid">>, Params),
Payload = decode_payload(proplists:get_value(<<"payload">>, Params, <<>>),
proplists:get_value(<<"encoding">>, Params, <<"plain">>)),
Qos = proplists:get_value(<<"qos">>, Params, 0),
Retain = proplists:get_value(<<"retain">>, Params, false),
Payload1 = maybe_maps_to_binary(Payload),
{ClientId, Topics, Qos, Retain, Payload1}.
UserProps = check_user_props(proplists:get_value(<<"user_properties">>, Params, [])),
{ClientId, Topics, Qos, Retain, Payload1, UserProps}.
parse_unsubscribe_params(Params) ->
ClientId = proplists:get_value(<<"clientid">>, Params),
@ -251,3 +260,8 @@ maybe_maps_to_binary(Payload) ->
_C : _E : S ->
error({encode_payload_fail, S})
end.
check_user_props(UserProps) when is_list(UserProps) ->
UserProps;
check_user_props(UserProps) ->
error({user_properties_type_error, UserProps}).

View File

@ -401,7 +401,7 @@ t_pubsub(_) ->
ClientId = <<"client1">>,
Options = #{clientid => ClientId,
proto_ver => 5},
proto_ver => v5},
Topic = <<"mytopic">>,
{ok, C1} = emqtt:start_link(Options),
{ok, _} = emqtt:connect(C1),
@ -536,11 +536,29 @@ t_pubsub(_) ->
api_path(["mqtt/unsubscribe_batch"]), [], auth_header_(), Body3),
loop(maps:get(<<"data">>, jiffy:decode(list_to_binary(Data3), [return_maps]))),
{ok, _, [1]} = emqtt:subscribe(C1, <<"mytopic">>, qos1),
timer:sleep(50),
%% user properties
{ok, Code} = request_api(post, api_path(["mqtt/publish"]), [], auth_header_(),
#{<<"clientid">> => ClientId,
<<"topic">> => <<"mytopic">>,
<<"qos">> => 1,
<<"payload">> => <<"hello world">>,
<<"user_properties">> => #{<<"porp_1">> => <<"porp_1">>}}),
?assert(receive
{publish, #{payload := <<"hello world">>,
properties := #{'User-Property' := [{<<"porp_1">>,<<"porp_1">>}]}}} ->
true
after 100 ->
false
end),
ok = emqtt:disconnect(C1),
?assertEqual(3, emqx_metrics:val('messages.qos1.received') - Qos1Received),
?assertEqual(4, emqx_metrics:val('messages.qos1.received') - Qos1Received),
?assertEqual(2, emqx_metrics:val('messages.qos2.received') - Qos2Received),
?assertEqual(5, emqx_metrics:val('messages.received') - Received).
?assertEqual(6, emqx_metrics:val('messages.received') - Received).
loop([]) -> [];