%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_bridge).
-behaviour(emqx_config_handler).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").

-export([ post_config_update/5
        ]).

-export([ load_hook/0
        , unload_hook/0
        ]).

-export([on_message_publish/1]).

-export([ resource_type/1
        , bridge_type/1
        , resource_id/1
        , resource_id/2
        , bridge_id/2
        , parse_bridge_id/1
        ]).

-export([ load/0
        , lookup/1
        , lookup/2
        , lookup/3
        , list/0
        , list_bridges_by_connector/1
        , create/2
        , create/3
        , recreate/2
        , recreate/3
        , create_dry_run/2
        , remove/1
        , remove/2
        , update/2
        , update/3
        , stop/2
        , restart/2
        , reset_metrics/1
        ]).

-export([ send_message/2
        ]).

-export([ config_key_path/0
        ]).

%% exported for `emqx_telemetry'
-export([get_basic_usage_info/0]).

%% Install the 'message.publish' hook for the bridges found under the
%% `bridges' config root (an empty map when nothing is configured).
load_hook() ->
    load_hook(emqx:get_config([bridges], #{})).

%% `Bridges' is a two-level map: bridge type => (bridge name => bridge config).
%% Every bridge config is handed to do_load_hook/1.
load_hook(Bridges) ->
    lists:foreach(
        fun(BridgesOfType) ->
            lists:foreach(fun do_load_hook/1, maps:values(BridgesOfType))
        end,
        maps:values(Bridges)).

%% Only a config carrying a `local_topic' can cause the hook to be installed,
%% and only when its direction is `egress' (the default when the key is absent).
do_load_hook(#{local_topic := _} = Conf) ->
    Direction = maps:get(direction, Conf, egress),
    case Direction of
        ingress ->
            ok;
        egress ->
            emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []})
    end;
do_load_hook(_Conf) ->
    ok.
%% Remove this module's 'message.publish' callback.
unload_hook() ->
    ok = emqx_hooks:del('message.publish', {?MODULE, on_message_publish}).

%% 'message.publish' hook callback. Messages whose `sys' flag is not set are
%% converted with emqx_rule_events:eventmsg_publish/1 and offered to every
%% matching egress bridge. The message itself is always returned untouched.
on_message_publish(#message{topic = Topic, flags = Flags} = Message) ->
    case maps:get(sys, Flags, false) of
        true ->
            ok;
        false ->
            EventMsg = emqx_rule_events:eventmsg_publish(Message),
            send_to_matched_egress_bridges(Topic, EventMsg)
    end,
    {ok, Message}.

%% Send `Msg' to each bridge matched by get_matched_bridges/1. A failure or an
%% exception for one bridge is logged and does not stop the remaining ones.
send_to_matched_egress_bridges(Topic, Msg) ->
    BridgeIds = get_matched_bridges(Topic),
    lists:foreach(
        fun(BridgeId) ->
            try send_message(BridgeId, Msg) of
                {error, Reason} ->
                    ?SLOG(error, #{msg => "send_message_to_bridge_failed",
                                   bridge => BridgeId, error => Reason});
                _ ->
                    ok
            catch
                Class:Error:Stacktrace ->
                    ?SLOG(error, #{msg => "send_message_to_bridge_exception",
                                   bridge => BridgeId, error => Class, reason => Error,
                                   stacktrace => Stacktrace})
            end
        end,
        BridgeIds).

%% Query the bridge's resource with `{send_message, Message}'.
%% Returns `{error, {bridge_not_found, BridgeId}}' when the bridge has no
%% config and `{error, {bridge_stopped, BridgeId}}' when it is disabled.
send_message(BridgeId, Message) ->
    {Type, Name} = parse_bridge_id(BridgeId),
    ResId = emqx_bridge:resource_id(Type, Name),
    case emqx:get_config([bridges, Type, Name], not_found) of
        #{enable := true} ->
            emqx_resource:query(ResId, {send_message, Message});
        #{enable := false} ->
            {error, {bridge_stopped, BridgeId}};
        not_found ->
            {error, {bridge_not_found, BridgeId}}
    end.

%% Config root handled by this module.
config_key_path() ->
    [bridges].

%% Map a bridge type (atom or binary) to its connector module.
resource_type(Type) when Type =:= <<"mqtt">>; Type =:= mqtt ->
    emqx_connector_mqtt;
resource_type(Type) when Type =:= <<"http">>; Type =:= http ->
    emqx_connector_http.

%% Map a connector module back to its bridge type.
bridge_type(emqx_connector_mqtt) -> mqtt;
bridge_type(emqx_connector_http) -> http.

%% Config handler callback: apply the difference between the old and the new
%% bridges config, then re-register the publish hook against the new config.
post_config_update(_, _Req, NewConf, OldConf, _AppEnv) ->
    #{added := Added, removed := Removed, changed := Updated} =
        diff_confs(NewConf, OldConf),
    %% The result of perform_bridge_changes/1 is what this callback returns,
    %% so an error from any task is reported to the caller.
    Tasks = [ {fun remove/3, Removed}
            , {fun create/3, Added}
            , {fun update/3, Updated}
            ],
    Result = perform_bridge_changes(Tasks),
    ok = unload_hook(),
    ok = load_hook(NewConf),
    Result.

%% Run the tasks in order, starting from an `ok' result.
perform_bridge_changes(Tasks) ->
    perform_bridge_changes(Tasks, ok).
%% Apply each `{Action, MapConfs}' task in order. `MapConfs' maps
%% `{Type, Name}' to a config; `Action' is called as Action(Type, Name, Conf).
%% Once an `{error, Reason}' has been produced, every remaining entry (in this
%% and in later tasks) is skipped and that error becomes the final result.
perform_bridge_changes([], Result) ->
    Result;
perform_bridge_changes([{Action, MapConfs} | Tasks], Result0) ->
    Result = maps:fold(fun
        ({_Type, _Name}, _Conf, {error, Reason}) ->
            {error, Reason};
        ({Type, Name}, Conf, _) ->
            %% The return value of the action is the new accumulator, whether
            %% it is an error or not.
            Action(Type, Name, Conf)
        end, Result0, MapConfs),
    perform_bridge_changes(Tasks, Result).

%% Hand the configured bridges to emqx_bridge_monitor:ensure_all_started/1.
load() ->
    Bridges = emqx:get_config([bridges], #{}),
    emqx_bridge_monitor:ensure_all_started(Bridges).

%% Resource id of a bridge: the bridge id prefixed with "bridge:".
resource_id(BridgeId) when is_binary(BridgeId) ->
    <<"bridge:", BridgeId/binary>>.

resource_id(BridgeType, BridgeName) ->
    BridgeId = bridge_id(BridgeType, BridgeName),
    resource_id(BridgeId).

%% Bridge id as a binary of the form "Type:Name". This is the format that
%% parse_bridge_id/1 splits on ":"; type and name may be atoms, strings or
%% binaries (see bin/1).
bridge_id(BridgeType, BridgeName) ->
    Name = bin(BridgeName),
    Type = bin(BridgeType),
    <<Type/binary, ":", Name/binary>>.

%% Split a "Type:Name" bridge id into `{TypeAtom, NameAtom}'.
%% Raises `{invalid_bridge_id, BridgeId}' unless the id has exactly one ":".
%% NOTE(review): binary_to_atom/2 creates new atoms; if bridge ids can come
%% from untrusted input this can grow the atom table -- confirm against callers.
parse_bridge_id(BridgeId) ->
    case string:split(bin(BridgeId), ":", all) of
        [Type, Name] -> {binary_to_atom(Type, utf8), binary_to_atom(Name, utf8)};
        _ -> error({invalid_bridge_id, BridgeId})
    end.

%% List every bridge present in the raw config that also has a resource
%% instance; entries for which lookup/3 returns not_found are left out.
list() ->
    lists:foldl(fun({Type, NameAndConf}, Bridges) ->
            lists:foldl(fun({Name, RawConf}, Acc) ->
                    case lookup(Type, Name, RawConf) of
                        {error, not_found} -> Acc;
                        {ok, Res} -> [Res | Acc]
                    end
                end, Bridges, maps:to_list(NameAndConf))
        end, [], maps:to_list(emqx:get_raw_config([bridges], #{}))).

%% Bridges from list/0 whose raw config names `ConnectorId' under <<"connector">>.
list_bridges_by_connector(ConnectorId) ->
    [B || B = #{raw_config := #{<<"connector">> := Id}} <- list(),
          ConnectorId =:= Id].

lookup(Id) ->
    {Type, Name} = parse_bridge_id(Id),
    lookup(Type, Name).

lookup(Type, Name) ->
    RawConf = emqx:get_raw_config([bridges, Type, Name], #{}),
    lookup(Type, Name, RawConf).

%% Returns `{ok, #{type, name, resource_data, raw_config}}' when the resource
%% instance exists, `{error, not_found}' otherwise.
lookup(Type, Name, RawConf) ->
    case emqx_resource:get_instance(resource_id(Type, Name)) of
        {error, not_found} -> {error, not_found};
        {ok, _, Data} ->
            {ok, #{type => Type, name => Name, resource_data => Data,
                   raw_config => RawConf}}
    end.

reset_metrics(ResourceId) ->
    emqx_resource:reset_metrics(ResourceId).

stop(Type, Name) ->
    emqx_resource:stop(resource_id(Type, Name)).
%% There is deliberately no 'start': we want an already started bridge to be
%% restarted, so restart/2 is the only entry point.
restart(Type, Name) ->
    ResId = resource_id(Type, Name),
    emqx_resource:restart(ResId).

create(BridgeId, Conf) ->
    {Type, Name} = parse_bridge_id(BridgeId),
    create(Type, Name, Conf).

%% Create the local resource for a bridge. On success (including the case
%% where the resource already existed) the bridge is stopped again if its
%% config has `enable' set to false.
create(Type, Name, Conf) ->
    ?SLOG(info, #{msg => "create bridge", type => Type, name => Name,
                  config => Conf}),
    ResId = resource_id(Type, Name),
    ResourceType = emqx_bridge:resource_type(Type),
    ResourceConf = parse_confs(Type, Name, Conf),
    case emqx_resource:create_local(ResId, <<"emqx_bridge">>, ResourceType,
                                    ResourceConf, #{}) of
        {ok, _} -> maybe_disable_bridge(Type, Name, Conf);
        {error, _} = Error -> Error
    end.

update(BridgeId, {OldConf, Conf}) ->
    {Type, Name} = parse_bridge_id(BridgeId),
    update(Type, Name, {OldConf, Conf}).

%% TODO: sometimes it's not necessary to restart the bridge connection.
%%
%% - if connection related configs like `servers` are updated, we should
%%   restart/start or stop bridges according to the change.
%% - if the connection related configs are not updated and only non-connection
%%   configs like the `method` or `headers` of a HTTP bridge are changed, then
%%   the bridge can be updated without restarting it.
update(Type, Name, {OldConf, Conf}) ->
    case if_only_to_toggle_enable(OldConf, Conf) of
        true -> toggle_bridge(Type, Name, Conf);
        false -> rebuild_bridge(Type, Name, Conf)
    end.

%% The only change is to toggle 'bridge.{type}.{name}.enable', so there is no
%% need to recreate the bridge: restart it or stop it.
toggle_bridge(Type, Name, Conf) ->
    case maps:get(enable, Conf, true) of
        false -> stop(Type, Name);
        true -> restart(Type, Name)
    end.

%% Recreate the bridge from the new config; if it does not exist yet, create it.
rebuild_bridge(Type, Name, Conf) ->
    ?SLOG(info, #{msg => "update bridge", type => Type, name => Name,
                  config => Conf}),
    case recreate(Type, Name, Conf) of
        {ok, _} ->
            maybe_disable_bridge(Type, Name, Conf);
        {error, not_found} ->
            ?SLOG(warning, #{ msg => "updating_a_non-exist_bridge_need_create_a_new_one"
                            , type => Type, name => Name, config => Conf}),
            create(Type, Name, Conf);
        {error, Reason} ->
            {error, {update_bridge_failed, Reason}}
    end.
%% Recreate a bridge's local resource from its current config.
recreate(Type, Name) ->
    recreate(Type, Name, emqx:get_config([bridges, Type, Name])).

recreate(Type, Name, Conf) ->
    emqx_resource:recreate_local(resource_id(Type, Name),
        emqx_bridge:resource_type(Type), parse_confs(Type, Name, Conf), #{}).

%% Check `Conf' and, if it is valid, try it out via
%% emqx_resource:create_dry_run_local/2. Fixed <<"egress">> and <<"ingress">>
%% sections are put into the config before the check (overriding any sections
%% the caller supplied) -- presumably so the check passes without real
%% bridge settings; confirm against the connector schema.
create_dry_run(Type, Conf) ->
    Conf0 = Conf#{<<"egress">> =>
                    #{ <<"remote_topic">> => <<"t">>
                     , <<"remote_qos">> => 0
                     , <<"retain">> => true
                     , <<"payload">> => <<"val">>
                     },
                  <<"ingress">> =>
                    #{ <<"remote_topic">> => <<"t">>
                     }},
    case emqx_resource:check_config(emqx_bridge:resource_type(Type), Conf0) of
        {ok, Conf1} ->
            emqx_resource:create_dry_run_local(emqx_bridge:resource_type(Type), Conf1);
        {error, _} = Error ->
            Error
    end.

