From cb8dabe579d9f6b888cced4c48ee345aa63e5630 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 14 Sep 2021 17:32:26 +0800 Subject: [PATCH] feat(bridges): add CRUD HTTP APIs for bridges --- apps/emqx/rebar.config | 4 +- apps/emqx_bridge/etc/emqx_bridge.conf | 90 +++--- apps/emqx_bridge/src/emqx_bridge.erl | 170 +++++++++-- apps/emqx_bridge/src/emqx_bridge_api.erl | 298 +++++++++++-------- apps/emqx_bridge/src/emqx_bridge_app.erl | 20 +- apps/emqx_bridge/src/emqx_bridge_monitor.erl | 2 +- rebar.config | 6 +- 7 files changed, 382 insertions(+), 208 deletions(-) diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index a8462ad82..119d521fc 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -11,11 +11,11 @@ {deps, [ {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} - , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.2"}}} + , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.3"}}} , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.2"}}} , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.8"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}} - , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.17.0"}}} + , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.17.1"}}} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}} , {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}} , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.14.1"}}} diff --git a/apps/emqx_bridge/etc/emqx_bridge.conf b/apps/emqx_bridge/etc/emqx_bridge.conf index bfba34e7c..c04abf82a 100644 --- a/apps/emqx_bridge/etc/emqx_bridge.conf +++ b/apps/emqx_bridge/etc/emqx_bridge.conf @@ -2,48 +2,48 @@ ## EMQ X Bridge ##-------------------------------------------------------------------- -#bridges.mqtt.my_mqtt_bridge_to_aws { -# server = "127.0.0.1:1883" -# proto_ver = "v4" -# username = "username1" -# password = "" -# clean_start = true -# keepalive = 300 -# retry_interval = "30s" -# max_inflight = 32 -# reconnect_interval = "30s" -# bridge_mode = true -# replayq { -# dir = "{{ platform_data_dir }}/replayq/bridge_mqtt/" -# seg_bytes = "100MB" -# offload = false -# max_total_bytes = "1GB" -# } -# ssl { -# enable = false -# keyfile = "{{ platform_etc_dir }}/certs/client-key.pem" -# certfile = "{{ platform_etc_dir }}/certs/client-cert.pem" -# cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem" -# } -# ## we will create one MQTT connection for each element of the `message_in` -# message_in: [{ -# ## the `id` will be used as part of the clientid -# id = "pull_msgs_from_aws" -# subscribe_remote_topic = "aws/#" -# subscribe_qos = 1 -# local_topic = "from_aws/${topic}" -# payload = "${payload}" -# qos = "${qos}" -# retain = "${retain}" -# }] -# ## we will create one MQTT connection for each element of the `message_out` -# message_out: [{ -# ## the `id` will be used as part of the clientid -# id = "push_msgs_to_aws" -# subscribe_local_topic = "emqx/#" -# remote_topic = "from_emqx/${topic}" -# payload = "${payload}" -# qos = 1 -# retain = false -# }] -#} +bridges.mqtt.my_mqtt_bridge_to_aws { + server = "127.0.0.1:1883" + proto_ver = "v4" + username = "username1" + password = "" + clean_start = true + keepalive = 300 + retry_interval = "30s" + max_inflight = 32 + reconnect_interval = "30s" + bridge_mode = true + replayq { + dir = "{{ platform_data_dir }}/replayq/bridge_mqtt/" + seg_bytes = "100MB" + offload = false + max_total_bytes = "1GB" + } + ssl { + enable = false + keyfile = "{{ platform_etc_dir }}/certs/client-key.pem" + certfile = "{{ platform_etc_dir }}/certs/client-cert.pem" + cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem" + } + ## we will create one MQTT connection for each element of the `message_in` + message_in: [{ + ## the `id` will be used as part of the clientid + id = "pull_msgs_from_aws" + subscribe_remote_topic = "aws/#" + subscribe_qos = 1 + local_topic = "from_aws/${topic}" + payload = "${payload}" + qos = "${qos}" + retain = "${retain}" + }] + ## we will create one MQTT connection for each element of the `message_out` + message_out: [{ + ## the `id` will be used as part of the clientid + id = "push_msgs_to_aws" + subscribe_local_topic = "emqx/#" + remote_topic = "from_emqx/${topic}" + payload = "${payload}" + qos = 1 + retain = false + }] +} diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 75ebfac0c..4d2b80ba7 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -14,21 +14,34 @@ %% limitations under the License. %%-------------------------------------------------------------------- -module(emqx_bridge). +-behaviour(emqx_config_handler). + +-export([post_config_update/4]). -export([ load_bridges/0 - , resource_type/1 - , bridge_type/1 - , name_to_resource_id/1 - , resource_id_to_name/1 + , get_bridge/2 + , get_bridge/3 , list_bridges/0 - , is_bridge/1 - , config_key_path/0 - , update_config/1 + , create_bridge/3 + , remove_bridge/3 + , update_bridge/3 + , start_bridge/2 + , stop_bridge/2 + , restart_bridge/2 ]). -load_bridges() -> - Bridges = emqx:get_config([bridges], #{}), - emqx_bridge_monitor:ensure_all_started(Bridges). +-export([ config_key_path/0 + ]). + +-export([ resource_type/1 + , bridge_type/1 + , resource_id/1 + , resource_id/2 + , parse_bridge_id/1 + ]). + +config_key_path() -> + [bridges]. resource_type(mqtt) -> emqx_connector_mqtt; resource_type(mysql) -> emqx_connector_mysql; @@ -44,27 +57,136 @@ bridge_type(emqx_connector_mongo) -> mongo; bridge_type(emqx_connector_redis) -> redis; bridge_type(emqx_connector_ldap) -> ldap. -name_to_resource_id(BridgeName) -> - Name = bin(BridgeName), - <<"bridge:", Name/binary>>. +post_config_update(_Req, NewConf, OldConf, _AppEnv) -> + #{added := Added, removed := Removed, changed := Updated} + = diff_confs(NewConf, OldConf), + perform_bridge_changes([ + {fun remove_bridge/3, Removed}, + {fun create_bridge/3, Added}, + {fun update_bridge/3, Updated} + ]). -resource_id_to_name(<<"bridge:", BridgeName/binary>> = _ResourceId) -> - BridgeName. +perform_bridge_changes(Tasks) -> + perform_bridge_changes(Tasks, ok). + +perform_bridge_changes([], Result) -> + Result; +perform_bridge_changes([{Action, MapConfs} | Tasks], Result0) -> + Result = maps:fold(fun + ({_Type, _Name}, _Conf, {error, Reason}) -> + {error, Reason}; + ({Type, Name}, Conf, _) -> + case Action(Type, Name, Conf) of + {error, Reason} -> {error, Reason}; + Return -> Return + end + end, Result0, MapConfs), + perform_bridge_changes(Tasks, Result). + +load_bridges() -> + Bridges = emqx:get_config([bridges], #{}), + emqx_bridge_monitor:ensure_all_started(Bridges). + +resource_id(BridgeId) when is_binary(BridgeId) -> + <<"bridge:", BridgeId/binary>>. + +resource_id(BridgeType, BridgeName) -> + BridgeId = bridge_id(BridgeType, BridgeName), + resource_id(BridgeId). + +bridge_id(BridgeType, BridgeName) -> + Name = bin(BridgeName), + Type = bin(BridgeType), + <>. + +parse_bridge_id(BridgeId) -> + try + [Type, Name] = string:split(str(BridgeId), ":", leading), + {list_to_existing_atom(Type), list_to_atom(Name)} + catch + _ : _ -> error({invalid_bridge_id, BridgeId}) + end. list_bridges() -> - emqx_resource_api:list_instances(fun emqx_bridge:is_bridge/1). + lists:foldl(fun({Type, NameAndConf}, Bridges) -> + lists:foldl(fun({Name, RawConf}, Acc) -> + case get_bridge(Type, Name, RawConf) of + {error, not_found} -> Acc; + {ok, Res} -> [Res | Acc] + end + end, Bridges, maps:to_list(NameAndConf)) + end, [], maps:to_list(emqx:get_raw_config([bridges]))). -is_bridge(#{id := <<"bridge:", _/binary>>}) -> - true; -is_bridge(_Data) -> - false. +get_bridge(Type, Name) -> + RawConf = emqx:get_raw_config([bridges, Type, Name], #{}), + get_bridge(Type, Name, RawConf). +get_bridge(Type, Name, RawConf) -> + case emqx_resource:get_instance(resource_id(Type, Name)) of + {error, not_found} -> {error, not_found}; + {ok, Data} -> {ok, #{id => bridge_id(Type, Name), resource_data => Data, + raw_config => RawConf}} + end. -config_key_path() -> - [emqx_bridge, bridges]. +start_bridge(Type, Name) -> + restart_bridge(Type, Name). -update_config(ConfigReq) -> - emqx:update_config(config_key_path(), ConfigReq). +stop_bridge(Type, Name) -> + emqx_resource:stop(resource_id(Type, Name)). + +restart_bridge(Type, Name) -> + emqx_resource:restart(resource_id(Type, Name)). + +create_bridge(Type, Name, Conf) -> + ResId = resource_id(Type, Name), + case emqx_resource:create(ResId, + emqx_bridge:resource_type(Type), Conf) of + {ok, already_created} -> + emqx_resource:get_instance(ResId); + {ok, Data} -> + {ok, Data}; + {error, Reason} -> + {error, Reason} + end. + +update_bridge(Type, Name, Conf) -> + %% TODO: sometimes its not necessary to restart the bridge connection. + %% + %% - if the connection related configs like `username` is updated, we should restart/start + %% or stop bridges according to the change. + %% - if the connection related configs are not update, but channel configs `message_in` or + %% `message_out` are changed, then we should not restart the bridge, we only restart/start + %% the channels. + %% + emqx_resource:recreate(resource_id(Type, Name), + emqx_bridge:resource_type(Type), Conf). + +remove_bridge(Type, Name, _Conf) -> + case emqx_resource:remove(resource_id(Type, Name)) of + ok -> ok; + {error, not_found} -> ok; + {error, Reason} -> + {error, Reason} + end. + +diff_confs(NewConfs, OldConfs) -> + emqx_map_lib:diff_maps(flatten_confs(NewConfs), + flatten_confs(OldConfs)). + +flatten_confs(Conf0) -> + maps:from_list( + lists:append([do_flatten_confs(Type, Conf) + || {Type, Conf} <- maps:to_list(Conf0)])). + +do_flatten_confs(Type, Conf0) -> + [{{Type, Name}, Conf} || {Name, Conf} <- maps:to_list(Conf0)]. bin(Bin) when is_binary(Bin) -> Bin; bin(Str) when is_list(Str) -> list_to_binary(Str); bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8). + +str(A) when is_atom(A) -> + atom_to_list(A); +str(B) when is_binary(B) -> + binary_to_list(B); +str(S) when is_list(S) -> + S. diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index c10875e55..56c421e0c 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -15,128 +15,194 @@ %%-------------------------------------------------------------------- -module(emqx_bridge_api). --rest_api(#{ name => list_data_bridges - , method => 'GET' - , path => "/data_bridges" - , func => list_bridges - , descr => "List all data bridges" - }). +-behaviour(minirest_api). --rest_api(#{ name => get_data_bridge - , method => 'GET' - , path => "/data_bridges/:bin:name" - , func => get_bridge - , descr => "Get a data bridge by name" - }). - --rest_api(#{ name => create_data_bridge - , method => 'POST' - , path => "/data_bridges/:bin:name" - , func => create_bridge - , descr => "Create a new data bridge" - }). - --rest_api(#{ name => update_data_bridge - , method => 'PUT' - , path => "/data_bridges/:bin:name" - , func => update_bridge - , descr => "Update an existing data bridge" - }). - --rest_api(#{ name => delete_data_bridge - , method => 'DELETE' - , path => "/data_bridges/:bin:name" - , func => delete_bridge - , descr => "Delete an existing data bridge" - }). +-export([api_spec/0]). -export([ list_bridges/2 - , get_bridge/2 - , create_bridge/2 - , update_bridge/2 - , delete_bridge/2 + , list_local_bridges/1 + , crud_bridges_cluster/2 + , crud_bridges/3 ]). --define(BRIDGE(N, T, C), #{<<"name">> => N, <<"type">> => T, <<"config">> => C}). +-define(TYPES, [mqtt]). +-define(BRIDGE(N, T, C), #{<<"id">> => N, <<"type">> => T, <<"config">> => C}). +-define(TRY_PARSE_ID(ID, EXPR), + try emqx_bridge:parse_bridge_id(Id) of + {BridgeType, BridgeName} -> EXPR + catch + error:{invalid_bridge_id, Id0} -> + {400, #{code => 102, message => <<"invalid_bridge_id: ", Id0/binary>>}} + end). -list_bridges(_Binding, _Params) -> - {200, #{code => 0, data => [format_api_reply(Data) || - Data <- emqx_bridge:list_bridges()]}}. +req_schema() -> + Schema = [ + case maps:to_list(emqx:get_raw_config([bridges, T], #{})) of + %% the bridge is not configured, so we have no method to get the schema + [] -> #{}; + [{_K, Conf} | _] -> + emqx_mgmt_api_configs:gen_schema(Conf) + end + || T <- ?TYPES], + #{oneOf => Schema}. -get_bridge(#{name := Name}, _Params) -> - case emqx_resource:get_instance(emqx_bridge:name_to_resource_id(Name)) of - {ok, Data} -> - {200, #{code => 0, data => format_api_reply(emqx_resource_api:format_data(Data))}}; +resp_schema() -> + #{oneOf := Schema} = req_schema(), + AddMetadata = fun(Prop) -> + Prop#{is_connected => #{type => boolean}, + id => #{type => string}, + bridge_type => #{type => string, enum => ?TYPES}, + node => #{type => string}} + end, + Schema1 = [S#{properties => AddMetadata(Prop)} + || S = #{properties := Prop} <- Schema], + #{oneOf => Schema1}. + +api_spec() -> + {bridge_apis(), []}. + +bridge_apis() -> + [list_all_bridges_api(), crud_bridges_apis(), operation_apis()]. + +list_all_bridges_api() -> + Metadata = #{ + get => #{ + description => <<"List all created bridges">>, + responses => #{ + <<"200">> => emqx_mgmt_util:array_schema(resp_schema(), + <<"A list of the bridges">>) + } + } + }, + {"/bridges/", Metadata, list_bridges}. + +crud_bridges_apis() -> + ReqSchema = req_schema(), + RespSchema = resp_schema(), + Metadata = #{ + get => #{ + description => <<"Get a bridge by Id">>, + parameters => [param_path_id()], + responses => #{ + <<"200">> => emqx_mgmt_util:array_schema(RespSchema, + <<"The details of the bridge">>), + <<"404">> => emqx_mgmt_util:error_schema(<<"Bridge not found">>, ['NOT_FOUND']) + } + }, + put => #{ + description => <<"Create or update a bridge">>, + parameters => [param_path_id()], + 'requestBody' => emqx_mgmt_util:schema(ReqSchema), + responses => #{ + <<"200">> => emqx_mgmt_util:array_schema(RespSchema, <<"Bridge updated">>), + <<"400">> => emqx_mgmt_util:error_schema(<<"Update bridge failed">>, + ['UPDATE_FAILED']) + } + }, + delete => #{ + description => <<"Delete a bridge">>, + parameters => [param_path_id()], + responses => #{ + <<"200">> => emqx_mgmt_util:schema(<<"Bridge deleted">>), + <<"404">> => emqx_mgmt_util:error_schema(<<"Bridge not found">>, ['NOT_FOUND']) + } + } + }, + {"/bridges/:id", Metadata, crud_bridges_cluster}. + +operation_apis() -> + Metadata = #{ + post => #{ + description => <<"Restart bridges on all nodes in the cluster">>, + parameters => [ + param_path_id(), + param_path_operation()], + responses => #{ + <<"500">> => emqx_mgmt_util:error_schema(<<"Operation Failed">>, ['INTERNAL_ERROR']), + <<"200">> => emqx_mgmt_util:schema(<<"Operation success">>)}}}, + {"/bridges/:id/operation/:operation", Metadata, manage_bridges}. + +param_path_id() -> + #{ + name => id, + in => path, + schema => #{type => string}, + required => true + }. + +param_path_operation()-> + #{ + name => operation, + in => path, + required => true, + schema => #{ + type => string, + enum => [start, stop, restart]}, + example => restart + }. + +list_bridges(get, _Params) -> + {200, lists:append([list_local_bridges(Node) || Node <- ekka_mnesia:running_nodes()])}. + +list_local_bridges(Node) when Node =:= node() -> + [format_resp(Data) || Data <- emqx_bridge:list_bridges()]; +list_local_bridges(Node) -> + rpc_call(Node, list_local_bridges, [Node]). + +crud_bridges_cluster(Method, Params) -> + Results = [crud_bridges(Node, Method, Params) || Node <- ekka_mnesia:running_nodes()], + case lists:filter(fun({200}) -> false; ({200, _}) -> false; (_) -> true end, Results) of + [] -> + case Results of + [{200} | _] -> {200}; + _ -> {200, [Res || {200, Res} <- Results]} + end; + Errors -> + hd(Errors) + end. + +crud_bridges(Node, Method, Params) when Node =/= node() -> + rpc_call(Node, crud_bridges, [Node, Method, Params]); + +crud_bridges(_, get, #{bindings := #{id := Id}}) -> + ?TRY_PARSE_ID(Id, case emqx_bridge:get_bridge(BridgeType, BridgeName) of + {ok, Data} -> {200, format_resp(Data)}; {error, not_found} -> - {404, #{code => 102, message => <<"not_found: ", Name/binary>>}} + {404, #{code => 102, message => <<"not_found: ", Id/binary>>}} + end); + +crud_bridges(_, put, #{bindings := #{id := Id}, body := Conf}) -> + ?TRY_PARSE_ID(Id, + case emqx:update_config(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName], Conf, + #{rawconf_with_defaults => true}) of + {ok, #{raw_config := RawConf, post_config_update := #{emqx_bridge := Data}}} -> + {200, format_resp(#{id => Id, raw_config => RawConf, resource_data => Data})}; + {ok, _} -> %% the bridge already exits + {ok, Data} = emqx_bridge:get_bridge(BridgeType, BridgeName), + {200, format_resp(Data)}; + {error, Reason} -> + {500, #{code => 102, message => emqx_resource_api:stringnify(Reason)}} + end); + +crud_bridges(_, delete, #{bindings := #{id := Id}}) -> + ?TRY_PARSE_ID(Id, + case emqx:remove_config(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName]) of + {ok, _} -> {200}; + {error, Reason} -> + {500, #{code => 102, message => emqx_resource_api:stringnify(Reason)}} + end). + +format_resp(#{id := Id, raw_config := RawConf, resource_data := #{mod := Mod, status := Status}}) -> + IsConnected = fun(started) -> true; (_) -> false end, + RawConf#{ + id => Id, + node => node(), + bridge_type => emqx_bridge:bridge_type(Mod), + is_connected => IsConnected(Status) + }. + +rpc_call(Node, Fun, Args) -> + case rpc:call(Node, ?MODULE, Fun, Args) of + {badrpc, Reason} -> {error, Reason}; + Res -> Res end. - -create_bridge(#{name := Name}, Params) -> - Config = proplists:get_value(<<"config">>, Params), - BridgeType = proplists:get_value(<<"type">>, Params), - case emqx_resource:check_and_create( - emqx_bridge:name_to_resource_id(Name), - emqx_bridge:resource_type(atom(BridgeType)), maps:from_list(Config)) of - {ok, already_created} -> - {400, #{code => 102, message => <<"bridge already created: ", Name/binary>>}}; - {ok, Data} -> - update_config_and_reply(Name, BridgeType, Config, Data); - {error, Reason0} -> - Reason = emqx_resource_api:stringnify(Reason0), - {500, #{code => 102, message => <<"create bridge ", Name/binary, - " failed:", Reason/binary>>}} - end. - -update_bridge(#{name := Name}, Params) -> - Config = proplists:get_value(<<"config">>, Params), - BridgeType = proplists:get_value(<<"type">>, Params), - case emqx_resource:check_and_update( - emqx_bridge:name_to_resource_id(Name), - emqx_bridge:resource_type(atom(BridgeType)), maps:from_list(Config), []) of - {ok, Data} -> - update_config_and_reply(Name, BridgeType, Config, Data); - {error, not_found} -> - {400, #{code => 102, message => <<"bridge not_found: ", Name/binary>>}}; - {error, Reason0} -> - Reason = emqx_resource_api:stringnify(Reason0), - {500, #{code => 102, message => <<"update bridge ", Name/binary, - " failed:", Reason/binary>>}} - end. - -delete_bridge(#{name := Name}, _Params) -> - case emqx_resource:remove(emqx_bridge:name_to_resource_id(Name)) of - ok -> delete_config_and_reply(Name); - {error, Reason} -> - {500, #{code => 102, message => emqx_resource_api:stringnify(Reason)}} - end. - -format_api_reply(#{resource_type := Type, id := Id, config := Conf, status := Status}) -> - #{type => emqx_bridge:bridge_type(Type), - name => emqx_bridge:resource_id_to_name(Id), - config => Conf, status => Status}. - -% format_conf(#{resource_type := Type, id := Id, config := Conf}) -> -% #{type => Type, name => emqx_bridge:resource_id_to_name(Id), -% config => Conf}. - -% get_all_configs() -> -% [format_conf(Data) || Data <- emqx_bridge:list_bridges()]. - -update_config_and_reply(Name, BridgeType, Config, Data) -> - case emqx_bridge:update_config({update, ?BRIDGE(Name, BridgeType, Config)}) of - {ok, _} -> - {200, #{code => 0, data => format_api_reply( - emqx_resource_api:format_data(Data))}}; - {error, Reason} -> - {500, #{code => 102, message => emqx_resource_api:stringnify(Reason)}} - end. - -delete_config_and_reply(Name) -> - case emqx_bridge:update_config({delete, Name}) of - {ok, _} -> {200, #{code => 0, data => #{}}}; - {error, Reason} -> - {500, #{code => 102, message => emqx_resource_api:stringnify(Reason)}} - end. - -atom(B) when is_binary(B) -> - list_to_existing_atom(binary_to_list(B)). diff --git a/apps/emqx_bridge/src/emqx_bridge_app.erl b/apps/emqx_bridge/src/emqx_bridge_app.erl index cfefe118f..004b32787 100644 --- a/apps/emqx_bridge/src/emqx_bridge_app.erl +++ b/apps/emqx_bridge/src/emqx_bridge_app.erl @@ -17,29 +17,15 @@ -behaviour(application). --behaviour(emqx_config_handler). - --export([start/2, stop/1, pre_config_update/2]). +-export([start/2, stop/1]). start(_StartType, _StartArgs) -> {ok, Sup} = emqx_bridge_sup:start_link(), ok = emqx_bridge:load_bridges(), - emqx_config_handler:add_handler(emqx_bridge:config_key_path(), ?MODULE), + emqx_config_handler:add_handler(emqx_bridge:config_key_path(), emqx_bridge), {ok, Sup}. stop(_State) -> ok. -%% internal functions -pre_config_update({update, Bridge = #{<<"name">> := Name}}, OldConf) -> - {ok, [Bridge | remove_bridge(Name, OldConf)]}; -pre_config_update({delete, Name}, OldConf) -> - {ok, remove_bridge(Name, OldConf)}; -pre_config_update(NewConf, _OldConf) when is_list(NewConf) -> - %% overwrite the entire config! - {ok, NewConf}. - -remove_bridge(_Name, undefined) -> - []; -remove_bridge(Name, OldConf) -> - [B || B = #{<<"name">> := Name0} <- OldConf, Name0 =/= Name]. +%% internal functions \ No newline at end of file diff --git a/apps/emqx_bridge/src/emqx_bridge_monitor.erl b/apps/emqx_bridge/src/emqx_bridge_monitor.erl index d76af5fb9..3136a74c9 100644 --- a/apps/emqx_bridge/src/emqx_bridge_monitor.erl +++ b/apps/emqx_bridge/src/emqx_bridge_monitor.erl @@ -75,7 +75,7 @@ load_bridges(Configs) -> %% emqx_resource:check_and_create_local(ResourceId, ResourceType, Config, #{keep_retry => true}). load_bridge(Name, Type, Config) -> case emqx_resource:create_local( - emqx_bridge:name_to_resource_id(Name), + emqx_bridge:resource_id(Type, Name), emqx_bridge:resource_type(Type), Config) of {ok, already_created} -> ok; {ok, _} -> ok; diff --git a/rebar.config b/rebar.config index 7210f11b0..7c5edc680 100644 --- a/rebar.config +++ b/rebar.config @@ -47,11 +47,11 @@ , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.1.9"}}} , {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} - , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.2"}}} + , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.3"}}} , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.2"}}} , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.8"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}} - , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.2.2"}}} + , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.2.3"}}} , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.1"}}} , {replayq, "0.3.3"} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}} @@ -60,7 +60,7 @@ , {observer_cli, "1.7.1"} % NOTE: depends on recon 2.5.x , {getopt, "1.0.2"} , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.14.1"}}} - , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.17.0"}}} + , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.17.1"}}} , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.4.0"}}} , {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}} , {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.1"}}}