diff --git a/CHANGES-4.4.md b/CHANGES-4.4.md index 6862d0f6a..dbc42fc3f 100644 --- a/CHANGES-4.4.md +++ b/CHANGES-4.4.md @@ -1,5 +1,11 @@ # EMQX 4.4 Changes +## v4.4.5 + +### Enhancements (synced from v4.3.16) +* HTTP API `mqtt/publish` support to publish with properties and user_properties. + + ## v4.4.4 ### Enhancements (synced from v4.3.15) diff --git a/apps/emqx_management/src/emqx_mgmt_api_pubsub.erl b/apps/emqx_management/src/emqx_mgmt_api_pubsub.erl index 3fe859811..b2a050d17 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_pubsub.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_pubsub.erl @@ -74,7 +74,9 @@ publish(_Bindings, Params) -> try parse_publish_params(Params) of Result -> do_publish(Params, Result) catch - _E : _R -> minirest:return({ok, ?ERROR8, bad_params}) + _E : _R -> + logger:debug("API publish result:~p ~p", [_E, _R]), + minirest:return({ok, ?ERROR8, bad_params}) end. unsubscribe(_Bindings, Params) -> @@ -159,8 +161,8 @@ do_subscribe(ClientId, Topics, QoS) -> _ -> ok end. -do_publish(Params, {ClientId, Topic, Qos, Retain, Payload, UserProps}) -> - case do_publish(ClientId, Topic, Qos, Retain, Payload, UserProps) of +do_publish(Params, {ClientId, Topic, Qos, Retain, Payload, Props}) -> + case do_publish(ClientId, Topic, Qos, Retain, Payload, Props) of {ok, MsgIds} -> case proplists:get_value(<<"return">>, Params, undefined) of undefined -> minirest:return(ok); @@ -174,17 +176,16 @@ do_publish(Params, {ClientId, Topic, Qos, Retain, Payload, UserProps}) -> minirest:return(Result) end. -do_publish(ClientId, _Topics, _Qos, _Retain, _Payload, _UserProps) +do_publish(ClientId, _Topics, _Qos, _Retain, _Payload, _Props) when not (is_binary(ClientId) or (ClientId =:= undefined)) -> {ok, ?ERROR8, <<"bad clientid: must be string">>}; -do_publish(_ClientId, [], _Qos, _Retain, _Payload, _UserProps) -> +do_publish(_ClientId, [], _Qos, _Retain, _Payload, _Props) -> {ok, ?ERROR15, bad_topic}; -do_publish(ClientId, Topics, Qos, Retain, Payload, UserProps) -> +do_publish(ClientId, Topics, Qos, Retain, Payload, Props) -> MsgIds = lists:map(fun(Topic) -> - Msg = emqx_message:make(ClientId, Qos, Topic, Payload), - UserProps1 = #{'User-Property' => UserProps}, - _ = emqx_mgmt:publish(Msg#message{flags = #{retain => Retain}, - headers = #{properties => UserProps1}}), + Msg = emqx_message:make(ClientId, Qos, Topic, Payload, + #{retain => Retain}, Props), + _ = emqx_mgmt:publish(Msg), emqx_guid:to_hexstr(Msg#message.id) end, Topics), {ok, MsgIds}. @@ -218,8 +219,8 @@ parse_publish_params(Params) -> Qos = proplists:get_value(<<"qos">>, Params, 0), Retain = proplists:get_value(<<"retain">>, Params, false), Payload1 = maybe_maps_to_binary(Payload), - UserProps = generate_user_props(proplists:get_value(<<"user_properties">>, Params, [])), - {ClientId, Topics, Qos, Retain, Payload1, UserProps}. + Props = parse_props(Params), + {ClientId, Topics, Qos, Retain, Payload1, Props}. parse_unsubscribe_params(Params) -> ClientId = proplists:get_value(<<"clientid">>, Params), @@ -274,16 +275,42 @@ maybe_maps_to_binary(Payload) -> error({encode_payload_fail, S}) end. +-define(PROP_MAPPING, + #{<<"payload_format_indicator">> => 'Payload-Format-Indicator', + <<"message_expiry_interval">> => 'Message-Expiry-Interval', + <<"response_topic">> => 'Response-Topic', + <<"correlation_data">> => 'Correlation-Data', + <<"user_properties">> => 'User-Property', + <<"subscription_identifier">> => 'Subscription-Identifier', + <<"content_type">> => 'Content-Type' + }). + +parse_props(Params) -> + Properties0 = proplists:get_value(<<"properties">>, Params, []), + Properties1 = lists:foldl(fun({Name, Value}, Acc) -> + case maps:find(Name, ?PROP_MAPPING) of + {ok, Key} -> Acc#{Key => Value}; + error -> error({invalid_property, Name}) + end + end, #{}, Properties0), + %% Compatible with older API + UserProp1 = generate_user_props(proplists:get_value(<<"user_properties">>, Params, [])), + UserProp2 = + case Properties1 of + #{'User-Property' := UserProp1List} -> generate_user_props(UserProp1List); + _ -> [] + end, + #{properties => Properties1#{'User-Property' => UserProp1 ++ UserProp2}}. + generate_user_props(UserProps) when is_list(UserProps)-> - generate_user_props_(UserProps, []); + lists:map(fun + ({Name, Value}) -> {bin(Name), bin(Value)}; + (Invalid) -> error({invalid_user_property, Invalid}) + end + , UserProps); generate_user_props(UserProps) -> error({user_properties_type_error, UserProps}). -generate_user_props_([{Name, Value} | Rest], Acc) -> - generate_user_props_(Rest, [{bin(Name), bin(Value)} | Acc]); -generate_user_props_([], Acc) -> - lists:reverse(Acc). - bin(Bin) when is_binary(Bin) -> Bin; bin(Num) when is_number(Num) -> number_to_binary(Num); bin(Boolean) when is_boolean(Boolean) -> atom_to_binary(Boolean); diff --git a/apps/emqx_management/test/emqx_mgmt_api_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_SUITE.erl index 0029b866f..f80128eb3 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_SUITE.erl @@ -568,11 +568,35 @@ t_pubsub(_) -> false end), + %% properties + {ok, Code} = request_api(post, api_path(["mqtt/publish"]), [], auth_header_(), + #{<<"clientid">> => ClientId, + <<"topic">> => <<"mytopic">>, + <<"qos">> => 1, + <<"payload">> => <<"hello properties">>, + <<"user_properties">> => #{<<"prop_key1">> => <<"prop_val1">>}, + <<"properties">> => #{ + <<"message_expiry_interval">> => 1000, + <<"user_properties">> => #{<<"prop_key2">> => <<"prop_val2">>}} + }), + Msg = receive + {publish, MsgTmp} -> + MsgTmp + after 150 -> + false + end, + ?assertMatch(#{payload := <<"hello properties">>, + qos := 1, + properties := #{ + 'Message-Expiry-Interval' := 1000, + 'User-Property' := [{<<"prop_key1">>,<<"prop_val1">>}, + {<<"prop_key2">>,<<"prop_val2">>}]}}, Msg), + ok = emqtt:disconnect(C1), - ?assertEqual(4, emqx_metrics:val('messages.qos1.received') - Qos1Received), + ?assertEqual(5, emqx_metrics:val('messages.qos1.received') - Qos1Received), ?assertEqual(2, emqx_metrics:val('messages.qos2.received') - Qos2Received), - ?assertEqual(6, emqx_metrics:val('messages.received') - Received). + ?assertEqual(7, emqx_metrics:val('messages.received') - Received). loop([]) -> [];