Merge pull request #6192 from turtleDeng/publish-api-user-properties
feat(publish-api): Publish api support user-properties parameters
This commit is contained in:
commit
25215244be
|
@ -71,8 +71,8 @@ subscribe(_Bindings, Params) ->
|
||||||
|
|
||||||
publish(_Bindings, Params) ->
|
publish(_Bindings, Params) ->
|
||||||
logger:debug("API publish Params:~p", [Params]),
|
logger:debug("API publish Params:~p", [Params]),
|
||||||
{ClientId, Topic, Qos, Retain, Payload} = parse_publish_params(Params),
|
{ClientId, Topic, Qos, Retain, Payload, UserProps} = parse_publish_params(Params),
|
||||||
case do_publish(ClientId, Topic, Qos, Retain, Payload) of
|
case do_publish(ClientId, Topic, Qos, Retain, Payload, UserProps) of
|
||||||
{ok, MsgIds} ->
|
{ok, MsgIds} ->
|
||||||
case proplists:get_value(<<"return">>, Params, undefined) of
|
case proplists:get_value(<<"return">>, Params, undefined) of
|
||||||
undefined -> minirest:return(ok);
|
undefined -> minirest:return(ok);
|
||||||
|
@ -114,7 +114,8 @@ loop_subscribe([Params | ParamsN], Acc) ->
|
||||||
{_, Code0, _Reason} -> Code0
|
{_, Code0, _Reason} -> Code0
|
||||||
end,
|
end,
|
||||||
Result = #{clientid => ClientId,
|
Result = #{clientid => ClientId,
|
||||||
topic => resp_topic(proplists:get_value(<<"topic">>, Params), proplists:get_value(<<"topics">>, Params, <<"">>)),
|
topic => resp_topic(proplists:get_value(<<"topic">>, Params),
|
||||||
|
proplists:get_value(<<"topics">>, Params, <<"">>)),
|
||||||
code => Code},
|
code => Code},
|
||||||
loop_subscribe(ParamsN, [Result | Acc]).
|
loop_subscribe(ParamsN, [Result | Acc]).
|
||||||
|
|
||||||
|
@ -123,12 +124,13 @@ loop_publish(Params) ->
|
||||||
loop_publish([], Result) ->
|
loop_publish([], Result) ->
|
||||||
lists:reverse(Result);
|
lists:reverse(Result);
|
||||||
loop_publish([Params | ParamsN], Acc) ->
|
loop_publish([Params | ParamsN], Acc) ->
|
||||||
{ClientId, Topic, Qos, Retain, Payload} = parse_publish_params(Params),
|
{ClientId, Topic, Qos, Retain, Payload, UserProps} = parse_publish_params(Params),
|
||||||
Code = case do_publish(ClientId, Topic, Qos, Retain, Payload) of
|
Code = case do_publish(ClientId, Topic, Qos, Retain, Payload, UserProps) of
|
||||||
{ok, _} -> 0;
|
{ok, _} -> 0;
|
||||||
{_, Code0, _} -> Code0
|
{_, Code0, _} -> Code0
|
||||||
end,
|
end,
|
||||||
Result = #{topic => resp_topic(proplists:get_value(<<"topic">>, Params), proplists:get_value(<<"topics">>, Params, <<"">>)),
|
Result = #{topic => resp_topic(proplists:get_value(<<"topic">>, Params),
|
||||||
|
proplists:get_value(<<"topics">>, Params, <<"">>)),
|
||||||
code => Code},
|
code => Code},
|
||||||
loop_publish(ParamsN, [Result | Acc]).
|
loop_publish(ParamsN, [Result | Acc]).
|
||||||
|
|
||||||
|
@ -143,7 +145,8 @@ loop_unsubscribe([Params | ParamsN], Acc) ->
|
||||||
{_, Code0, _} -> Code0
|
{_, Code0, _} -> Code0
|
||||||
end,
|
end,
|
||||||
Result = #{clientid => ClientId,
|
Result = #{clientid => ClientId,
|
||||||
topic => resp_topic(proplists:get_value(<<"topic">>, Params), proplists:get_value(<<"topics">>, Params, <<"">>)),
|
topic => resp_topic(proplists:get_value(<<"topic">>, Params),
|
||||||
|
proplists:get_value(<<"topics">>, Params, <<"">>)),
|
||||||
code => Code},
|
code => Code},
|
||||||
loop_unsubscribe(ParamsN, [Result | Acc]).
|
loop_unsubscribe(ParamsN, [Result | Acc]).
|
||||||
|
|
||||||
|
@ -158,14 +161,17 @@ do_subscribe(ClientId, Topics, QoS) ->
|
||||||
_ -> ok
|
_ -> ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_publish(ClientId, _Topics, _Qos, _Retain, _Payload) when not (is_binary(ClientId) or (ClientId =:= undefined)) ->
|
do_publish(ClientId, _Topics, _Qos, _Retain, _Payload, _UserProps)
|
||||||
|
when not (is_binary(ClientId) or (ClientId =:= undefined)) ->
|
||||||
{ok, ?ERROR8, <<"bad clientid: must be string">>};
|
{ok, ?ERROR8, <<"bad clientid: must be string">>};
|
||||||
do_publish(_ClientId, [], _Qos, _Retain, _Payload) ->
|
do_publish(_ClientId, [], _Qos, _Retain, _Payload, _UserProps) ->
|
||||||
{ok, ?ERROR15, bad_topic};
|
{ok, ?ERROR15, bad_topic};
|
||||||
do_publish(ClientId, Topics, Qos, Retain, Payload) ->
|
do_publish(ClientId, Topics, Qos, Retain, Payload, UserProps) ->
|
||||||
MsgIds = lists:map(fun(Topic) ->
|
MsgIds = lists:map(fun(Topic) ->
|
||||||
Msg = emqx_message:make(ClientId, Qos, Topic, Payload),
|
Msg = emqx_message:make(ClientId, Qos, Topic, Payload),
|
||||||
_ = emqx_mgmt:publish(Msg#message{flags = #{retain => Retain}}),
|
UserProps1 = #{'User-Property' => UserProps},
|
||||||
|
_ = emqx_mgmt:publish(Msg#message{flags = #{retain => Retain},
|
||||||
|
headers = #{properties => UserProps1}}),
|
||||||
emqx_guid:to_hexstr(Msg#message.id)
|
emqx_guid:to_hexstr(Msg#message.id)
|
||||||
end, Topics),
|
end, Topics),
|
||||||
{ok, MsgIds}.
|
{ok, MsgIds}.
|
||||||
|
@ -185,19 +191,22 @@ do_unsubscribe(ClientId, Topic) ->
|
||||||
|
|
||||||
parse_subscribe_params(Params) ->
|
parse_subscribe_params(Params) ->
|
||||||
ClientId = proplists:get_value(<<"clientid">>, Params),
|
ClientId = proplists:get_value(<<"clientid">>, Params),
|
||||||
Topics = topics(filter, proplists:get_value(<<"topic">>, Params), proplists:get_value(<<"topics">>, Params, <<"">>)),
|
Topics = topics(filter, proplists:get_value(<<"topic">>, Params),
|
||||||
|
proplists:get_value(<<"topics">>, Params, <<"">>)),
|
||||||
QoS = proplists:get_value(<<"qos">>, Params, 0),
|
QoS = proplists:get_value(<<"qos">>, Params, 0),
|
||||||
{ClientId, Topics, QoS}.
|
{ClientId, Topics, QoS}.
|
||||||
|
|
||||||
parse_publish_params(Params) ->
|
parse_publish_params(Params) ->
|
||||||
Topics = topics(name, proplists:get_value(<<"topic">>, Params), proplists:get_value(<<"topics">>, Params, <<"">>)),
|
Topics = topics(name, proplists:get_value(<<"topic">>, Params),
|
||||||
ClientId = proplists:get_value(<<"clientid">>, Params),
|
proplists:get_value(<<"topics">>, Params, <<"">>)),
|
||||||
Payload = decode_payload(proplists:get_value(<<"payload">>, Params, <<>>),
|
ClientId = proplists:get_value(<<"clientid">>, Params),
|
||||||
proplists:get_value(<<"encoding">>, Params, <<"plain">>)),
|
Payload = decode_payload(proplists:get_value(<<"payload">>, Params, <<>>),
|
||||||
Qos = proplists:get_value(<<"qos">>, Params, 0),
|
proplists:get_value(<<"encoding">>, Params, <<"plain">>)),
|
||||||
Retain = proplists:get_value(<<"retain">>, Params, false),
|
Qos = proplists:get_value(<<"qos">>, Params, 0),
|
||||||
Payload1 = maybe_maps_to_binary(Payload),
|
Retain = proplists:get_value(<<"retain">>, Params, false),
|
||||||
{ClientId, Topics, Qos, Retain, Payload1}.
|
Payload1 = maybe_maps_to_binary(Payload),
|
||||||
|
UserProps = check_user_props(proplists:get_value(<<"user_properties">>, Params, [])),
|
||||||
|
{ClientId, Topics, Qos, Retain, Payload1, UserProps}.
|
||||||
|
|
||||||
parse_unsubscribe_params(Params) ->
|
parse_unsubscribe_params(Params) ->
|
||||||
ClientId = proplists:get_value(<<"clientid">>, Params),
|
ClientId = proplists:get_value(<<"clientid">>, Params),
|
||||||
|
@ -251,3 +260,8 @@ maybe_maps_to_binary(Payload) ->
|
||||||
_C : _E : S ->
|
_C : _E : S ->
|
||||||
error({encode_payload_fail, S})
|
error({encode_payload_fail, S})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
check_user_props(UserProps) when is_list(UserProps) ->
|
||||||
|
UserProps;
|
||||||
|
check_user_props(UserProps) ->
|
||||||
|
error({user_properties_type_error, UserProps}).
|
||||||
|
|
|
@ -401,7 +401,7 @@ t_pubsub(_) ->
|
||||||
|
|
||||||
ClientId = <<"client1">>,
|
ClientId = <<"client1">>,
|
||||||
Options = #{clientid => ClientId,
|
Options = #{clientid => ClientId,
|
||||||
proto_ver => 5},
|
proto_ver => v5},
|
||||||
Topic = <<"mytopic">>,
|
Topic = <<"mytopic">>,
|
||||||
{ok, C1} = emqtt:start_link(Options),
|
{ok, C1} = emqtt:start_link(Options),
|
||||||
{ok, _} = emqtt:connect(C1),
|
{ok, _} = emqtt:connect(C1),
|
||||||
|
@ -536,11 +536,29 @@ t_pubsub(_) ->
|
||||||
api_path(["mqtt/unsubscribe_batch"]), [], auth_header_(), Body3),
|
api_path(["mqtt/unsubscribe_batch"]), [], auth_header_(), Body3),
|
||||||
loop(maps:get(<<"data">>, jiffy:decode(list_to_binary(Data3), [return_maps]))),
|
loop(maps:get(<<"data">>, jiffy:decode(list_to_binary(Data3), [return_maps]))),
|
||||||
|
|
||||||
|
{ok, _, [1]} = emqtt:subscribe(C1, <<"mytopic">>, qos1),
|
||||||
|
timer:sleep(50),
|
||||||
|
|
||||||
|
%% user properties
|
||||||
|
{ok, Code} = request_api(post, api_path(["mqtt/publish"]), [], auth_header_(),
|
||||||
|
#{<<"clientid">> => ClientId,
|
||||||
|
<<"topic">> => <<"mytopic">>,
|
||||||
|
<<"qos">> => 1,
|
||||||
|
<<"payload">> => <<"hello world">>,
|
||||||
|
<<"user_properties">> => #{<<"porp_1">> => <<"porp_1">>}}),
|
||||||
|
?assert(receive
|
||||||
|
{publish, #{payload := <<"hello world">>,
|
||||||
|
properties := #{'User-Property' := [{<<"porp_1">>,<<"porp_1">>}]}}} ->
|
||||||
|
true
|
||||||
|
after 100 ->
|
||||||
|
false
|
||||||
|
end),
|
||||||
|
|
||||||
ok = emqtt:disconnect(C1),
|
ok = emqtt:disconnect(C1),
|
||||||
|
|
||||||
?assertEqual(3, emqx_metrics:val('messages.qos1.received') - Qos1Received),
|
?assertEqual(4, emqx_metrics:val('messages.qos1.received') - Qos1Received),
|
||||||
?assertEqual(2, emqx_metrics:val('messages.qos2.received') - Qos2Received),
|
?assertEqual(2, emqx_metrics:val('messages.qos2.received') - Qos2Received),
|
||||||
?assertEqual(5, emqx_metrics:val('messages.received') - Received).
|
?assertEqual(6, emqx_metrics:val('messages.received') - Received).
|
||||||
|
|
||||||
loop([]) -> [];
|
loop([]) -> [];
|
||||||
|
|
||||||
|
|
|
@ -454,7 +454,8 @@ columns_with_exam('message.publish') ->
|
||||||
, {<<"topic">>, <<"t/a">>}
|
, {<<"topic">>, <<"t/a">>}
|
||||||
, {<<"qos">>, 1}
|
, {<<"qos">>, 1}
|
||||||
, {<<"flags">>, #{}}
|
, {<<"flags">>, #{}}
|
||||||
, {<<"headers">>, undefined}
|
, {<<"headers">>, #{<<"properties">> => #{<<"User-Property">> =>
|
||||||
|
#{'prop_key' => <<"prop_val">>}}}}
|
||||||
, {<<"publish_received_at">>, erlang:system_time(millisecond)}
|
, {<<"publish_received_at">>, erlang:system_time(millisecond)}
|
||||||
, {<<"timestamp">>, erlang:system_time(millisecond)}
|
, {<<"timestamp">>, erlang:system_time(millisecond)}
|
||||||
, {<<"node">>, node()}
|
, {<<"node">>, node()}
|
||||||
|
|
|
@ -98,21 +98,8 @@ sql_test_action() ->
|
||||||
fill_default_values(Event, Context) ->
|
fill_default_values(Event, Context) ->
|
||||||
maps:merge(envs_examp(Event), Context).
|
maps:merge(envs_examp(Event), Context).
|
||||||
|
|
||||||
envs_examp(<<"$events/", _/binary>> = EVENT_TOPIC) ->
|
envs_examp(EVENT_TOPIC) ->
|
||||||
EventName = emqx_rule_events:event_name(EVENT_TOPIC),
|
EventName = emqx_rule_events:event_name(EVENT_TOPIC),
|
||||||
emqx_rule_maps:atom_key_map(
|
emqx_rule_maps:atom_key_map(
|
||||||
maps:from_list(
|
maps:from_list(
|
||||||
emqx_rule_events:columns_with_exam(EventName)));
|
emqx_rule_events:columns_with_exam(EventName))).
|
||||||
envs_examp(_) ->
|
|
||||||
#{id => emqx_guid:to_hexstr(emqx_guid:gen()),
|
|
||||||
clientid => <<"c_emqx">>,
|
|
||||||
username => <<"u_emqx">>,
|
|
||||||
payload => <<"{\"id\": 1, \"name\": \"ha\"}">>,
|
|
||||||
peerhost => <<"127.0.0.1">>,
|
|
||||||
topic => <<"t/a">>,
|
|
||||||
qos => 1,
|
|
||||||
flags => #{sys => true, event => true},
|
|
||||||
publish_received_at => emqx_rule_utils:now_ms(),
|
|
||||||
timestamp => emqx_rule_utils:now_ms(),
|
|
||||||
node => node()
|
|
||||||
}.
|
|
||||||
|
|
Loading…
Reference in New Issue