fix(bridge): Ensure that the node name is known

This commit is contained in:
firest 2022-11-01 14:41:01 +08:00
parent 317e15314e
commit 6c52d5de1f
2 changed files with 45 additions and 11 deletions

View File

@ -52,7 +52,9 @@
explain_posix/1, explain_posix/1,
pmap/2, pmap/2,
pmap/3, pmap/3,
readable_error_msg/1 readable_error_msg/1,
safe_to_existing_atom/1,
safe_to_existing_atom/2
]). ]).
-export([ -export([
@ -463,6 +465,18 @@ nolink_apply(Fun, Timeout) when is_function(Fun, 0) ->
exit(timeout) exit(timeout)
end. end.
safe_to_existing_atom(In) ->
safe_to_existing_atom(In, utf8).
safe_to_existing_atom(Bin, Encoding) when is_binary(Bin) ->
try_to_existing_atom(fun erlang:binary_to_existing_atom/2, [Bin, Encoding]);
safe_to_existing_atom(List, _Encoding) when is_list(List) ->
try_to_existing_atom(fun erlang:list_to_existing_atom/1, [List]);
safe_to_existing_atom(Atom, _Encoding) when is_atom(Atom) ->
{ok, Atom};
safe_to_existing_atom(_Any, _Encoding) ->
{error, invalid_type}.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Internal Functions %% Internal Functions
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -533,6 +547,14 @@ readable_error_msg(Error) ->
end end
end. end.
try_to_existing_atom(Fun, Args) ->
try erlang:apply(Fun, Args) of
Atom ->
{ok, Atom}
catch
_:Reason -> {error, Reason}
end.
-ifdef(TEST). -ifdef(TEST).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").

View File

@ -492,23 +492,16 @@ lookup_from_local_node(BridgeType, BridgeName) ->
invalid -> invalid ->
{400, error_msg('BAD_REQUEST', <<"invalid operation">>)}; {400, error_msg('BAD_REQUEST', <<"invalid operation">>)};
OperFunc -> OperFunc ->
TargetNode = binary_to_atom(Node, utf8),
ConfMap = emqx:get_config([bridges, BridgeType, BridgeName]), ConfMap = emqx:get_config([bridges, BridgeType, BridgeName]),
case maps:get(enable, ConfMap, false) of case maps:get(enable, ConfMap, false) of
false -> false ->
{403, {403,
error_msg( error_msg(
'FORBIDDEN_REQUEST', <<"forbidden operation: bridge disabled">> 'FORBIDDEN_REQUEST',
<<"forbidden operation: bridge disabled">>
)}; )};
true -> true ->
case emqx_bridge_proto_v1:OperFunc(TargetNode, BridgeType, BridgeName) of call_operation(Node, OperFunc, BridgeType, BridgeName)
ok ->
{200};
{error, timeout} ->
{503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)};
{error, Reason} ->
{500, error_msg('INTERNAL_ERROR', Reason)}
end
end end
end end
). ).
@ -707,3 +700,22 @@ bin(S) when is_atom(S) ->
atom_to_binary(S, utf8); atom_to_binary(S, utf8);
bin(S) when is_binary(S) -> bin(S) when is_binary(S) ->
S. S.
call_operation(Node, OperFunc, BridgeType, BridgeName) ->
case emqx_misc:safe_to_existing_atom(Node, utf8) of
{ok, TargetNode} ->
case
emqx_bridge_proto_v1:OperFunc(
TargetNode, BridgeType, BridgeName
)
of
ok ->
{200};
{error, timeout} ->
{503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)};
{error, Reason} ->
{500, error_msg('INTERNAL_ERROR', Reason)}
end;
{error, _} ->
{400, error_msg('INVALID_NODE', <<"invalid node">>)}
end.