feat: restructure emqx_bridge_v2 for better readability
This commit is contained in:
parent
e13196c1ca
commit
a5a060473c
|
|
@ -27,8 +27,6 @@
|
||||||
|
|
||||||
-define(TOP_LELVE_HDLR_PATH, (emqx_bridge:config_key_path())).
|
-define(TOP_LELVE_HDLR_PATH, (emqx_bridge:config_key_path())).
|
||||||
-define(LEAF_NODE_HDLR_PATH, (emqx_bridge:config_key_path() ++ ['?', '?'])).
|
-define(LEAF_NODE_HDLR_PATH, (emqx_bridge:config_key_path() ++ ['?', '?'])).
|
||||||
-define(TOP_LELVE_HDLR_PATH_BRIDGE_V2, (emqx_bridge_v2:config_key_path())).
|
|
||||||
-define(LEAF_NODE_HDLR_PATH_BRIDGE_V2, (emqx_bridge_v2:config_key_path() ++ ['?', '?'])).
|
|
||||||
|
|
||||||
start(_StartType, _StartArgs) ->
|
start(_StartType, _StartArgs) ->
|
||||||
{ok, Sup} = emqx_bridge_sup:start_link(),
|
{ok, Sup} = emqx_bridge_sup:start_link(),
|
||||||
|
|
@ -38,16 +36,12 @@ start(_StartType, _StartArgs) ->
|
||||||
ok = emqx_bridge:load_hook(),
|
ok = emqx_bridge:load_hook(),
|
||||||
ok = emqx_config_handler:add_handler(?LEAF_NODE_HDLR_PATH, ?MODULE),
|
ok = emqx_config_handler:add_handler(?LEAF_NODE_HDLR_PATH, ?MODULE),
|
||||||
ok = emqx_config_handler:add_handler(?TOP_LELVE_HDLR_PATH, emqx_bridge),
|
ok = emqx_config_handler:add_handler(?TOP_LELVE_HDLR_PATH, emqx_bridge),
|
||||||
ok = emqx_config_handler:add_handler(?LEAF_NODE_HDLR_PATH_BRIDGE_V2, emqx_bridge_v2),
|
|
||||||
ok = emqx_config_handler:add_handler(?TOP_LELVE_HDLR_PATH_BRIDGE_V2, emqx_bridge_v2),
|
|
||||||
?tp(emqx_bridge_app_started, #{}),
|
?tp(emqx_bridge_app_started, #{}),
|
||||||
{ok, Sup}.
|
{ok, Sup}.
|
||||||
|
|
||||||
stop(_State) ->
|
stop(_State) ->
|
||||||
emqx_conf:remove_handler(?LEAF_NODE_HDLR_PATH),
|
emqx_conf:remove_handler(?LEAF_NODE_HDLR_PATH),
|
||||||
emqx_conf:remove_handler(?TOP_LELVE_HDLR_PATH),
|
emqx_conf:remove_handler(?TOP_LELVE_HDLR_PATH),
|
||||||
emqx_conf:remove_handler(emqx_bridge_v2:config_key_path()),
|
|
||||||
emqx_conf:remove_handler(?LEAF_NODE_HDLR_PATH_BRIDGE_V2),
|
|
||||||
ok = emqx_bridge:unload(),
|
ok = emqx_bridge:unload(),
|
||||||
ok = emqx_bridge_v2:unload(),
|
ok = emqx_bridge_v2:unload(),
|
||||||
ok.
|
ok.
|
||||||
|
|
|
||||||
File diff suppressed because it is too large
Load Diff
|
|
@ -112,7 +112,7 @@ init_per_testcase(_TestCase, Config) ->
|
||||||
|
|
||||||
end_per_testcase(_TestCase, Config) ->
|
end_per_testcase(_TestCase, Config) ->
|
||||||
%% Remove the fake connector
|
%% Remove the fake connector
|
||||||
ok = emqx_connector:remove(con_type(), con_name()),
|
{ok, _} = emqx_connector:remove(con_type(), con_name()),
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
t_create_remove(_) ->
|
t_create_remove(_) ->
|
||||||
|
|
@ -120,6 +120,18 @@ t_create_remove(_) ->
|
||||||
{ok, _} = emqx_bridge_v2:remove(bridge_type(), my_test_bridge),
|
{ok, _} = emqx_bridge_v2:remove(bridge_type(), my_test_bridge),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_list(_) ->
|
||||||
|
[] = emqx_bridge_v2:list(),
|
||||||
|
{ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, bridge_config()),
|
||||||
|
1 = length(emqx_bridge_v2:list()),
|
||||||
|
{ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge2, bridge_config()),
|
||||||
|
2 = length(emqx_bridge_v2:list()),
|
||||||
|
{ok, _} = emqx_bridge_v2:remove(bridge_type(), my_test_bridge),
|
||||||
|
1 = length(emqx_bridge_v2:list()),
|
||||||
|
{ok, _} = emqx_bridge_v2:remove(bridge_type(), my_test_bridge2),
|
||||||
|
0 = length(emqx_bridge_v2:list()),
|
||||||
|
ok.
|
||||||
|
|
||||||
t_create_dry_run(_) ->
|
t_create_dry_run(_) ->
|
||||||
ok = emqx_bridge_v2:create_dry_run(bridge_type(), bridge_config()).
|
ok = emqx_bridge_v2:create_dry_run(bridge_type(), bridge_config()).
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -205,35 +205,29 @@
|
||||||
|
|
||||||
-callback query_mode(Config :: term()) -> query_mode().
|
-callback query_mode(Config :: term()) -> query_mode().
|
||||||
|
|
||||||
%% This callback handles the installation of a specified Bridge V2 resource.
|
%% This callback handles the installation of a specified channel.
|
||||||
%%
|
%%
|
||||||
%% It's guaranteed that the provided Bridge V2 is not already installed when this
|
%% If the channel cannot be successfully installed, the callback shall
|
||||||
%% function is invoked. Upon successful installation, the function should return a
|
%% throw an exception or return an error tuple.
|
||||||
%% new state with the installed Bridge V2 encapsulated within the `installed_bridge_v2s` map.
|
|
||||||
%%
|
|
||||||
%% The Bridge V2 state must be stored in the `installed_bridge_v2s` map using the
|
|
||||||
%% Bridge V2 resource ID as the key, as the caching mechanism depends on this structure.
|
|
||||||
%%
|
|
||||||
%% If the Bridge V2 cannot be successfully installed, the callback shall
|
|
||||||
%% throw an exception.
|
|
||||||
-callback on_add_channel(
|
-callback on_add_channel(
|
||||||
ResId :: term(), ResourceState :: term(), BridgeV2Id :: binary(), ChannelConfig :: map()
|
ResId :: term(), ResourceState :: term(), ChannelId :: binary(), ChannelConfig :: map()
|
||||||
) -> {ok, NewState :: #{installed_bridge_v2s := map()}}.
|
) -> {ok, term()} | {error, term()}.
|
||||||
|
|
||||||
%% This callback handles the deinstallation of a specified Bridge V2 resource.
|
%% This callback handles the removal of a specified channel resource.
|
||||||
%%
|
%%
|
||||||
%% It's guaranteed that the provided Bridge V2 is installed when this
|
%% It's guaranteed that the provided channel is installed when this
|
||||||
%% function is invoked. Upon successful deinstallation, the function should return
|
%% function is invoked. Upon successful deinstallation, the function should return
|
||||||
%% a new state where the Bridge V2 id key has been removed from the `installed_bridge_v2s` map.
|
%% a new state
|
||||||
%%
|
%%
|
||||||
%% If the Bridge V2 cannot be successfully deinstalled, the callback shall
|
%% If the channel cannot be successfully deinstalled, the callback should
|
||||||
%% log an error.
|
%% log an error.
|
||||||
%%
|
%%
|
||||||
%% Also see the documentation for `on_add_channel/4`.
|
|
||||||
-callback on_remove_channel(
|
-callback on_remove_channel(
|
||||||
ResId :: term(), ResourceState :: term(), BridgeV2Id :: binary()
|
ResId :: term(), ResourceState :: term(), ChannelId :: binary()
|
||||||
) -> {ok, NewState :: term()}.
|
) -> {ok, NewState :: term()}.
|
||||||
|
|
||||||
|
%% This callback shall return a list of channel configs that are currently active
|
||||||
|
%% for the resource with the given id.
|
||||||
-callback on_get_channels(
|
-callback on_get_channels(
|
||||||
ResId :: term()
|
ResId :: term()
|
||||||
) -> {ok, [term()]}.
|
) -> {ok, [term()]}.
|
||||||
|
|
@ -362,7 +356,6 @@ query(ResId, Request) ->
|
||||||
-spec query(resource_id(), Request :: term(), query_opts()) ->
|
-spec query(resource_id(), Request :: term(), query_opts()) ->
|
||||||
Result :: term().
|
Result :: term().
|
||||||
query(ResId, Request, Opts) ->
|
query(ResId, Request, Opts) ->
|
||||||
%% We keep this A
|
|
||||||
case get_query_mode_error(ResId, Opts) of
|
case get_query_mode_error(ResId, Opts) of
|
||||||
{error, _} = ErrorTuple ->
|
{error, _} = ErrorTuple ->
|
||||||
ErrorTuple;
|
ErrorTuple;
|
||||||
|
|
@ -406,15 +399,15 @@ query(ResId, Request, Opts) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
get_query_mode_error(ResId, Opts) ->
|
get_query_mode_error(ResId, Opts) ->
|
||||||
case emqx_bridge_v2:is_bridge_v2_id(ResId) of
|
case maps:get(query_mode_cache_override, Opts, true) of
|
||||||
true ->
|
false ->
|
||||||
case Opts of
|
case Opts of
|
||||||
#{query_mode := QueryMode} ->
|
#{query_mode := QueryMode} ->
|
||||||
{QueryMode, ok};
|
{QueryMode, ok};
|
||||||
_ ->
|
_ ->
|
||||||
{async, unhealthy_target}
|
{async, unhealthy_target}
|
||||||
end;
|
end;
|
||||||
false ->
|
true ->
|
||||||
case emqx_resource_manager:lookup_cached(ResId) of
|
case emqx_resource_manager:lookup_cached(ResId) of
|
||||||
{ok, _Group, #{query_mode := QM, error := Error}} ->
|
{ok, _Group, #{query_mode := QM, error := Error}} ->
|
||||||
{QM, Error};
|
{QM, Error};
|
||||||
|
|
|
||||||
|
|
@ -522,7 +522,7 @@ format_action(Actions) ->
|
||||||
do_format_action({bridge, BridgeType, BridgeName, _ResId}) ->
|
do_format_action({bridge, BridgeType, BridgeName, _ResId}) ->
|
||||||
emqx_bridge_resource:bridge_id(BridgeType, BridgeName);
|
emqx_bridge_resource:bridge_id(BridgeType, BridgeName);
|
||||||
do_format_action({bridge_v2, BridgeType, BridgeName}) ->
|
do_format_action({bridge_v2, BridgeType, BridgeName}) ->
|
||||||
emqx_bridge_v2:id(BridgeType, BridgeName);
|
emqx_bridge_resource:bridge_id(BridgeType, BridgeName);
|
||||||
do_format_action(#{mod := Mod, func := Func, args := Args}) ->
|
do_format_action(#{mod := Mod, func := Func, args := Args}) ->
|
||||||
#{
|
#{
|
||||||
function => printable_function_name(Mod, Func),
|
function => printable_function_name(Mod, Func),
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue