Merge pull request #7972 from JimMoen/fix-pub-api

This commit is contained in:
JimMoen 2022-05-17 16:31:54 +08:00 committed by GitHub
commit b1d642bd6a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 66 additions and 25 deletions

View File

@ -71,19 +71,10 @@ subscribe(_Bindings, Params) ->
publish(_Bindings, Params) -> publish(_Bindings, Params) ->
logger:debug("API publish Params:~p", [Params]), logger:debug("API publish Params:~p", [Params]),
{ClientId, Topic, Qos, Retain, Payload, UserProps} = parse_publish_params(Params), try parse_publish_params(Params) of
case do_publish(ClientId, Topic, Qos, Retain, Payload, UserProps) of Result -> do_publish(Params, Result)
{ok, MsgIds} -> catch
case proplists:get_value(<<"return">>, Params, undefined) of _E : _R -> minirest:return({ok, ?ERROR8, bad_params})
undefined -> minirest:return(ok);
_Val ->
case proplists:get_value(<<"topics">>, Params, undefined) of
undefined -> minirest:return({ok, #{msgid => lists:last(MsgIds)}});
_ -> minirest:return({ok, #{msgids => MsgIds}})
end
end;
Result ->
minirest:return(Result)
end. end.
unsubscribe(_Bindings, Params) -> unsubscribe(_Bindings, Params) ->
@ -124,14 +115,19 @@ loop_publish(Params) ->
loop_publish([], Result) -> loop_publish([], Result) ->
lists:reverse(Result); lists:reverse(Result);
loop_publish([Params | ParamsN], Acc) -> loop_publish([Params | ParamsN], Acc) ->
{ClientId, Topic, Qos, Retain, Payload, UserProps} = parse_publish_params(Params), Result =
Code = case do_publish(ClientId, Topic, Qos, Retain, Payload, UserProps) of try parse_publish_params(Params) of
{ok, _} -> 0; Res ->
{_, Code0, _} -> Code0 Code = case do_publish(Params, Res) of
end, {ok, _} -> 0;
Result = #{topic => resp_topic(proplists:get_value(<<"topic">>, Params), {_, Code0, _} -> Code0
proplists:get_value(<<"topics">>, Params, <<"">>)), end,
code => Code}, #{topic => resp_topic(proplists:get_value(<<"topic">>, Params),
proplists:get_value(<<"topics">>, Params, <<"">>)),
code => Code}
catch
_E : _R -> #{code => ?ERROR8, message => <<"bad_params">>}
end,
loop_publish(ParamsN, [Result | Acc]). loop_publish(ParamsN, [Result | Acc]).
loop_unsubscribe(Params) -> loop_unsubscribe(Params) ->
@ -163,6 +159,21 @@ do_subscribe(ClientId, Topics, QoS) ->
_ -> ok _ -> ok
end. end.
do_publish(Params, {ClientId, Topic, Qos, Retain, Payload, UserProps}) ->
case do_publish(ClientId, Topic, Qos, Retain, Payload, UserProps) of
{ok, MsgIds} ->
case proplists:get_value(<<"return">>, Params, undefined) of
undefined -> minirest:return(ok);
_Val ->
case proplists:get_value(<<"topics">>, Params, undefined) of
undefined -> minirest:return({ok, #{msgid => lists:last(MsgIds)}});
_ -> minirest:return({ok, #{msgids => MsgIds}})
end
end;
Result ->
minirest:return(Result)
end.
do_publish(ClientId, _Topics, _Qos, _Retain, _Payload, _UserProps) do_publish(ClientId, _Topics, _Qos, _Retain, _Payload, _UserProps)
when not (is_binary(ClientId) or (ClientId =:= undefined)) -> when not (is_binary(ClientId) or (ClientId =:= undefined)) ->
{ok, ?ERROR8, <<"bad clientid: must be string">>}; {ok, ?ERROR8, <<"bad clientid: must be string">>};
@ -207,7 +218,7 @@ parse_publish_params(Params) ->
Qos = proplists:get_value(<<"qos">>, Params, 0), Qos = proplists:get_value(<<"qos">>, Params, 0),
Retain = proplists:get_value(<<"retain">>, Params, false), Retain = proplists:get_value(<<"retain">>, Params, false),
Payload1 = maybe_maps_to_binary(Payload), Payload1 = maybe_maps_to_binary(Payload),
UserProps = check_user_props(proplists:get_value(<<"user_properties">>, Params, [])), UserProps = generate_user_props(proplists:get_value(<<"user_properties">>, Params, [])),
{ClientId, Topics, Qos, Retain, Payload1, UserProps}. {ClientId, Topics, Qos, Retain, Payload1, UserProps}.
parse_unsubscribe_params(Params) -> parse_unsubscribe_params(Params) ->
@ -263,7 +274,23 @@ maybe_maps_to_binary(Payload) ->
error({encode_payload_fail, S}) error({encode_payload_fail, S})
end. end.
check_user_props(UserProps) when is_list(UserProps) -> generate_user_props(UserProps) when is_list(UserProps)->
UserProps; generate_user_props_(UserProps, []);
check_user_props(UserProps) -> generate_user_props(UserProps) ->
error({user_properties_type_error, UserProps}). error({user_properties_type_error, UserProps}).
generate_user_props_([{Name, Value} | Rest], Acc) ->
generate_user_props_(Rest, [{bin(Name), bin(Value)} | Acc]);
generate_user_props_([], Acc) ->
lists:reverse(Acc).
bin(Bin) when is_binary(Bin) -> Bin;
bin(Num) when is_number(Num) -> number_to_binary(Num);
bin(Boolean) when is_boolean(Boolean) -> atom_to_binary(Boolean);
bin(Other) -> error({user_properties_type_error, Other}).
-define(FLOAT_PRECISION, 17).
number_to_binary(Int) when is_integer(Int) ->
integer_to_binary(Int);
number_to_binary(Float) when is_float(Float) ->
float_to_binary(Float, [{decimals, ?FLOAT_PRECISION}, compact]).

View File

@ -447,6 +447,20 @@ t_pubsub(_) ->
<<"payload">> => <<"hello">>}), <<"payload">> => <<"hello">>}),
?assertEqual(?ERROR8, get(<<"code">>, BadClient2)), ?assertEqual(?ERROR8, get(<<"code">>, BadClient2)),
{ok, BadParams} = request_api(post, api_path(["mqtt/publish"]), [], auth_header_(),
#{<<"clientid">> => 1,
<<"topics">> => <<"mytopic">>,
<<"qos">> => 1,
<<"payload">> => <<"hello">>,
<<"user_properties">> =>
#{<<"id">> => 10010,
<<"name">> => <<"emqx">>,
<<"foo">> => ["bad_properties1", "bad_properties2"],
<<"boolean">> => false
}
}),
?assertEqual(?ERROR8, get(<<"code">>, BadParams)),
{ok, BadClient3} = request_api(post, api_path(["mqtt/unsubscribe"]), [], auth_header_(), {ok, BadClient3} = request_api(post, api_path(["mqtt/unsubscribe"]), [], auth_header_(),
#{<<"clientid">> => 1, #{<<"clientid">> => 1,
<<"topic">> => <<"mytopic">>}), <<"topic">> => <<"mytopic">>}),