From 6c52d5de1f4373f58b5c81616a7447404c1f3d65 Mon Sep 17 00:00:00 2001 From: firest Date: Tue, 1 Nov 2022 14:41:01 +0800 Subject: [PATCH] fix(bridge): Ensure that the node name is known --- apps/emqx/src/emqx_misc.erl | 24 +++++++++++++++++- apps/emqx_bridge/src/emqx_bridge_api.erl | 32 ++++++++++++++++-------- 2 files changed, 45 insertions(+), 11 deletions(-) diff --git a/apps/emqx/src/emqx_misc.erl b/apps/emqx/src/emqx_misc.erl index 6833cbabb..54157da78 100644 --- a/apps/emqx/src/emqx_misc.erl +++ b/apps/emqx/src/emqx_misc.erl @@ -52,7 +52,9 @@ explain_posix/1, pmap/2, pmap/3, - readable_error_msg/1 + readable_error_msg/1, + safe_to_existing_atom/1, + safe_to_existing_atom/2 ]). -export([ @@ -463,6 +465,18 @@ nolink_apply(Fun, Timeout) when is_function(Fun, 0) -> exit(timeout) end. +safe_to_existing_atom(In) -> + safe_to_existing_atom(In, utf8). + +safe_to_existing_atom(Bin, Encoding) when is_binary(Bin) -> + try_to_existing_atom(fun erlang:binary_to_existing_atom/2, [Bin, Encoding]); +safe_to_existing_atom(List, _Encoding) when is_list(List) -> + try_to_existing_atom(fun erlang:list_to_existing_atom/1, [List]); +safe_to_existing_atom(Atom, _Encoding) when is_atom(Atom) -> + {ok, Atom}; +safe_to_existing_atom(_Any, _Encoding) -> + {error, invalid_type}. + %%------------------------------------------------------------------------------ %% Internal Functions %%------------------------------------------------------------------------------ @@ -533,6 +547,14 @@ readable_error_msg(Error) -> end end. +try_to_existing_atom(Fun, Args) -> + try erlang:apply(Fun, Args) of + Atom -> + {ok, Atom} + catch + _:Reason -> {error, Reason} + end. + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 37a42ab3d..66a9079f8 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -492,23 +492,16 @@ lookup_from_local_node(BridgeType, BridgeName) -> invalid -> {400, error_msg('BAD_REQUEST', <<"invalid operation">>)}; OperFunc -> - TargetNode = binary_to_atom(Node, utf8), ConfMap = emqx:get_config([bridges, BridgeType, BridgeName]), case maps:get(enable, ConfMap, false) of false -> {403, error_msg( - 'FORBIDDEN_REQUEST', <<"forbidden operation: bridge disabled">> + 'FORBIDDEN_REQUEST', + <<"forbidden operation: bridge disabled">> )}; true -> - case emqx_bridge_proto_v1:OperFunc(TargetNode, BridgeType, BridgeName) of - ok -> - {200}; - {error, timeout} -> - {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)}; - {error, Reason} -> - {500, error_msg('INTERNAL_ERROR', Reason)} - end + call_operation(Node, OperFunc, BridgeType, BridgeName) end end ). @@ -707,3 +700,22 @@ bin(S) when is_atom(S) -> atom_to_binary(S, utf8); bin(S) when is_binary(S) -> S. + +call_operation(Node, OperFunc, BridgeType, BridgeName) -> + case emqx_misc:safe_to_existing_atom(Node, utf8) of + {ok, TargetNode} -> + case + emqx_bridge_proto_v1:OperFunc( + TargetNode, BridgeType, BridgeName + ) + of + ok -> + {200}; + {error, timeout} -> + {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)}; + {error, Reason} -> + {500, error_msg('INTERNAL_ERROR', Reason)} + end; + {error, _} -> + {400, error_msg('INVALID_NODE', <<"invalid node">>)} + end.