fix(mgmt): pubsub api use bad params caused sub client crash
This commit is contained in:
parent
9463d02fd0
commit
d655bea16a
|
@ -71,19 +71,10 @@ subscribe(_Bindings, Params) ->
|
||||||
|
|
||||||
publish(_Bindings, Params) ->
|
publish(_Bindings, Params) ->
|
||||||
logger:debug("API publish Params:~p", [Params]),
|
logger:debug("API publish Params:~p", [Params]),
|
||||||
{ClientId, Topic, Qos, Retain, Payload, UserProps} = parse_publish_params(Params),
|
try parse_publish_params(Params) of
|
||||||
case do_publish(ClientId, Topic, Qos, Retain, Payload, UserProps) of
|
Result -> do_publish(Params, Result)
|
||||||
{ok, MsgIds} ->
|
catch
|
||||||
case proplists:get_value(<<"return">>, Params, undefined) of
|
_E : _R -> minirest:return({ok, ?ERROR8, bad_params})
|
||||||
undefined -> minirest:return(ok);
|
|
||||||
_Val ->
|
|
||||||
case proplists:get_value(<<"topics">>, Params, undefined) of
|
|
||||||
undefined -> minirest:return({ok, #{msgid => lists:last(MsgIds)}});
|
|
||||||
_ -> minirest:return({ok, #{msgids => MsgIds}})
|
|
||||||
end
|
|
||||||
end;
|
|
||||||
Result ->
|
|
||||||
minirest:return(Result)
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
unsubscribe(_Bindings, Params) ->
|
unsubscribe(_Bindings, Params) ->
|
||||||
|
@ -124,14 +115,19 @@ loop_publish(Params) ->
|
||||||
loop_publish([], Result) ->
|
loop_publish([], Result) ->
|
||||||
lists:reverse(Result);
|
lists:reverse(Result);
|
||||||
loop_publish([Params | ParamsN], Acc) ->
|
loop_publish([Params | ParamsN], Acc) ->
|
||||||
{ClientId, Topic, Qos, Retain, Payload, UserProps} = parse_publish_params(Params),
|
Result =
|
||||||
Code = case do_publish(ClientId, Topic, Qos, Retain, Payload, UserProps) of
|
try parse_publish_params(Params) of
|
||||||
{ok, _} -> 0;
|
Res ->
|
||||||
{_, Code0, _} -> Code0
|
Code = case do_publish(Params, Res) of
|
||||||
end,
|
{ok, _} -> 0;
|
||||||
Result = #{topic => resp_topic(proplists:get_value(<<"topic">>, Params),
|
{_, Code0, _} -> Code0
|
||||||
proplists:get_value(<<"topics">>, Params, <<"">>)),
|
end,
|
||||||
code => Code},
|
#{topic => resp_topic(proplists:get_value(<<"topic">>, Params),
|
||||||
|
proplists:get_value(<<"topics">>, Params, <<"">>)),
|
||||||
|
code => Code}
|
||||||
|
catch
|
||||||
|
_E : _R -> #{code => ?ERROR8, message => <<"bad_params">>}
|
||||||
|
end,
|
||||||
loop_publish(ParamsN, [Result | Acc]).
|
loop_publish(ParamsN, [Result | Acc]).
|
||||||
|
|
||||||
loop_unsubscribe(Params) ->
|
loop_unsubscribe(Params) ->
|
||||||
|
@ -163,6 +159,21 @@ do_subscribe(ClientId, Topics, QoS) ->
|
||||||
_ -> ok
|
_ -> ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
do_publish(Params, {ClientId, Topic, Qos, Retain, Payload, UserProps}) ->
|
||||||
|
case do_publish(ClientId, Topic, Qos, Retain, Payload, UserProps) of
|
||||||
|
{ok, MsgIds} ->
|
||||||
|
case proplists:get_value(<<"return">>, Params, undefined) of
|
||||||
|
undefined -> minirest:return(ok);
|
||||||
|
_Val ->
|
||||||
|
case proplists:get_value(<<"topics">>, Params, undefined) of
|
||||||
|
undefined -> minirest:return({ok, #{msgid => lists:last(MsgIds)}});
|
||||||
|
_ -> minirest:return({ok, #{msgids => MsgIds}})
|
||||||
|
end
|
||||||
|
end;
|
||||||
|
Result ->
|
||||||
|
minirest:return(Result)
|
||||||
|
end.
|
||||||
|
|
||||||
do_publish(ClientId, _Topics, _Qos, _Retain, _Payload, _UserProps)
|
do_publish(ClientId, _Topics, _Qos, _Retain, _Payload, _UserProps)
|
||||||
when not (is_binary(ClientId) or (ClientId =:= undefined)) ->
|
when not (is_binary(ClientId) or (ClientId =:= undefined)) ->
|
||||||
{ok, ?ERROR8, <<"bad clientid: must be string">>};
|
{ok, ?ERROR8, <<"bad clientid: must be string">>};
|
||||||
|
@ -207,7 +218,7 @@ parse_publish_params(Params) ->
|
||||||
Qos = proplists:get_value(<<"qos">>, Params, 0),
|
Qos = proplists:get_value(<<"qos">>, Params, 0),
|
||||||
Retain = proplists:get_value(<<"retain">>, Params, false),
|
Retain = proplists:get_value(<<"retain">>, Params, false),
|
||||||
Payload1 = maybe_maps_to_binary(Payload),
|
Payload1 = maybe_maps_to_binary(Payload),
|
||||||
UserProps = check_user_props(proplists:get_value(<<"user_properties">>, Params, [])),
|
UserProps = generate_user_props(proplists:get_value(<<"user_properties">>, Params, [])),
|
||||||
{ClientId, Topics, Qos, Retain, Payload1, UserProps}.
|
{ClientId, Topics, Qos, Retain, Payload1, UserProps}.
|
||||||
|
|
||||||
parse_unsubscribe_params(Params) ->
|
parse_unsubscribe_params(Params) ->
|
||||||
|
@ -263,7 +274,23 @@ maybe_maps_to_binary(Payload) ->
|
||||||
error({encode_payload_fail, S})
|
error({encode_payload_fail, S})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
check_user_props(UserProps) when is_list(UserProps) ->
|
generate_user_props(UserProps) when is_list(UserProps)->
|
||||||
UserProps;
|
generate_user_props_(UserProps, []);
|
||||||
check_user_props(UserProps) ->
|
generate_user_props(UserProps) ->
|
||||||
error({user_properties_type_error, UserProps}).
|
error({user_properties_type_error, UserProps}).
|
||||||
|
|
||||||
|
generate_user_props_([{Name, Value} | Rest], Acc) ->
|
||||||
|
generate_user_props_(Rest, [{bin(Name), bin(Value)} | Acc]);
|
||||||
|
generate_user_props_([], Acc) ->
|
||||||
|
lists:reverse(Acc).
|
||||||
|
|
||||||
|
bin(Bin) when is_binary(Bin) -> Bin;
|
||||||
|
bin(Num) when is_number(Num) -> number_to_binary(Num);
|
||||||
|
bin(Boolean) when is_boolean(Boolean) -> atom_to_binary(Boolean);
|
||||||
|
bin(Other) -> error({user_properties_type_error, Other}).
|
||||||
|
|
||||||
|
-define(FLOAT_PRECISION, 17).
|
||||||
|
number_to_binary(Int) when is_integer(Int) ->
|
||||||
|
integer_to_binary(Int);
|
||||||
|
number_to_binary(Float) when is_float(Float) ->
|
||||||
|
float_to_binary(Float, [{decimals, ?FLOAT_PRECISION}, compact]).
|
||||||
|
|
Loading…
Reference in New Issue