462 lines
16 KiB
Erlang
462 lines
16 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
-module(emqx_bridge).
|
|
-behaviour(emqx_config_handler).
|
|
-include_lib("emqx/include/emqx.hrl").
|
|
-include_lib("emqx/include/logger.hrl").
|
|
|
|
-export([ post_config_update/5
|
|
]).
|
|
|
|
-export([ load_hook/0
|
|
, unload_hook/0
|
|
]).
|
|
|
|
-export([on_message_publish/1]).
|
|
|
|
-export([ resource_type/1
|
|
, bridge_type/1
|
|
, resource_id/1
|
|
, resource_id/2
|
|
, bridge_id/2
|
|
, parse_bridge_id/1
|
|
]).
|
|
|
|
-export([ load/0
|
|
, lookup/1
|
|
, lookup/2
|
|
, lookup/3
|
|
, list/0
|
|
, list_bridges_by_connector/1
|
|
, create/2
|
|
, create/3
|
|
, recreate/2
|
|
, recreate/3
|
|
, create_dry_run/2
|
|
, remove/1
|
|
, remove/2
|
|
, update/2
|
|
, update/3
|
|
, stop/2
|
|
, restart/2
|
|
, reset_metrics/1
|
|
]).
|
|
|
|
-export([ send_message/2
|
|
]).
|
|
|
|
-export([ config_key_path/0
|
|
]).
|
|
|
|
%% exported for `emqx_telemetry'
|
|
-export([get_basic_usage_info/0]).
|
|
|
|
load_hook() ->
|
|
Bridges = emqx:get_config([bridges], #{}),
|
|
load_hook(Bridges).
|
|
|
|
load_hook(Bridges) ->
|
|
lists:foreach(fun({_Type, Bridge}) ->
|
|
lists:foreach(fun({_Name, BridgeConf}) ->
|
|
do_load_hook(BridgeConf)
|
|
end, maps:to_list(Bridge))
|
|
end, maps:to_list(Bridges)).
|
|
|
|
do_load_hook(#{local_topic := _} = Conf) ->
|
|
case maps:get(direction, Conf, egress) of
|
|
egress -> emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []});
|
|
ingress -> ok
|
|
end;
|
|
do_load_hook(_Conf) -> ok.
|
|
|
|
unload_hook() ->
|
|
ok = emqx_hooks:del('message.publish', {?MODULE, on_message_publish}).
|
|
|
|
on_message_publish(Message = #message{topic = Topic, flags = Flags}) ->
|
|
case maps:get(sys, Flags, false) of
|
|
false ->
|
|
Msg = emqx_rule_events:eventmsg_publish(Message),
|
|
send_to_matched_egress_bridges(Topic, Msg);
|
|
true -> ok
|
|
end,
|
|
{ok, Message}.
|
|
|
|
send_to_matched_egress_bridges(Topic, Msg) ->
|
|
lists:foreach(fun (Id) ->
|
|
try send_message(Id, Msg) of
|
|
{error, Reason} ->
|
|
?SLOG(error, #{msg => "send_message_to_bridge_failed",
|
|
bridge => Id, error => Reason});
|
|
_ -> ok
|
|
catch Err:Reason:ST ->
|
|
?SLOG(error, #{msg => "send_message_to_bridge_exception",
|
|
bridge => Id, error => Err, reason => Reason,
|
|
stacktrace => ST})
|
|
end
|
|
end, get_matched_bridges(Topic)).
|
|
|
|
send_message(BridgeId, Message) ->
|
|
{BridgeType, BridgeName} = parse_bridge_id(BridgeId),
|
|
ResId = emqx_bridge:resource_id(BridgeType, BridgeName),
|
|
case emqx:get_config([bridges, BridgeType, BridgeName], not_found) of
|
|
not_found ->
|
|
{error, {bridge_not_found, BridgeId}};
|
|
#{enable := true} ->
|
|
emqx_resource:query(ResId, {send_message, Message});
|
|
#{enable := false} ->
|
|
{error, {bridge_stopped, BridgeId}}
|
|
end.
|
|
|
|
config_key_path() ->
|
|
[bridges].
|
|
|
|
resource_type(<<"mqtt">>) -> emqx_connector_mqtt;
|
|
resource_type(mqtt) -> emqx_connector_mqtt;
|
|
resource_type(<<"http">>) -> emqx_connector_http;
|
|
resource_type(http) -> emqx_connector_http.
|
|
|
|
bridge_type(emqx_connector_mqtt) -> mqtt;
|
|
bridge_type(emqx_connector_http) -> http.
|
|
|
|
post_config_update(_, _Req, NewConf, OldConf, _AppEnv) ->
|
|
#{added := Added, removed := Removed, changed := Updated}
|
|
= diff_confs(NewConf, OldConf),
|
|
%% The config update will be failed if any task in `perform_bridge_changes` failed.
|
|
Result = perform_bridge_changes([
|
|
{fun remove/3, Removed},
|
|
{fun create/3, Added},
|
|
{fun update/3, Updated}
|
|
]),
|
|
ok = unload_hook(),
|
|
ok = load_hook(NewConf),
|
|
Result.
|
|
|
|
perform_bridge_changes(Tasks) ->
|
|
perform_bridge_changes(Tasks, ok).
|
|
|
|
perform_bridge_changes([], Result) ->
|
|
Result;
|
|
perform_bridge_changes([{Action, MapConfs} | Tasks], Result0) ->
|
|
Result = maps:fold(fun
|
|
({_Type, _Name}, _Conf, {error, Reason}) ->
|
|
{error, Reason};
|
|
({Type, Name}, Conf, _) ->
|
|
case Action(Type, Name, Conf) of
|
|
{error, Reason} -> {error, Reason};
|
|
Return -> Return
|
|
end
|
|
end, Result0, MapConfs),
|
|
perform_bridge_changes(Tasks, Result).
|
|
|
|
load() ->
|
|
Bridges = emqx:get_config([bridges], #{}),
|
|
emqx_bridge_monitor:ensure_all_started(Bridges).
|
|
|
|
resource_id(BridgeId) when is_binary(BridgeId) ->
|
|
<<"bridge:", BridgeId/binary>>.
|
|
|
|
resource_id(BridgeType, BridgeName) ->
|
|
BridgeId = bridge_id(BridgeType, BridgeName),
|
|
resource_id(BridgeId).
|
|
|
|
bridge_id(BridgeType, BridgeName) ->
|
|
Name = bin(BridgeName),
|
|
Type = bin(BridgeType),
|
|
<<Type/binary, ":", Name/binary>>.
|
|
|
|
parse_bridge_id(BridgeId) ->
|
|
case string:split(bin(BridgeId), ":", all) of
|
|
[Type, Name] -> {binary_to_atom(Type, utf8), binary_to_atom(Name, utf8)};
|
|
_ -> error({invalid_bridge_id, BridgeId})
|
|
end.
|
|
|
|
list() ->
|
|
lists:foldl(fun({Type, NameAndConf}, Bridges) ->
|
|
lists:foldl(fun({Name, RawConf}, Acc) ->
|
|
case lookup(Type, Name, RawConf) of
|
|
{error, not_found} -> Acc;
|
|
{ok, Res} -> [Res | Acc]
|
|
end
|
|
end, Bridges, maps:to_list(NameAndConf))
|
|
end, [], maps:to_list(emqx:get_raw_config([bridges], #{}))).
|
|
|
|
list_bridges_by_connector(ConnectorId) ->
|
|
[B || B = #{raw_config := #{<<"connector">> := Id}} <- list(),
|
|
ConnectorId =:= Id].
|
|
|
|
lookup(Id) ->
|
|
{Type, Name} = parse_bridge_id(Id),
|
|
lookup(Type, Name).
|
|
|
|
lookup(Type, Name) ->
|
|
RawConf = emqx:get_raw_config([bridges, Type, Name], #{}),
|
|
lookup(Type, Name, RawConf).
|
|
lookup(Type, Name, RawConf) ->
|
|
case emqx_resource:get_instance(resource_id(Type, Name)) of
|
|
{error, not_found} -> {error, not_found};
|
|
{ok, _, Data} ->
|
|
{ok, #{type => Type, name => Name, resource_data => Data,
|
|
raw_config => RawConf}}
|
|
end.
|
|
|
|
reset_metrics(ResourceId) ->
|
|
emqx_resource:reset_metrics(ResourceId).
|
|
|
|
stop(Type, Name) ->
|
|
emqx_resource:stop(resource_id(Type, Name)).
|
|
|
|
%% we don't provide 'start', as we want an already started bridge to be restarted.
|
|
restart(Type, Name) ->
|
|
emqx_resource:restart(resource_id(Type, Name)).
|
|
|
|
create(BridgeId, Conf) ->
|
|
{BridgeType, BridgeName} = parse_bridge_id(BridgeId),
|
|
create(BridgeType, BridgeName, Conf).
|
|
|
|
create(Type, Name, Conf) ->
|
|
?SLOG(info, #{msg => "create bridge", type => Type, name => Name,
|
|
config => Conf}),
|
|
case emqx_resource:create_local(resource_id(Type, Name),
|
|
<<"emqx_bridge">>,
|
|
emqx_bridge:resource_type(Type),
|
|
parse_confs(Type, Name, Conf),
|
|
#{}) of
|
|
{ok, already_created} -> maybe_disable_bridge(Type, Name, Conf);
|
|
{ok, _} -> maybe_disable_bridge(Type, Name, Conf);
|
|
{error, Reason} -> {error, Reason}
|
|
end.
|
|
|
|
update(BridgeId, {OldConf, Conf}) ->
|
|
{BridgeType, BridgeName} = parse_bridge_id(BridgeId),
|
|
update(BridgeType, BridgeName, {OldConf, Conf}).
|
|
|
|
update(Type, Name, {OldConf, Conf}) ->
|
|
%% TODO: sometimes its not necessary to restart the bridge connection.
|
|
%%
|
|
%% - if the connection related configs like `servers` is updated, we should restart/start
|
|
%% or stop bridges according to the change.
|
|
%% - if the connection related configs are not update, only non-connection configs like
|
|
%% the `method` or `headers` of a HTTP bridge is changed, then the bridge can be updated
|
|
%% without restarting the bridge.
|
|
%%
|
|
case if_only_to_toggle_enable(OldConf, Conf) of
|
|
false ->
|
|
?SLOG(info, #{msg => "update bridge", type => Type, name => Name,
|
|
config => Conf}),
|
|
case recreate(Type, Name, Conf) of
|
|
{ok, _} -> maybe_disable_bridge(Type, Name, Conf);
|
|
{error, not_found} ->
|
|
?SLOG(warning, #{ msg => "updating_a_non-exist_bridge_need_create_a_new_one"
|
|
, type => Type, name => Name, config => Conf}),
|
|
create(Type, Name, Conf);
|
|
{error, Reason} -> {error, {update_bridge_failed, Reason}}
|
|
end;
|
|
true ->
|
|
%% we don't need to recreate the bridge if this config change is only to
|
|
%% toggole the config 'bridge.{type}.{name}.enable'
|
|
case maps:get(enable, Conf, true) of
|
|
true -> restart(Type, Name);
|
|
false -> stop(Type, Name)
|
|
end
|
|
end.
|
|
|
|
recreate(Type, Name) ->
|
|
recreate(Type, Name, emqx:get_config([bridges, Type, Name])).
|
|
|
|
recreate(Type, Name, Conf) ->
|
|
emqx_resource:recreate_local(resource_id(Type, Name),
|
|
emqx_bridge:resource_type(Type),
|
|
parse_confs(Type, Name, Conf),
|
|
#{}).
|
|
|
|
create_dry_run(Type, Conf) ->
|
|
|
|
Conf0 = Conf#{<<"egress">> =>
|
|
#{ <<"remote_topic">> => <<"t">>
|
|
, <<"remote_qos">> => 0
|
|
, <<"retain">> => true
|
|
, <<"payload">> => <<"val">>
|
|
},
|
|
<<"ingress">> =>
|
|
#{ <<"remote_topic">> => <<"t">>
|
|
}},
|
|
case emqx_resource:check_config(emqx_bridge:resource_type(Type), Conf0) of
|
|
{ok, Conf1} ->
|
|
emqx_resource:create_dry_run_local(emqx_bridge:resource_type(Type), Conf1);
|
|
{error, _} = Error ->
|
|
Error
|
|
end.
|
|
|
|
remove(BridgeId) ->
|
|
{BridgeType, BridgeName} = parse_bridge_id(BridgeId),
|
|
remove(BridgeType, BridgeName, #{}).
|
|
|
|
remove(Type, Name) ->
|
|
remove(Type, Name, undefined).
|
|
|
|
%% just for perform_bridge_changes/1
|
|
remove(Type, Name, _Conf) ->
|
|
?SLOG(info, #{msg => "remove_bridge", type => Type, name => Name}),
|
|
case emqx_resource:remove_local(resource_id(Type, Name)) of
|
|
ok -> ok;
|
|
{error, not_found} -> ok;
|
|
{error, Reason} ->
|
|
{error, Reason}
|
|
end.
|
|
|
|
diff_confs(NewConfs, OldConfs) ->
|
|
emqx_map_lib:diff_maps(flatten_confs(NewConfs),
|
|
flatten_confs(OldConfs)).
|
|
|
|
flatten_confs(Conf0) ->
|
|
maps:from_list(
|
|
lists:flatmap(fun({Type, Conf}) ->
|
|
do_flatten_confs(Type, Conf)
|
|
end, maps:to_list(Conf0))).
|
|
|
|
do_flatten_confs(Type, Conf0) ->
|
|
[{{Type, Name}, Conf} || {Name, Conf} <- maps:to_list(Conf0)].
|
|
|
|
get_matched_bridges(Topic) ->
|
|
Bridges = emqx:get_config([bridges], #{}),
|
|
maps:fold(fun (BType, Conf, Acc0) ->
|
|
maps:fold(fun
|
|
%% Confs for MQTT, Kafka bridges have the `direction` flag
|
|
(_BName, #{direction := ingress}, Acc1) ->
|
|
Acc1;
|
|
(BName, #{direction := egress} = Egress, Acc1) ->
|
|
%% HTTP, MySQL bridges only have egress direction
|
|
get_matched_bridge_id(Egress, Topic, BType, BName, Acc1)
|
|
end, Acc0, Conf)
|
|
end, [], Bridges).
|
|
|
|
get_matched_bridge_id(#{enable := false}, _Topic, _BType, _BName, Acc) ->
|
|
Acc;
|
|
get_matched_bridge_id(#{local_topic := Filter}, Topic, BType, BName, Acc) ->
|
|
case emqx_topic:match(Topic, Filter) of
|
|
true -> [bridge_id(BType, BName) | Acc];
|
|
false -> Acc
|
|
end.
|
|
|
|
parse_confs(http, _Name,
|
|
#{ url := Url
|
|
, method := Method
|
|
, body := Body
|
|
, headers := Headers
|
|
, request_timeout := ReqTimeout
|
|
} = Conf) ->
|
|
{BaseUrl, Path} = parse_url(Url),
|
|
{ok, BaseUrl2} = emqx_http_lib:uri_parse(BaseUrl),
|
|
Conf#{ base_url => BaseUrl2
|
|
, request =>
|
|
#{ path => Path
|
|
, method => Method
|
|
, body => Body
|
|
, headers => Headers
|
|
, request_timeout => ReqTimeout
|
|
}
|
|
};
|
|
parse_confs(Type, Name, #{connector := ConnId, direction := Direction} = Conf)
|
|
when is_binary(ConnId) ->
|
|
case emqx_connector:parse_connector_id(ConnId) of
|
|
{Type, ConnName} ->
|
|
ConnectorConfs = emqx:get_config([connectors, Type, ConnName]),
|
|
make_resource_confs(Direction, ConnectorConfs,
|
|
maps:without([connector, direction], Conf), Type, Name);
|
|
{_ConnType, _ConnName} ->
|
|
error({cannot_use_connector_with_different_type, ConnId})
|
|
end;
|
|
parse_confs(Type, Name, #{connector := ConnectorConfs, direction := Direction} = Conf)
|
|
when is_map(ConnectorConfs) ->
|
|
make_resource_confs(Direction, ConnectorConfs,
|
|
maps:without([connector, direction], Conf), Type, Name).
|
|
|
|
make_resource_confs(ingress, ConnectorConfs, BridgeConf, Type, Name) ->
|
|
BName = bridge_id(Type, Name),
|
|
ConnectorConfs#{
|
|
ingress => BridgeConf#{hookpoint => <<"$bridges/", BName/binary>>}
|
|
};
|
|
make_resource_confs(egress, ConnectorConfs, BridgeConf, _Type, _Name) ->
|
|
ConnectorConfs#{
|
|
egress => BridgeConf
|
|
}.
|
|
|
|
parse_url(Url) ->
|
|
case string:split(Url, "//", leading) of
|
|
[Scheme, UrlRem] ->
|
|
case string:split(UrlRem, "/", leading) of
|
|
[HostPort, Path] ->
|
|
{iolist_to_binary([Scheme, "//", HostPort]), Path};
|
|
[HostPort] ->
|
|
{iolist_to_binary([Scheme, "//", HostPort]), <<>>}
|
|
end;
|
|
[Url] ->
|
|
error({invalid_url, Url})
|
|
end.
|
|
|
|
maybe_disable_bridge(Type, Name, Conf) ->
|
|
case maps:get(enable, Conf, true) of
|
|
false -> stop(Type, Name);
|
|
true -> ok
|
|
end.
|
|
|
|
if_only_to_toggle_enable(OldConf, Conf) ->
|
|
#{added := Added, removed := Removed, changed := Updated} =
|
|
emqx_map_lib:diff_maps(OldConf, Conf),
|
|
case {Added, Removed, Updated} of
|
|
{Added, Removed, #{enable := _}= Updated}
|
|
when map_size(Added) =:= 0,
|
|
map_size(Removed) =:= 0,
|
|
map_size(Updated) =:= 1 -> true;
|
|
{_, _, _} -> false
|
|
end.
|
|
|
|
-spec get_basic_usage_info() ->
|
|
#{ num_bridges => non_neg_integer()
|
|
, count_by_type =>
|
|
#{ BridgeType => non_neg_integer()
|
|
}
|
|
} when BridgeType :: atom().
|
|
get_basic_usage_info() ->
|
|
InitialAcc = #{num_bridges => 0, count_by_type => #{}},
|
|
try
|
|
lists:foldl(
|
|
fun(#{resource_data := #{config := #{enable := false}}}, Acc) ->
|
|
Acc;
|
|
(#{type := BridgeType}, Acc) ->
|
|
NumBridges = maps:get(num_bridges, Acc),
|
|
CountByType0 = maps:get(count_by_type, Acc),
|
|
CountByType = maps:update_with(
|
|
binary_to_atom(BridgeType, utf8),
|
|
fun(X) -> X + 1 end,
|
|
1,
|
|
CountByType0),
|
|
Acc#{ num_bridges => NumBridges + 1
|
|
, count_by_type => CountByType
|
|
}
|
|
end,
|
|
InitialAcc,
|
|
list())
|
|
catch
|
|
%% for instance, when the bridge app is not ready yet.
|
|
_:_ ->
|
|
InitialAcc
|
|
end.
|
|
|
|
bin(Bin) when is_binary(Bin) -> Bin;
|
|
bin(Str) when is_list(Str) -> list_to_binary(Str);
|
|
bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8).
|