diff --git a/apps/emqx_management/src/emqx_mgmt_api_pubsub.erl b/apps/emqx_management/src/emqx_mgmt_api_pubsub.erl index b41c801d4..3fe859811 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_pubsub.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_pubsub.erl @@ -71,19 +71,10 @@ subscribe(_Bindings, Params) -> publish(_Bindings, Params) -> logger:debug("API publish Params:~p", [Params]), - {ClientId, Topic, Qos, Retain, Payload, UserProps} = parse_publish_params(Params), - case do_publish(ClientId, Topic, Qos, Retain, Payload, UserProps) of - {ok, MsgIds} -> - case proplists:get_value(<<"return">>, Params, undefined) of - undefined -> minirest:return(ok); - _Val -> - case proplists:get_value(<<"topics">>, Params, undefined) of - undefined -> minirest:return({ok, #{msgid => lists:last(MsgIds)}}); - _ -> minirest:return({ok, #{msgids => MsgIds}}) - end - end; - Result -> - minirest:return(Result) + try parse_publish_params(Params) of + Result -> do_publish(Params, Result) + catch + _E : _R -> minirest:return({ok, ?ERROR8, bad_params}) end. unsubscribe(_Bindings, Params) -> @@ -124,14 +115,19 @@ loop_publish(Params) -> loop_publish([], Result) -> lists:reverse(Result); loop_publish([Params | ParamsN], Acc) -> - {ClientId, Topic, Qos, Retain, Payload, UserProps} = parse_publish_params(Params), - Code = case do_publish(ClientId, Topic, Qos, Retain, Payload, UserProps) of - {ok, _} -> 0; - {_, Code0, _} -> Code0 - end, - Result = #{topic => resp_topic(proplists:get_value(<<"topic">>, Params), - proplists:get_value(<<"topics">>, Params, <<"">>)), - code => Code}, + Result = + try parse_publish_params(Params) of + Res -> + Code = case do_publish(Params, Res) of + {ok, _} -> 0; + {_, Code0, _} -> Code0 + end, + #{topic => resp_topic(proplists:get_value(<<"topic">>, Params), + proplists:get_value(<<"topics">>, Params, <<"">>)), + code => Code} + catch + _E : _R -> #{code => ?ERROR8, message => <<"bad_params">>} + end, loop_publish(ParamsN, [Result | Acc]). loop_unsubscribe(Params) -> @@ -163,6 +159,21 @@ do_subscribe(ClientId, Topics, QoS) -> _ -> ok end. +do_publish(Params, {ClientId, Topic, Qos, Retain, Payload, UserProps}) -> + case do_publish(ClientId, Topic, Qos, Retain, Payload, UserProps) of + {ok, MsgIds} -> + case proplists:get_value(<<"return">>, Params, undefined) of + undefined -> minirest:return(ok); + _Val -> + case proplists:get_value(<<"topics">>, Params, undefined) of + undefined -> minirest:return({ok, #{msgid => lists:last(MsgIds)}}); + _ -> minirest:return({ok, #{msgids => MsgIds}}) + end + end; + Result -> + minirest:return(Result) + end. + do_publish(ClientId, _Topics, _Qos, _Retain, _Payload, _UserProps) when not (is_binary(ClientId) or (ClientId =:= undefined)) -> {ok, ?ERROR8, <<"bad clientid: must be string">>}; @@ -207,7 +218,7 @@ parse_publish_params(Params) -> Qos = proplists:get_value(<<"qos">>, Params, 0), Retain = proplists:get_value(<<"retain">>, Params, false), Payload1 = maybe_maps_to_binary(Payload), - UserProps = check_user_props(proplists:get_value(<<"user_properties">>, Params, [])), + UserProps = generate_user_props(proplists:get_value(<<"user_properties">>, Params, [])), {ClientId, Topics, Qos, Retain, Payload1, UserProps}. parse_unsubscribe_params(Params) -> @@ -263,7 +274,23 @@ maybe_maps_to_binary(Payload) -> error({encode_payload_fail, S}) end. -check_user_props(UserProps) when is_list(UserProps) -> - UserProps; -check_user_props(UserProps) -> +generate_user_props(UserProps) when is_list(UserProps)-> + generate_user_props_(UserProps, []); +generate_user_props(UserProps) -> error({user_properties_type_error, UserProps}). + +generate_user_props_([{Name, Value} | Rest], Acc) -> + generate_user_props_(Rest, [{bin(Name), bin(Value)} | Acc]); +generate_user_props_([], Acc) -> + lists:reverse(Acc). + +bin(Bin) when is_binary(Bin) -> Bin; +bin(Num) when is_number(Num) -> number_to_binary(Num); +bin(Boolean) when is_boolean(Boolean) -> atom_to_binary(Boolean); +bin(Other) -> error({user_properties_type_error, Other}). + +-define(FLOAT_PRECISION, 17). +number_to_binary(Int) when is_integer(Int) -> + integer_to_binary(Int); +number_to_binary(Float) when is_float(Float) -> + float_to_binary(Float, [{decimals, ?FLOAT_PRECISION}, compact]).