feat: mqtt/publish support to publish with properties and user_properties
This commit is contained in:
parent
3bf08960da
commit
fd11e02639
|
@ -1,5 +1,11 @@
|
||||||
# EMQX 4.4 Changes
|
# EMQX 4.4 Changes
|
||||||
|
|
||||||
|
## v4.4.5
|
||||||
|
|
||||||
|
### Enhancements (synced from v4.3.16)
|
||||||
|
* HTTP API `mqtt/publish` support to publish with properties and user_properties.
|
||||||
|
|
||||||
|
|
||||||
## v4.4.4
|
## v4.4.4
|
||||||
|
|
||||||
### Enhancements (synced from v4.3.15)
|
### Enhancements (synced from v4.3.15)
|
||||||
|
|
|
@ -74,7 +74,9 @@ publish(_Bindings, Params) ->
|
||||||
try parse_publish_params(Params) of
|
try parse_publish_params(Params) of
|
||||||
Result -> do_publish(Params, Result)
|
Result -> do_publish(Params, Result)
|
||||||
catch
|
catch
|
||||||
_E : _R -> minirest:return({ok, ?ERROR8, bad_params})
|
_E : _R ->
|
||||||
|
logger:debug("API publish result:~p ~p", [_E, _R]),
|
||||||
|
minirest:return({ok, ?ERROR8, bad_params})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
unsubscribe(_Bindings, Params) ->
|
unsubscribe(_Bindings, Params) ->
|
||||||
|
@ -159,8 +161,8 @@ do_subscribe(ClientId, Topics, QoS) ->
|
||||||
_ -> ok
|
_ -> ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_publish(Params, {ClientId, Topic, Qos, Retain, Payload, UserProps}) ->
|
do_publish(Params, {ClientId, Topic, Qos, Retain, Payload, Props}) ->
|
||||||
case do_publish(ClientId, Topic, Qos, Retain, Payload, UserProps) of
|
case do_publish(ClientId, Topic, Qos, Retain, Payload, Props) of
|
||||||
{ok, MsgIds} ->
|
{ok, MsgIds} ->
|
||||||
case proplists:get_value(<<"return">>, Params, undefined) of
|
case proplists:get_value(<<"return">>, Params, undefined) of
|
||||||
undefined -> minirest:return(ok);
|
undefined -> minirest:return(ok);
|
||||||
|
@ -174,17 +176,16 @@ do_publish(Params, {ClientId, Topic, Qos, Retain, Payload, UserProps}) ->
|
||||||
minirest:return(Result)
|
minirest:return(Result)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_publish(ClientId, _Topics, _Qos, _Retain, _Payload, _UserProps)
|
do_publish(ClientId, _Topics, _Qos, _Retain, _Payload, _Props)
|
||||||
when not (is_binary(ClientId) or (ClientId =:= undefined)) ->
|
when not (is_binary(ClientId) or (ClientId =:= undefined)) ->
|
||||||
{ok, ?ERROR8, <<"bad clientid: must be string">>};
|
{ok, ?ERROR8, <<"bad clientid: must be string">>};
|
||||||
do_publish(_ClientId, [], _Qos, _Retain, _Payload, _UserProps) ->
|
do_publish(_ClientId, [], _Qos, _Retain, _Payload, _Props) ->
|
||||||
{ok, ?ERROR15, bad_topic};
|
{ok, ?ERROR15, bad_topic};
|
||||||
do_publish(ClientId, Topics, Qos, Retain, Payload, UserProps) ->
|
do_publish(ClientId, Topics, Qos, Retain, Payload, Props) ->
|
||||||
MsgIds = lists:map(fun(Topic) ->
|
MsgIds = lists:map(fun(Topic) ->
|
||||||
Msg = emqx_message:make(ClientId, Qos, Topic, Payload),
|
Msg = emqx_message:make(ClientId, Qos, Topic, Payload,
|
||||||
UserProps1 = #{'User-Property' => UserProps},
|
#{retain => Retain}, Props),
|
||||||
_ = emqx_mgmt:publish(Msg#message{flags = #{retain => Retain},
|
_ = emqx_mgmt:publish(Msg),
|
||||||
headers = #{properties => UserProps1}}),
|
|
||||||
emqx_guid:to_hexstr(Msg#message.id)
|
emqx_guid:to_hexstr(Msg#message.id)
|
||||||
end, Topics),
|
end, Topics),
|
||||||
{ok, MsgIds}.
|
{ok, MsgIds}.
|
||||||
|
@ -218,8 +219,8 @@ parse_publish_params(Params) ->
|
||||||
Qos = proplists:get_value(<<"qos">>, Params, 0),
|
Qos = proplists:get_value(<<"qos">>, Params, 0),
|
||||||
Retain = proplists:get_value(<<"retain">>, Params, false),
|
Retain = proplists:get_value(<<"retain">>, Params, false),
|
||||||
Payload1 = maybe_maps_to_binary(Payload),
|
Payload1 = maybe_maps_to_binary(Payload),
|
||||||
UserProps = generate_user_props(proplists:get_value(<<"user_properties">>, Params, [])),
|
Props = parse_props(Params),
|
||||||
{ClientId, Topics, Qos, Retain, Payload1, UserProps}.
|
{ClientId, Topics, Qos, Retain, Payload1, Props}.
|
||||||
|
|
||||||
parse_unsubscribe_params(Params) ->
|
parse_unsubscribe_params(Params) ->
|
||||||
ClientId = proplists:get_value(<<"clientid">>, Params),
|
ClientId = proplists:get_value(<<"clientid">>, Params),
|
||||||
|
@ -274,16 +275,42 @@ maybe_maps_to_binary(Payload) ->
|
||||||
error({encode_payload_fail, S})
|
error({encode_payload_fail, S})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-define(PROP_MAPPING,
|
||||||
|
#{<<"payload_format_indicator">> => 'Payload-Format-Indicator',
|
||||||
|
<<"message_expiry_interval">> => 'Message-Expiry-Interval',
|
||||||
|
<<"response_topic">> => 'Response-Topic',
|
||||||
|
<<"correlation_data">> => 'Correlation-Data',
|
||||||
|
<<"user_properties">> => 'User-Property',
|
||||||
|
<<"subscription_identifier">> => 'Subscription-Identifier',
|
||||||
|
<<"content_type">> => 'Content-Type'
|
||||||
|
}).
|
||||||
|
|
||||||
|
parse_props(Params) ->
|
||||||
|
Properties0 = proplists:get_value(<<"properties">>, Params, []),
|
||||||
|
Properties1 = lists:foldl(fun({Name, Value}, Acc) ->
|
||||||
|
case maps:find(Name, ?PROP_MAPPING) of
|
||||||
|
{ok, Key} -> Acc#{Key => Value};
|
||||||
|
error -> error({invalid_property, Name})
|
||||||
|
end
|
||||||
|
end, #{}, Properties0),
|
||||||
|
%% Compatible with older API
|
||||||
|
UserProp1 = generate_user_props(proplists:get_value(<<"user_properties">>, Params, [])),
|
||||||
|
UserProp2 =
|
||||||
|
case Properties1 of
|
||||||
|
#{'User-Property' := UserProp1List} -> generate_user_props(UserProp1List);
|
||||||
|
_ -> []
|
||||||
|
end,
|
||||||
|
#{properties => Properties1#{'User-Property' => UserProp1 ++ UserProp2}}.
|
||||||
|
|
||||||
generate_user_props(UserProps) when is_list(UserProps)->
|
generate_user_props(UserProps) when is_list(UserProps)->
|
||||||
generate_user_props_(UserProps, []);
|
lists:map(fun
|
||||||
|
({Name, Value}) -> {bin(Name), bin(Value)};
|
||||||
|
(Invalid) -> error({invalid_user_property, Invalid})
|
||||||
|
end
|
||||||
|
, UserProps);
|
||||||
generate_user_props(UserProps) ->
|
generate_user_props(UserProps) ->
|
||||||
error({user_properties_type_error, UserProps}).
|
error({user_properties_type_error, UserProps}).
|
||||||
|
|
||||||
generate_user_props_([{Name, Value} | Rest], Acc) ->
|
|
||||||
generate_user_props_(Rest, [{bin(Name), bin(Value)} | Acc]);
|
|
||||||
generate_user_props_([], Acc) ->
|
|
||||||
lists:reverse(Acc).
|
|
||||||
|
|
||||||
bin(Bin) when is_binary(Bin) -> Bin;
|
bin(Bin) when is_binary(Bin) -> Bin;
|
||||||
bin(Num) when is_number(Num) -> number_to_binary(Num);
|
bin(Num) when is_number(Num) -> number_to_binary(Num);
|
||||||
bin(Boolean) when is_boolean(Boolean) -> atom_to_binary(Boolean);
|
bin(Boolean) when is_boolean(Boolean) -> atom_to_binary(Boolean);
|
||||||
|
|
|
@ -568,11 +568,35 @@ t_pubsub(_) ->
|
||||||
false
|
false
|
||||||
end),
|
end),
|
||||||
|
|
||||||
|
%% properties
|
||||||
|
{ok, Code} = request_api(post, api_path(["mqtt/publish"]), [], auth_header_(),
|
||||||
|
#{<<"clientid">> => ClientId,
|
||||||
|
<<"topic">> => <<"mytopic">>,
|
||||||
|
<<"qos">> => 1,
|
||||||
|
<<"payload">> => <<"hello properties">>,
|
||||||
|
<<"user_properties">> => #{<<"prop_key1">> => <<"prop_val1">>},
|
||||||
|
<<"properties">> => #{
|
||||||
|
<<"message_expiry_interval">> => 1000,
|
||||||
|
<<"user_properties">> => #{<<"prop_key2">> => <<"prop_val2">>}}
|
||||||
|
}),
|
||||||
|
Msg = receive
|
||||||
|
{publish, MsgTmp} ->
|
||||||
|
MsgTmp
|
||||||
|
after 150 ->
|
||||||
|
false
|
||||||
|
end,
|
||||||
|
?assertMatch(#{payload := <<"hello properties">>,
|
||||||
|
qos := 1,
|
||||||
|
properties := #{
|
||||||
|
'Message-Expiry-Interval' := 1000,
|
||||||
|
'User-Property' := [{<<"prop_key1">>,<<"prop_val1">>},
|
||||||
|
{<<"prop_key2">>,<<"prop_val2">>}]}}, Msg),
|
||||||
|
|
||||||
ok = emqtt:disconnect(C1),
|
ok = emqtt:disconnect(C1),
|
||||||
|
|
||||||
?assertEqual(4, emqx_metrics:val('messages.qos1.received') - Qos1Received),
|
?assertEqual(5, emqx_metrics:val('messages.qos1.received') - Qos1Received),
|
||||||
?assertEqual(2, emqx_metrics:val('messages.qos2.received') - Qos2Received),
|
?assertEqual(2, emqx_metrics:val('messages.qos2.received') - Qos2Received),
|
||||||
?assertEqual(6, emqx_metrics:val('messages.received') - Received).
|
?assertEqual(7, emqx_metrics:val('messages.received') - Received).
|
||||||
|
|
||||||
loop([]) -> [];
|
loop([]) -> [];
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue