From d89d692d3e8f2506d833e43cf3482ec08d73a703 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Wed, 18 May 2022 15:11:29 +0800 Subject: [PATCH] refactor: move some APIs to new module emqx_bridge_resource --- apps/emqx_bridge/src/emqx_bridge.erl | 375 +++--------------- apps/emqx_bridge/src/emqx_bridge_api.erl | 33 +- apps/emqx_bridge/src/emqx_bridge_monitor.erl | 2 +- apps/emqx_bridge/src/emqx_bridge_resource.erl | 318 +++++++++++++++ .../src/proto/emqx_bridge_proto_v1.erl | 8 +- apps/emqx_bridge/test/emqx_bridge_SUITE.erl | 5 +- .../test/emqx_bridge_api_SUITE.erl | 10 +- apps/emqx_connector/src/emqx_connector.erl | 6 +- .../test/emqx_connector_api_SUITE.erl | 18 +- .../emqx_rule_engine/src/emqx_rule_engine.erl | 2 +- 10 files changed, 407 insertions(+), 370 deletions(-) create mode 100644 apps/emqx_bridge/src/emqx_bridge_resource.erl diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index fcd01a44b..6fe9bd6fe 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -27,34 +27,16 @@ -export([on_message_publish/1]). --export([ - resource_type/1, - bridge_type/1, - resource_id/1, - resource_id/2, - bridge_id/2, - parse_bridge_id/1 -]). - -export([ load/0, lookup/1, lookup/2, lookup/3, - list/0, - list_bridges_by_connector/1, - create/2, create/3, - recreate/2, - recreate/3, - create_dry_run/2, - remove/1, + disable_enable/3, remove/2, - update/2, - update/3, - stop/2, - restart/2, - reset_metrics/1 + list/0, + list_bridges_by_connector/1 ]). -export([send_message/2]). @@ -129,8 +111,8 @@ send_to_matched_egress_bridges(Topic, Msg) -> ). send_message(BridgeId, Message) -> - {BridgeType, BridgeName} = parse_bridge_id(BridgeId), - ResId = emqx_bridge:resource_id(BridgeType, BridgeName), + {BridgeType, BridgeName} = emqx_bridge_resource:parse_bridge_id(BridgeId), + ResId = emqx_bridge_resource:resource_id(BridgeType, BridgeName), case emqx:get_config([bridges, BridgeType, BridgeName], not_found) of not_found -> {error, {bridge_not_found, BridgeId}}; @@ -143,70 +125,23 @@ send_message(BridgeId, Message) -> config_key_path() -> [bridges]. -resource_type(<<"mqtt">>) -> emqx_connector_mqtt; -resource_type(mqtt) -> emqx_connector_mqtt; -resource_type(<<"http">>) -> emqx_connector_http; -resource_type(http) -> emqx_connector_http. - -bridge_type(emqx_connector_mqtt) -> mqtt; -bridge_type(emqx_connector_http) -> http. - post_config_update(_, _Req, NewConf, OldConf, _AppEnv) -> #{added := Added, removed := Removed, changed := Updated} = diff_confs(NewConf, OldConf), %% The config update will be failed if any task in `perform_bridge_changes` failed. Result = perform_bridge_changes([ - {fun remove/3, Removed}, - {fun create/3, Added}, - {fun update/3, Updated} + {fun emqx_bridge_resource:remove/3, Removed}, + {fun emqx_bridge_resource:create/3, Added}, + {fun emqx_bridge_resource:update/3, Updated} ]), ok = unload_hook(), ok = load_hook(NewConf), Result. -perform_bridge_changes(Tasks) -> - perform_bridge_changes(Tasks, ok). - -perform_bridge_changes([], Result) -> - Result; -perform_bridge_changes([{Action, MapConfs} | Tasks], Result0) -> - Result = maps:fold( - fun - ({_Type, _Name}, _Conf, {error, Reason}) -> - {error, Reason}; - ({Type, Name}, Conf, _) -> - case Action(Type, Name, Conf) of - {error, Reason} -> {error, Reason}; - Return -> Return - end - end, - Result0, - MapConfs - ), - perform_bridge_changes(Tasks, Result). - load() -> Bridges = emqx:get_config([bridges], #{}), emqx_bridge_monitor:ensure_all_started(Bridges). -resource_id(BridgeId) when is_binary(BridgeId) -> - <<"bridge:", BridgeId/binary>>. - -resource_id(BridgeType, BridgeName) -> - BridgeId = bridge_id(BridgeType, BridgeName), - resource_id(BridgeId). - -bridge_id(BridgeType, BridgeName) -> - Name = bin(BridgeName), - Type = bin(BridgeType), - <>. - -parse_bridge_id(BridgeId) -> - case string:split(bin(BridgeId), ":", all) of - [Type, Name] -> {binary_to_atom(Type, utf8), binary_to_atom(Name, utf8)}; - _ -> error({invalid_bridge_id, BridgeId}) - end. - list() -> lists:foldl( fun({Type, NameAndConf}, Bridges) -> @@ -233,14 +168,14 @@ list_bridges_by_connector(ConnectorId) -> ]. lookup(Id) -> - {Type, Name} = parse_bridge_id(Id), + {Type, Name} = emqx_bridge_resource:parse_bridge_id(Id), lookup(Type, Name). lookup(Type, Name) -> RawConf = emqx:get_raw_config([bridges, Type, Name], #{}), lookup(Type, Name, RawConf). lookup(Type, Name, RawConf) -> - case emqx_resource:get_instance(resource_id(Type, Name)) of + case emqx_resource:get_instance(emqx_bridge_resource:resource_id(Type, Name)) of {error, not_found} -> {error, not_found}; {ok, _, Data} -> @@ -252,125 +187,52 @@ lookup(Type, Name, RawConf) -> }} end. -reset_metrics(ResourceId) -> - emqx_resource:reset_metrics(ResourceId). - -stop(Type, Name) -> - emqx_resource:stop(resource_id(Type, Name)). - -%% we don't provide 'start', as we want an already started bridge to be restarted. -restart(Type, Name) -> - emqx_resource:restart(resource_id(Type, Name)). - -create(BridgeId, Conf) -> - {BridgeType, BridgeName} = parse_bridge_id(BridgeId), - create(BridgeType, BridgeName, Conf). - -create(Type, Name, Conf) -> - ?SLOG(info, #{ - msg => "create bridge", - type => Type, - name => Name, - config => Conf - }), - {ok, _Data} = emqx_resource:create_local( - resource_id(Type, Name), - <<"emqx_bridge">>, - emqx_bridge:resource_type(Type), - parse_confs(Type, Name, Conf), - #{} - ), - maybe_disable_bridge(Type, Name, Conf). - -update(BridgeId, {OldConf, Conf}) -> - {BridgeType, BridgeName} = parse_bridge_id(BridgeId), - update(BridgeType, BridgeName, {OldConf, Conf}). - -update(Type, Name, {OldConf, Conf}) -> - %% TODO: sometimes its not necessary to restart the bridge connection. - %% - %% - if the connection related configs like `servers` is updated, we should restart/start - %% or stop bridges according to the change. - %% - if the connection related configs are not update, only non-connection configs like - %% the `method` or `headers` of a HTTP bridge is changed, then the bridge can be updated - %% without restarting the bridge. - %% - case if_only_to_toggle_enable(OldConf, Conf) of - false -> - ?SLOG(info, #{ - msg => "update bridge", - type => Type, - name => Name, - config => Conf - }), - case recreate(Type, Name, Conf) of - {ok, _} -> - maybe_disable_bridge(Type, Name, Conf); - {error, not_found} -> - ?SLOG(warning, #{ - msg => "updating_a_non-exist_bridge_need_create_a_new_one", - type => Type, - name => Name, - config => Conf - }), - create(Type, Name, Conf); - {error, Reason} -> - {error, {update_bridge_failed, Reason}} - end; - true -> - %% we don't need to recreate the bridge if this config change is only to - %% toggole the config 'bridge.{type}.{name}.enable' - case maps:get(enable, Conf, true) of - true -> restart(Type, Name); - false -> stop(Type, Name) - end - end. - -recreate(Type, Name) -> - recreate(Type, Name, emqx:get_config([bridges, Type, Name])). - -recreate(Type, Name, Conf) -> - emqx_resource:recreate_local( - resource_id(Type, Name), - emqx_bridge:resource_type(Type), - parse_confs(Type, Name, Conf), - #{} +disable_enable(Action, BridgeType, BridgeName) when + Action =:= disable; Action =:= enable +-> + emqx_conf:update( + config_key_path() ++ [BridgeType, BridgeName], + {Action, BridgeType, BridgeName}, + #{override_to => cluster} ). -create_dry_run(Type, Conf) -> - Conf0 = fill_dry_run_conf(Conf), - case emqx_resource:check_config(emqx_bridge:resource_type(Type), Conf0) of - {ok, Conf1} -> - TmpPath = iolist_to_binary(["bridges-create-dry-run:", emqx_misc:gen_id(8)]), - case emqx_connector_ssl:convert_certs(TmpPath, Conf1) of - {error, Reason} -> - {error, Reason}; - {ok, ConfNew} -> - Res = emqx_resource:create_dry_run_local( - emqx_bridge:resource_type(Type), ConfNew - ), - _ = maybe_clear_certs(TmpPath, ConfNew), - Res - end; - {error, _} = Error -> - Error - end. +create(BridgeType, BridgeName, RawConf) -> + emqx_conf:update( + emqx_bridge:config_key_path() ++ [BridgeType, BridgeName], + RawConf, + #{override_to => cluster} + ). -remove(BridgeId) -> - {BridgeType, BridgeName} = parse_bridge_id(BridgeId), - remove(BridgeType, BridgeName, #{}). +remove(BridgeType, BridgeName) -> + emqx_conf:remove( + emqx_bridge:config_key_path() ++ [BridgeType, BridgeName], + #{override_to => cluster} + ). -remove(Type, Name) -> - remove(Type, Name, undefined). +%%======================================================================================== +%% Helper functions +%%======================================================================================== -%% just for perform_bridge_changes/1 -remove(Type, Name, _Conf) -> - ?SLOG(info, #{msg => "remove_bridge", type => Type, name => Name}), - case emqx_resource:remove_local(resource_id(Type, Name)) of - ok -> ok; - {error, not_found} -> ok; - {error, Reason} -> {error, Reason} - end. +perform_bridge_changes(Tasks) -> + perform_bridge_changes(Tasks, ok). + +perform_bridge_changes([], Result) -> + Result; +perform_bridge_changes([{Action, MapConfs} | Tasks], Result0) -> + Result = maps:fold( + fun + ({_Type, _Name}, _Conf, {error, Reason}) -> + {error, Reason}; + ({Type, Name}, Conf, _) -> + case Action(Type, Name, Conf) of + {error, Reason} -> {error, Reason}; + Return -> Return + end + end, + Result0, + MapConfs + ), + perform_bridge_changes(Tasks, Result). diff_confs(NewConfs, OldConfs) -> emqx_map_lib:diff_maps( @@ -416,104 +278,10 @@ get_matched_bridge_id(#{enable := false}, _Topic, _BType, _BName, Acc) -> Acc; get_matched_bridge_id(#{local_topic := Filter}, Topic, BType, BName, Acc) -> case emqx_topic:match(Topic, Filter) of - true -> [bridge_id(BType, BName) | Acc]; + true -> [emqx_bridge_resource:bridge_id(BType, BName) | Acc]; false -> Acc end. -parse_confs( - http, - _Name, - #{ - url := Url, - method := Method, - body := Body, - headers := Headers, - request_timeout := ReqTimeout - } = Conf -) -> - {BaseUrl, Path} = parse_url(Url), - {ok, BaseUrl2} = emqx_http_lib:uri_parse(BaseUrl), - Conf#{ - base_url => BaseUrl2, - request => - #{ - path => Path, - method => Method, - body => Body, - headers => Headers, - request_timeout => ReqTimeout - } - }; -parse_confs(Type, Name, #{connector := ConnId, direction := Direction} = Conf) when - is_binary(ConnId) --> - case emqx_connector:parse_connector_id(ConnId) of - {Type, ConnName} -> - ConnectorConfs = emqx:get_config([connectors, Type, ConnName]), - make_resource_confs( - Direction, - ConnectorConfs, - maps:without([connector, direction], Conf), - Type, - Name - ); - {_ConnType, _ConnName} -> - error({cannot_use_connector_with_different_type, ConnId}) - end; -parse_confs(Type, Name, #{connector := ConnectorConfs, direction := Direction} = Conf) when - is_map(ConnectorConfs) --> - make_resource_confs( - Direction, - ConnectorConfs, - maps:without([connector, direction], Conf), - Type, - Name - ). - -make_resource_confs(ingress, ConnectorConfs, BridgeConf, Type, Name) -> - BName = bridge_id(Type, Name), - ConnectorConfs#{ - ingress => BridgeConf#{hookpoint => <<"$bridges/", BName/binary>>} - }; -make_resource_confs(egress, ConnectorConfs, BridgeConf, _Type, _Name) -> - ConnectorConfs#{ - egress => BridgeConf - }. - -parse_url(Url) -> - case string:split(Url, "//", leading) of - [Scheme, UrlRem] -> - case string:split(UrlRem, "/", leading) of - [HostPort, Path] -> - {iolist_to_binary([Scheme, "//", HostPort]), Path}; - [HostPort] -> - {iolist_to_binary([Scheme, "//", HostPort]), <<>>} - end; - [Url] -> - error({invalid_url, Url}) - end. - -maybe_disable_bridge(Type, Name, Conf) -> - case maps:get(enable, Conf, true) of - false -> stop(Type, Name); - true -> ok - end. - -if_only_to_toggle_enable(OldConf, Conf) -> - #{added := Added, removed := Removed, changed := Updated} = - emqx_map_lib:diff_maps(OldConf, Conf), - case {Added, Removed, Updated} of - {Added, Removed, #{enable := _} = Updated} when - map_size(Added) =:= 0, - map_size(Removed) =:= 0, - map_size(Updated) =:= 1 - -> - true; - {_, _, _} -> - false - end. - -spec get_basic_usage_info() -> #{ num_bridges => non_neg_integer(), @@ -551,42 +319,3 @@ get_basic_usage_info() -> _:_ -> InitialAcc end. - -fill_dry_run_conf(Conf) -> - Conf#{ - <<"egress">> => - #{ - <<"remote_topic">> => <<"t">>, - <<"remote_qos">> => 0, - <<"retain">> => true, - <<"payload">> => <<"val">> - }, - <<"ingress">> => - #{<<"remote_topic">> => <<"t">>} - }. - -maybe_clear_certs(TmpPath, #{ssl := SslConf} = Conf) -> - %% don't remove the cert files if they are in use - case is_tmp_path_conf(TmpPath, SslConf) of - true -> emqx_connector_ssl:clear_certs(TmpPath, Conf); - false -> ok - end. - -is_tmp_path_conf(TmpPath, #{certfile := Certfile}) -> - is_tmp_path(TmpPath, Certfile); -is_tmp_path_conf(TmpPath, #{keyfile := Keyfile}) -> - is_tmp_path(TmpPath, Keyfile); -is_tmp_path_conf(TmpPath, #{cacertfile := CaCertfile}) -> - is_tmp_path(TmpPath, CaCertfile); -is_tmp_path_conf(_TmpPath, _Conf) -> - false. - -is_tmp_path(TmpPath, File) -> - string:str(str(File), str(TmpPath)) > 0. - -str(Bin) when is_binary(Bin) -> binary_to_list(Bin); -str(Str) when is_list(Str) -> Str. - -bin(Bin) when is_binary(Bin) -> Bin; -bin(Str) when is_list(Str) -> list_to_binary(Str); -bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8). diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 33eda4f4d..5bc313e42 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -47,7 +47,7 @@ -define(CONN_TYPES, [mqtt]). -define(TRY_PARSE_ID(ID, EXPR), - try emqx_bridge:parse_bridge_id(Id) of + try emqx_bridge_resource:parse_bridge_id(Id) of {BridgeType, BridgeName} -> EXPR catch @@ -417,12 +417,7 @@ schema("/nodes/:node/bridges/:id/operation/:operation") -> '/bridges/:id'(delete, #{bindings := #{id := Id}}) -> ?TRY_PARSE_ID( Id, - case - emqx_conf:remove( - emqx_bridge:config_key_path() ++ [BridgeType, BridgeName], - #{override_to => cluster} - ) - of + case emqx_bridge:remove(BridgeType, BridgeName) of {ok, _} -> {204}; {error, Reason} -> {500, error_msg('INTERNAL_ERROR', Reason)} end @@ -431,7 +426,11 @@ schema("/nodes/:node/bridges/:id/operation/:operation") -> '/bridges/:id/reset_metrics'(put, #{bindings := #{id := Id}}) -> ?TRY_PARSE_ID( Id, - case emqx_bridge:reset_metrics(emqx_bridge:resource_id(BridgeType, BridgeName)) of + case + emqx_bridge_resource:reset_metrics( + emqx_bridge_resource:resource_id(BridgeType, BridgeName) + ) + of ok -> {200, <<"Reset success">>}; Reason -> {400, error_msg('BAD_REQUEST', Reason)} end @@ -464,13 +463,7 @@ lookup_from_local_node(BridgeType, BridgeName) -> invalid -> {400, error_msg('BAD_REQUEST', <<"invalid operation">>)}; OperFunc when OperFunc == enable; OperFunc == disable -> - case - emqx_conf:update( - emqx_bridge:config_key_path() ++ [BridgeType, BridgeName], - {OperFunc, BridgeType, BridgeName}, - #{override_to => cluster} - ) - of + case emqx_bridge:disable_enable(OperFunc, BridgeType, BridgeName) of {ok, _} -> {200}; {error, {pre_config_update, _, bridge_not_found}} -> @@ -532,13 +525,7 @@ operation_to_all_nodes(Nodes, OperFunc, BridgeType, BridgeName) -> end. ensure_bridge_created(BridgeType, BridgeName, Conf) -> - case - emqx_conf:update( - emqx_bridge:config_key_path() ++ [BridgeType, BridgeName], - Conf, - #{override_to => cluster} - ) - of + case emqx_bridge:create(BridgeType, BridgeName, Conf) of {ok, _} -> ok; {error, Reason} -> {error, error_msg('BAD_REQUEST', Reason)} end. @@ -569,7 +556,7 @@ pick_bridges_by_id(Type, Name, BridgesAllNodes) -> [] -> ?SLOG(warning, #{ msg => "bridge_inconsistent_in_cluster", - bridge => emqx_bridge:bridge_id(Type, Name) + bridge => emqx_bridge_resource:bridge_id(Type, Name) }), Acc end diff --git a/apps/emqx_bridge/src/emqx_bridge_monitor.erl b/apps/emqx_bridge/src/emqx_bridge_monitor.erl index b9a22bb8c..70c89f352 100644 --- a/apps/emqx_bridge/src/emqx_bridge_monitor.erl +++ b/apps/emqx_bridge/src/emqx_bridge_monitor.erl @@ -73,7 +73,7 @@ load_bridges(Configs) -> fun({Type, NamedConf}) -> lists:foreach( fun({Name, Conf}) -> - _Res = emqx_bridge:create(Type, Name, Conf), + _Res = emqx_bridge_resource:create(Type, Name, Conf), ?tp( emqx_bridge_monitor_loaded_bridge, #{ diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl new file mode 100644 index 000000000..9d33f6eee --- /dev/null +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -0,0 +1,318 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_bridge_resource). +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/logger.hrl"). + +-export([ + bridge_to_resource_type/1, + resource_id/1, + resource_id/2, + bridge_id/2, + parse_bridge_id/1 +]). + +-export([ + create/2, + create/3, + recreate/2, + recreate/3, + create_dry_run/2, + remove/1, + remove/2, + remove/3, + update/2, + update/3, + stop/2, + restart/2, + reset_metrics/1 +]). + +bridge_to_resource_type(<<"mqtt">>) -> emqx_connector_mqtt; +bridge_to_resource_type(mqtt) -> emqx_connector_mqtt; +bridge_to_resource_type(<<"http">>) -> emqx_connector_http; +bridge_to_resource_type(http) -> emqx_connector_http. + +resource_id(BridgeId) when is_binary(BridgeId) -> + <<"bridge:", BridgeId/binary>>. + +resource_id(BridgeType, BridgeName) -> + BridgeId = bridge_id(BridgeType, BridgeName), + resource_id(BridgeId). + +bridge_id(BridgeType, BridgeName) -> + Name = bin(BridgeName), + Type = bin(BridgeType), + <>. + +parse_bridge_id(BridgeId) -> + case string:split(bin(BridgeId), ":", all) of + [Type, Name] -> {binary_to_atom(Type, utf8), binary_to_atom(Name, utf8)}; + _ -> error({invalid_bridge_id, BridgeId}) + end. + +reset_metrics(ResourceId) -> + emqx_resource:reset_metrics(ResourceId). + +stop(Type, Name) -> + emqx_resource:stop(resource_id(Type, Name)). + +%% we don't provide 'start', as we want an already started bridge to be restarted. +restart(Type, Name) -> + emqx_resource:restart(resource_id(Type, Name)). + +create(BridgeId, Conf) -> + {BridgeType, BridgeName} = parse_bridge_id(BridgeId), + create(BridgeType, BridgeName, Conf). + +create(Type, Name, Conf) -> + ?SLOG(info, #{ + msg => "create bridge", + type => Type, + name => Name, + config => Conf + }), + {ok, _Data} = emqx_resource:create_local( + resource_id(Type, Name), + <<"emqx_bridge">>, + bridge_to_resource_type(Type), + parse_confs(Type, Name, Conf), + #{} + ), + maybe_disable_bridge(Type, Name, Conf). + +update(BridgeId, {OldConf, Conf}) -> + {BridgeType, BridgeName} = parse_bridge_id(BridgeId), + update(BridgeType, BridgeName, {OldConf, Conf}). + +update(Type, Name, {OldConf, Conf}) -> + %% TODO: sometimes its not necessary to restart the bridge connection. + %% + %% - if the connection related configs like `servers` is updated, we should restart/start + %% or stop bridges according to the change. + %% - if the connection related configs are not update, only non-connection configs like + %% the `method` or `headers` of a HTTP bridge is changed, then the bridge can be updated + %% without restarting the bridge. + %% + case if_only_to_toggle_enable(OldConf, Conf) of + false -> + ?SLOG(info, #{ + msg => "update bridge", + type => Type, + name => Name, + config => Conf + }), + case recreate(Type, Name, Conf) of + {ok, _} -> + maybe_disable_bridge(Type, Name, Conf); + {error, not_found} -> + ?SLOG(warning, #{ + msg => "updating_a_non-exist_bridge_need_create_a_new_one", + type => Type, + name => Name, + config => Conf + }), + create(Type, Name, Conf); + {error, Reason} -> + {error, {update_bridge_failed, Reason}} + end; + true -> + %% we don't need to recreate the bridge if this config change is only to + %% toggole the config 'bridge.{type}.{name}.enable' + case maps:get(enable, Conf, true) of + true -> restart(Type, Name); + false -> stop(Type, Name) + end + end. + +recreate(Type, Name) -> + recreate(Type, Name, emqx:get_config([bridges, Type, Name])). + +recreate(Type, Name, Conf) -> + emqx_resource:recreate_local( + resource_id(Type, Name), + bridge_to_resource_type(Type), + parse_confs(Type, Name, Conf), + #{} + ). + +create_dry_run(Type, Conf) -> + Conf0 = fill_dry_run_conf(Conf), + case emqx_resource:check_config(bridge_to_resource_type(Type), Conf0) of + {ok, Conf1} -> + TmpPath = iolist_to_binary(["bridges-create-dry-run:", emqx_misc:gen_id(8)]), + case emqx_connector_ssl:convert_certs(TmpPath, Conf1) of + {error, Reason} -> + {error, Reason}; + {ok, ConfNew} -> + Res = emqx_resource:create_dry_run_local( + bridge_to_resource_type(Type), ConfNew + ), + _ = maybe_clear_certs(TmpPath, ConfNew), + Res + end; + {error, _} = Error -> + Error + end. + +remove(BridgeId) -> + {BridgeType, BridgeName} = parse_bridge_id(BridgeId), + remove(BridgeType, BridgeName, #{}). + +remove(Type, Name) -> + remove(Type, Name, undefined). + +%% just for perform_bridge_changes/1 +remove(Type, Name, _Conf) -> + ?SLOG(info, #{msg => "remove_bridge", type => Type, name => Name}), + case emqx_resource:remove_local(resource_id(Type, Name)) of + ok -> ok; + {error, not_found} -> ok; + {error, Reason} -> {error, Reason} + end. + +maybe_disable_bridge(Type, Name, Conf) -> + case maps:get(enable, Conf, true) of + false -> stop(Type, Name); + true -> ok + end. + +if_only_to_toggle_enable(OldConf, Conf) -> + #{added := Added, removed := Removed, changed := Updated} = + emqx_map_lib:diff_maps(OldConf, Conf), + case {Added, Removed, Updated} of + {Added, Removed, #{enable := _} = Updated} when + map_size(Added) =:= 0, + map_size(Removed) =:= 0, + map_size(Updated) =:= 1 + -> + true; + {_, _, _} -> + false + end. + +fill_dry_run_conf(Conf) -> + Conf#{ + <<"egress">> => + #{ + <<"remote_topic">> => <<"t">>, + <<"remote_qos">> => 0, + <<"retain">> => true, + <<"payload">> => <<"val">> + }, + <<"ingress">> => + #{<<"remote_topic">> => <<"t">>} + }. + +maybe_clear_certs(TmpPath, #{ssl := SslConf} = Conf) -> + %% don't remove the cert files if they are in use + case is_tmp_path_conf(TmpPath, SslConf) of + true -> emqx_connector_ssl:clear_certs(TmpPath, Conf); + false -> ok + end. + +is_tmp_path_conf(TmpPath, #{certfile := Certfile}) -> + is_tmp_path(TmpPath, Certfile); +is_tmp_path_conf(TmpPath, #{keyfile := Keyfile}) -> + is_tmp_path(TmpPath, Keyfile); +is_tmp_path_conf(TmpPath, #{cacertfile := CaCertfile}) -> + is_tmp_path(TmpPath, CaCertfile); +is_tmp_path_conf(_TmpPath, _Conf) -> + false. + +is_tmp_path(TmpPath, File) -> + string:str(str(File), str(TmpPath)) > 0. + +parse_confs( + http, + _Name, + #{ + url := Url, + method := Method, + body := Body, + headers := Headers, + request_timeout := ReqTimeout + } = Conf +) -> + {BaseUrl, Path} = parse_url(Url), + {ok, BaseUrl2} = emqx_http_lib:uri_parse(BaseUrl), + Conf#{ + base_url => BaseUrl2, + request => + #{ + path => Path, + method => Method, + body => Body, + headers => Headers, + request_timeout => ReqTimeout + } + }; +parse_confs(Type, Name, #{connector := ConnId, direction := Direction} = Conf) when + is_binary(ConnId) +-> + case emqx_connector:parse_connector_id(ConnId) of + {Type, ConnName} -> + ConnectorConfs = emqx:get_config([connectors, Type, ConnName]), + make_resource_confs( + Direction, + ConnectorConfs, + maps:without([connector, direction], Conf), + Type, + Name + ); + {_ConnType, _ConnName} -> + error({cannot_use_connector_with_different_type, ConnId}) + end; +parse_confs(Type, Name, #{connector := ConnectorConfs, direction := Direction} = Conf) when + is_map(ConnectorConfs) +-> + make_resource_confs( + Direction, + ConnectorConfs, + maps:without([connector, direction], Conf), + Type, + Name + ). + +make_resource_confs(ingress, ConnectorConfs, BridgeConf, Type, Name) -> + BName = bridge_id(Type, Name), + ConnectorConfs#{ + ingress => BridgeConf#{hookpoint => <<"$bridges/", BName/binary>>} + }; +make_resource_confs(egress, ConnectorConfs, BridgeConf, _Type, _Name) -> + ConnectorConfs#{ + egress => BridgeConf + }. + +parse_url(Url) -> + case string:split(Url, "//", leading) of + [Scheme, UrlRem] -> + case string:split(UrlRem, "/", leading) of + [HostPort, Path] -> + {iolist_to_binary([Scheme, "//", HostPort]), Path}; + [HostPort] -> + {iolist_to_binary([Scheme, "//", HostPort]), <<>>} + end; + [Url] -> + error({invalid_url, Url}) + end. + +str(Bin) when is_binary(Bin) -> binary_to_list(Bin); +str(Str) when is_list(Str) -> Str. + +bin(Bin) when is_binary(Bin) -> Bin; +bin(Str) when is_list(Str) -> list_to_binary(Str); +bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8). diff --git a/apps/emqx_bridge/src/proto/emqx_bridge_proto_v1.erl b/apps/emqx_bridge/src/proto/emqx_bridge_proto_v1.erl index a35af0c7d..576d2247f 100644 --- a/apps/emqx_bridge/src/proto/emqx_bridge_proto_v1.erl +++ b/apps/emqx_bridge/src/proto/emqx_bridge_proto_v1.erl @@ -47,7 +47,7 @@ list_bridges(Node) -> restart_bridge_to_node(Node, BridgeType, BridgeName) -> rpc:call( Node, - emqx_bridge, + emqx_bridge_resource, restart, [BridgeType, BridgeName], ?TIMEOUT @@ -58,7 +58,7 @@ restart_bridge_to_node(Node, BridgeType, BridgeName) -> stop_bridge_to_node(Node, BridgeType, BridgeName) -> rpc:call( Node, - emqx_bridge, + emqx_bridge_resource, stop, [BridgeType, BridgeName], ?TIMEOUT @@ -69,7 +69,7 @@ stop_bridge_to_node(Node, BridgeType, BridgeName) -> restart_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) -> erpc:multicall( Nodes, - emqx_bridge, + emqx_bridge_resource, restart, [BridgeType, BridgeName], ?TIMEOUT @@ -80,7 +80,7 @@ restart_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) -> stop_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) -> erpc:multicall( Nodes, - emqx_bridge, + emqx_bridge_resource, stop, [BridgeType, BridgeName], ?TIMEOUT diff --git a/apps/emqx_bridge/test/emqx_bridge_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_SUITE.erl index 827f836cc..75fa71414 100644 --- a/apps/emqx_bridge/test/emqx_bridge_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_SUITE.erl @@ -26,6 +26,7 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> + _ = application:load(emqx_conf), %% to avoid inter-suite dependencies application:stop(emqx_connector), ok = emqx_common_test_helpers:start_apps([emqx, emqx_bridge]), @@ -40,15 +41,17 @@ end_per_suite(_Config) -> ]). init_per_testcase(t_get_basic_usage_info_1, Config) -> + {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), setup_fake_telemetry_data(), Config; init_per_testcase(_TestCase, Config) -> + {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), Config. end_per_testcase(t_get_basic_usage_info_1, _Config) -> lists:foreach( fun({BridgeType, BridgeName}) -> - ok = emqx_bridge:remove(BridgeType, BridgeName) + {ok, _} = emqx_bridge:remove(BridgeType, BridgeName) end, [ {http, <<"basic_usage_info_http">>}, diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 3eceb853c..2c5c318d3 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -86,7 +86,7 @@ end_per_testcase(_, _Config) -> clear_resources() -> lists:foreach( fun(#{type := Type, name := Name}) -> - ok = emqx_bridge:remove(Type, Name) + {ok, _} = emqx_bridge:remove(Type, Name) end, emqx_bridge:list() ). @@ -179,7 +179,7 @@ t_http_crud_apis(_) -> <<"url">> := URL1 } = jsx:decode(Bridge), - BridgeID = emqx_bridge:bridge_id(?BRIDGE_TYPE, ?BRIDGE_NAME), + BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, ?BRIDGE_NAME), %% send an message to emqx and the message should be forwarded to the HTTP server Body = <<"my msg">>, emqx:publish(emqx_message:make(<<"emqx_http/1">>, Body)), @@ -316,7 +316,7 @@ do_start_stop_bridges(Type) -> <<"node_metrics">> := [_ | _], <<"url">> := URL1 } = jsx:decode(Bridge), - BridgeID = emqx_bridge:bridge_id(?BRIDGE_TYPE, ?BRIDGE_NAME), + BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, ?BRIDGE_NAME), %% stop it {ok, 200, <<>>} = request(post, operation_path(Type, stop, BridgeID), <<"">>), {ok, 200, Bridge2} = request(get, uri(["bridges", BridgeID]), []), @@ -361,7 +361,7 @@ t_enable_disable_bridges(_) -> <<"node_metrics">> := [_ | _], <<"url">> := URL1 } = jsx:decode(Bridge), - BridgeID = emqx_bridge:bridge_id(?BRIDGE_TYPE, ?BRIDGE_NAME), + BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, ?BRIDGE_NAME), %% disable it {ok, 200, <<>>} = request(post, operation_path(cluster, disable, BridgeID), <<"">>), {ok, 200, Bridge2} = request(get, uri(["bridges", BridgeID]), []), @@ -410,7 +410,7 @@ t_reset_bridges(_) -> <<"node_metrics">> := [_ | _], <<"url">> := URL1 } = jsx:decode(Bridge), - BridgeID = emqx_bridge:bridge_id(?BRIDGE_TYPE, ?BRIDGE_NAME), + BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, ?BRIDGE_NAME), {ok, 200, <<"Reset success">>} = request(put, uri(["bridges", BridgeID, "reset_metrics"]), []), %% delete the bridge diff --git a/apps/emqx_connector/src/emqx_connector.erl b/apps/emqx_connector/src/emqx_connector.erl index 344750c4a..d85959698 100644 --- a/apps/emqx_connector/src/emqx_connector.erl +++ b/apps/emqx_connector/src/emqx_connector.erl @@ -48,7 +48,7 @@ post_config_update([connectors, Type, Name] = Path, '$remove', _, OldConf, _AppE ConnId = connector_id(Type, Name), try foreach_linked_bridges(ConnId, fun(#{type := BType, name := BName}) -> - throw({dependency_bridges_exist, emqx_bridge:bridge_id(BType, BName)}) + throw({dependency_bridges_exist, emqx_bridge_resource:bridge_id(BType, BName)}) end), _ = emqx_connector_ssl:clear_certs(filename:join(Path), OldConf) catch @@ -61,7 +61,7 @@ post_config_update([connectors, Type, Name], _Req, NewConf, OldConf, _AppEnvs) - fun(#{type := BType, name := BName}) -> BridgeConf = emqx:get_config([bridges, BType, BName]), case - emqx_bridge:update( + emqx_bridge_resource:update( BType, BName, {BridgeConf#{connector => OldConf}, BridgeConf#{connector => NewConf}} @@ -123,7 +123,7 @@ lookup_raw(Type, Name) -> -spec create_dry_run(module(), binary() | #{binary() => term()} | [#{binary() => term()}]) -> ok | {error, Reason :: term()}. create_dry_run(Type, Conf) -> - emqx_bridge:create_dry_run(Type, Conf). + emqx_bridge_resource:create_dry_run(Type, Conf). update(Id, Conf) when is_binary(Id) -> {Type, Name} = parse_connector_id(Id), diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index cd065fa1e..82d627212 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -136,13 +136,13 @@ clear_resources() -> ), lists:foreach( fun(#{type := Type, name := Name}) -> - ok = emqx_bridge:remove(Type, Name) + {ok, _} = emqx_bridge:remove(Type, Name) end, emqx_bridge:list() ), lists:foreach( fun(#{<<"type">> := Type, <<"name">> := Name}) -> - ok = emqx_connector:delete(Type, Name) + {ok, _} = emqx_connector:delete(Type, Name) end, emqx_connector:list_raw() ). @@ -296,7 +296,7 @@ t_mqtt_conn_bridge_ingress(_) -> <<"name">> := ?BRIDGE_NAME_INGRESS, <<"connector">> := ConnctorID } = jsx:decode(Bridge), - BridgeIDIngress = emqx_bridge:bridge_id(?CONNECTR_TYPE, ?BRIDGE_NAME_INGRESS), + BridgeIDIngress = emqx_bridge_resource:bridge_id(?CONNECTR_TYPE, ?BRIDGE_NAME_INGRESS), wait_for_resource_ready(BridgeIDIngress, 5), %% we now test if the bridge works as expected @@ -371,7 +371,7 @@ t_mqtt_conn_bridge_egress(_) -> <<"name">> := ?BRIDGE_NAME_EGRESS, <<"connector">> := ConnctorID } = jsx:decode(Bridge), - BridgeIDEgress = emqx_bridge:bridge_id(?CONNECTR_TYPE, ?BRIDGE_NAME_EGRESS), + BridgeIDEgress = emqx_bridge_resource:bridge_id(?CONNECTR_TYPE, ?BRIDGE_NAME_EGRESS), wait_for_resource_ready(BridgeIDEgress, 5), %% we now test if the bridge works as expected @@ -450,7 +450,7 @@ t_mqtt_conn_update(_) -> <<"name">> := ?BRIDGE_NAME_EGRESS, <<"connector">> := ConnctorID } = jsx:decode(Bridge), - BridgeIDEgress = emqx_bridge:bridge_id(?CONNECTR_TYPE, ?BRIDGE_NAME_EGRESS), + BridgeIDEgress = emqx_bridge_resource:bridge_id(?CONNECTR_TYPE, ?BRIDGE_NAME_EGRESS), wait_for_resource_ready(BridgeIDEgress, 5), %% Then we try to update 'server' of the connector, to an unavailable IP address @@ -505,7 +505,7 @@ t_mqtt_conn_update2(_) -> <<"status">> := <<"disconnected">>, <<"connector">> := ConnctorID } = jsx:decode(Bridge), - BridgeIDEgress = emqx_bridge:bridge_id(?CONNECTR_TYPE, ?BRIDGE_NAME_EGRESS), + BridgeIDEgress = emqx_bridge_resource:bridge_id(?CONNECTR_TYPE, ?BRIDGE_NAME_EGRESS), %% We try to fix the 'server' parameter, to another unavailable server.. %% The update should success: we don't check the connectivity of the new config %% if the resource is now disconnected. @@ -553,7 +553,7 @@ t_mqtt_conn_update3(_) -> } ), #{<<"connector">> := ConnctorID} = jsx:decode(Bridge), - BridgeIDEgress = emqx_bridge:bridge_id(?CONNECTR_TYPE, ?BRIDGE_NAME_EGRESS), + BridgeIDEgress = emqx_bridge_resource:bridge_id(?CONNECTR_TYPE, ?BRIDGE_NAME_EGRESS), wait_for_resource_ready(BridgeIDEgress, 5), %% delete the connector should fail because it is in use by a bridge @@ -602,7 +602,7 @@ t_ingress_mqtt_bridge_with_rules(_) -> <<"name">> => ?BRIDGE_NAME_INGRESS } ), - BridgeIDIngress = emqx_bridge:bridge_id(?CONNECTR_TYPE, ?BRIDGE_NAME_INGRESS), + BridgeIDIngress = emqx_bridge_resource:bridge_id(?CONNECTR_TYPE, ?BRIDGE_NAME_INGRESS), {ok, 201, Rule} = request( post, @@ -701,7 +701,7 @@ t_egress_mqtt_bridge_with_rules(_) -> } ), #{<<"type">> := ?CONNECTR_TYPE, <<"name">> := ?BRIDGE_NAME_EGRESS} = jsx:decode(Bridge), - BridgeIDEgress = emqx_bridge:bridge_id(?CONNECTR_TYPE, ?BRIDGE_NAME_EGRESS), + BridgeIDEgress = emqx_bridge_resource:bridge_id(?CONNECTR_TYPE, ?BRIDGE_NAME_EGRESS), {ok, 201, Rule} = request( post, diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 4fb2a7ab7..fc522a59d 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -288,7 +288,7 @@ get_basic_usage_info() -> tally_referenced_bridges(BridgeIDs, Acc0) -> lists:foldl( fun(BridgeID, Acc) -> - {BridgeType, _BridgeName} = emqx_bridge:parse_bridge_id(BridgeID), + {BridgeType, _BridgeName} = emqx_bridge_resource:parse_bridge_id(BridgeID), maps:update_with( BridgeType, fun(X) -> X + 1 end,