remove(BridgeId) ->
    {BridgeType, BridgeName} = parse_bridge_id(BridgeId),
    remove(BridgeType, BridgeName, #{}).

remove(Type, Name) ->
    remove(Type, Name, undefined).

%% just for perform_bridge_changes/1
%% The config argument is ignored. Removing a bridge that does not exist is
%% treated as success.
remove(Type, Name, _Conf) ->
    ?SLOG(info, #{msg => "remove_bridge", type => Type, name => Name}),
    case emqx_resource:remove_local(resource_id(Type, Name)) of
        ok -> ok;
        {error, not_found} -> ok;
        {error, Reason} -> {error, Reason}
    end.

%% Diff two bridges configs after flattening them to `{Type, Name} => Conf'.
diff_confs(NewConfs, OldConfs) ->
    emqx_map_lib:diff_maps(flatten_confs(NewConfs),
                           flatten_confs(OldConfs)).

%% Turn `#{Type => #{Name => Conf}}' into `#{{Type, Name} => Conf}'.
flatten_confs(Conf0) ->
    maps:from_list(
        lists:flatmap(fun({Type, Conf}) ->
                do_flatten_confs(Type, Conf)
            end, maps:to_list(Conf0))).

do_flatten_confs(Type, Conf0) ->
    [{{Type, Name}, Conf} || {Name, Conf} <- maps:to_list(Conf0)].

%% Collect the ids of the bridges that should receive a message published on
%% `Topic'. Ingress bridges are skipped; every other bridge is passed to
%% get_matched_bridge_id/5.
get_matched_bridges(Topic) ->
    Bridges = emqx:get_config([bridges], #{}),
    maps:fold(fun(BType, Conf, Acc0) ->
        maps:fold(fun(BName, BridgeConf, Acc1) ->
            %% Confs for MQTT, Kafka bridges have the `direction` flag.
            %% A conf without the flag is treated as egress, the same default
            %% do_load_hook/1 uses, instead of raising function_clause.
            case maps:get(direction, BridgeConf, egress) of
                ingress ->
                    Acc1;
                egress ->
                    %% HTTP, MySQL bridges only have egress direction
                    get_matched_bridge_id(BridgeConf, Topic, BType, BName, Acc1)
            end
        end, Acc0, Conf)
    end, [], Bridges).
%% Prepend the bridge id to `Acc' when the bridge is enabled and its
%% `local_topic' filter matches `Topic'; otherwise return `Acc' unchanged.
get_matched_bridge_id(#{enable := false}, _Topic, _BType, _BName, Acc) ->
    Acc;
get_matched_bridge_id(#{local_topic := Filter}, Topic, BType, BName, Acc) ->
    case emqx_topic:match(Topic, Filter) of
        true -> [bridge_id(BType, BName) | Acc];
        false -> Acc
    end;
get_matched_bridge_id(#{}, _Topic, _BType, _BName, Acc) ->
    %% A bridge without `local_topic' has no filter to match against, so it
    %% never receives locally published messages. Skip it rather than raising
    %% function_clause inside the publish hook.
    Acc.

%% Translate a bridge config into the config expected by its resource.
%%
%% - http: the `url' is split into a parsed `base_url' and a `request' map
%%   holding the path, method, body, headers and request timeout.
%% - connector given by id (binary): the connector config is read from
%%   `[connectors, Type, ConnName]'; the connector type must equal the bridge
%%   type, otherwise `{cannot_use_connector_with_different_type, ConnId}' is
%%   raised.
%% - connector given inline (map): used as is.
%%
%% In both connector cases the bridge config, minus the `connector' and
%% `direction' keys, is nested under `ingress' or `egress'.
parse_confs(http, _Name,
        #{ url := Url
         , method := Method
         , body := Body
         , headers := Headers
         , request_timeout := ReqTimeout
         } = Conf) ->
    {BaseUrl, Path} = parse_url(Url),
    {ok, BaseUrl2} = emqx_http_lib:uri_parse(BaseUrl),
    Conf#{ base_url => BaseUrl2
         , request =>
            #{ path => Path
             , method => Method
             , body => Body
             , headers => Headers
             , request_timeout => ReqTimeout
             }
         };
parse_confs(Type, Name, #{connector := ConnId, direction := Direction} = Conf)
        when is_binary(ConnId) ->
    case emqx_connector:parse_connector_id(ConnId) of
        %% `Type' is already bound: this clause only matches a connector of
        %% the same type as the bridge.
        {Type, ConnName} ->
            ConnectorConfs = emqx:get_config([connectors, Type, ConnName]),
            make_resource_confs(Direction, ConnectorConfs,
                maps:without([connector, direction], Conf), Type, Name);
        {_ConnType, _ConnName} ->
            error({cannot_use_connector_with_different_type, ConnId})
    end;
parse_confs(Type, Name, #{connector := ConnectorConfs, direction := Direction} = Conf)
        when is_map(ConnectorConfs) ->
    make_resource_confs(Direction, ConnectorConfs,
        maps:without([connector, direction], Conf), Type, Name).

%% Nest the bridge config under the direction key of the connector config.
%% An ingress bridge additionally gets a `hookpoint' of "$bridges/Type:Name".
make_resource_confs(ingress, ConnectorConfs, BridgeConf, Type, Name) ->
    BName = bridge_id(Type, Name),
    ConnectorConfs#{
        ingress => BridgeConf#{hookpoint => <<"$bridges/", BName/binary>>}
    };
make_resource_confs(egress, ConnectorConfs, BridgeConf, _Type, _Name) ->
    ConnectorConfs#{
        egress => BridgeConf
    }.

%% Split a URL into `{BaseUrl, Path}' at the first "/" after the "//".
%% `BaseUrl' is "Scheme//HostPort" as a binary; `Path' is whatever follows
%% that "/" (without the "/" itself), or <<>> when there is none.
%% Raises `{invalid_url, Url}' when the URL contains no "//".
parse_url(Url) ->
    case string:split(Url, "//", leading) of
        [Scheme, UrlRem] ->
            case string:split(UrlRem, "/", leading) of
                [HostPort, Path] ->
                    {iolist_to_binary([Scheme, "//", HostPort]), Path};
                [HostPort] ->
                    {iolist_to_binary([Scheme, "//", HostPort]), <<>>}
            end;
        [Url] ->
            error({invalid_url, Url})
    end.
%% Stop the bridge when its config has `enable' set to false; a missing
%% `enable' key counts as true.
maybe_disable_bridge(Type, Name, Conf) ->
    Enabled = maps:get(enable, Conf, true),
    case Enabled of
        true -> ok;
        false -> stop(Type, Name)
    end.

%% True only when the difference between the two configs is a change of the
%% `enable' key and nothing else.
if_only_to_toggle_enable(OldConf, Conf) ->
    #{added := Added, removed := Removed, changed := Changed} =
        emqx_map_lib:diff_maps(OldConf, Conf),
    is_enable_only_change(Added, Removed, Changed).

is_enable_only_change(Added, Removed, #{enable := _} = Changed)
        when map_size(Added) =:= 0,
             map_size(Removed) =:= 0,
             map_size(Changed) =:= 1 ->
    true;
is_enable_only_change(_Added, _Removed, _Changed) ->
    false.

%% Count the bridges returned by list/0, in total and per bridge type,
%% leaving out those whose resource config has `enable' set to false.
%% Any exception yields the all-zero result.
-spec get_basic_usage_info() ->
          #{ num_bridges => non_neg_integer()
           , count_by_type =>
                 #{ BridgeType => non_neg_integer()
                  }
           } when BridgeType :: atom().
get_basic_usage_info() ->
    Empty = #{num_bridges => 0, count_by_type => #{}},
    try
        lists:foldl(fun count_bridge/2, Empty, list())
    catch
        %% for instance, when the bridge app is not ready yet.
        _:_ ->
            Empty
    end.

%% Fold step for get_basic_usage_info/0.
count_bridge(#{resource_data := #{config := #{enable := false}}}, Acc) ->
    Acc;
count_bridge(#{type := BridgeType}, Acc) ->
    Total = maps:get(num_bridges, Acc),
    PerType0 = maps:get(count_by_type, Acc),
    TypeKey = binary_to_atom(BridgeType, utf8),
    PerType = maps:update_with(TypeKey, fun(N) -> N + 1 end, 1, PerType0),
    Acc#{ num_bridges => Total + 1
        , count_by_type => PerType
        }.

%% Coerce an atom, a string or a binary to a binary.
bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8);
bin(Str) when is_list(Str) -> list_to_binary(Str);
bin(Bin) when is_binary(Bin) -> Bin.