emqx/apps/emqx_bridge/src/emqx_bridge.erl

738 lines
22 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_bridge).
-behaviour(emqx_config_handler).
-behaviour(emqx_config_backup).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx_hooks.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-export([
pre_config_update/3,
post_config_update/5
]).
-export([
load_hook/0,
unload_hook/0
]).
-export([on_message_publish/1]).
-export([
load/0,
unload/0,
lookup/1,
lookup/2,
get_metrics/2,
create/3,
disable_enable/3,
remove/2,
check_deps_and_remove/3,
list/0,
reload_hook/1
]).
-export([
send_message/2,
send_message/5
]).
-export([config_key_path/0]).
%% exported for `emqx_telemetry'
-export([get_basic_usage_info/0]).
%% Data backup
-export([
import_config/1,
%% exported for emqx_bridge_v2
import_config/4
]).
-export([query_opts/1]).
-define(EGRESS_DIR_BRIDGES(T),
T == webhook;
T == mysql;
T == gcp_pubsub;
T == influxdb_api_v1;
T == influxdb_api_v2;
T == kafka_producer;
T == redis_single;
T == redis_sentinel;
T == redis_cluster;
T == clickhouse;
T == pgsql;
T == timescale;
T == matrix;
T == tdengine;
T == dynamo;
T == rocketmq;
T == cassandra;
T == sqlserver;
T == pulsar_producer;
T == oracle;
T == iotdb;
T == kinesis_producer;
T == greptimedb;
T == azure_event_hub_producer;
T == syskeeper_forwarder
).
-define(ROOT_KEY, bridges).
%% See `hocon_tconf`
-define(MAP_KEY_RE, <<"^[A-Za-z0-9]+[A-Za-z0-9-_]*$">>).
load() ->
Bridges = emqx:get_config([?ROOT_KEY], #{}),
emqx_utils:pforeach(
fun({Type, NamedConf}) ->
emqx_utils:pforeach(
fun({Name, Conf}) ->
%% fetch opts for `emqx_resource_buffer_worker`
ResOpts = emqx_resource:fetch_creation_opts(Conf),
safe_load_bridge(Type, Name, Conf, ResOpts)
end,
maps:to_list(NamedConf),
infinity
)
end,
maps:to_list(Bridges),
infinity
).
unload() ->
unload_hook(),
Bridges = emqx:get_config([?ROOT_KEY], #{}),
emqx_utils:pforeach(
fun({Type, NamedConf}) ->
emqx_utils:pforeach(
fun({Name, _Conf}) ->
_ = emqx_bridge_resource:stop(Type, Name)
end,
maps:to_list(NamedConf),
infinity
)
end,
maps:to_list(Bridges),
infinity
).
safe_load_bridge(Type, Name, Conf, Opts) ->
try
_Res = emqx_bridge_resource:create(Type, Name, Conf, Opts),
?tp(
emqx_bridge_loaded,
#{
type => Type,
name => Name,
res => _Res
}
)
catch
Err:Reason:ST ->
?SLOG(error, #{
msg => "load_bridge_failed",
type => Type,
name => Name,
error => Err,
reason => Reason,
stacktrace => ST
})
end.
reload_hook(Bridges) ->
ok = unload_hook(),
ok = load_hook(Bridges).
load_hook() ->
Bridges = emqx:get_config([?ROOT_KEY], #{}),
load_hook(Bridges).
load_hook(Bridges) ->
lists:foreach(
fun({Type, Bridge}) ->
lists:foreach(
fun({_Name, BridgeConf}) ->
do_load_hook(Type, BridgeConf)
end,
maps:to_list(Bridge)
)
end,
maps:to_list(Bridges)
).
do_load_hook(Type, #{local_topic := LocalTopic}) when
?EGRESS_DIR_BRIDGES(Type) andalso is_binary(LocalTopic)
->
emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_BRIDGE);
do_load_hook(mqtt, #{egress := #{local := #{topic := _}}}) ->
emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_BRIDGE);
do_load_hook(_Type, _Conf) ->
ok.
unload_hook() ->
ok = emqx_hooks:del('message.publish', {?MODULE, on_message_publish}).
on_message_publish(Message = #message{topic = Topic, flags = Flags}) ->
case maps:get(sys, Flags, false) of
false ->
send_to_matched_egress_bridges(Topic, Message);
true ->
ok
end,
{ok, Message}.
send_to_matched_egress_bridges(Topic, Message) ->
case get_matched_egress_bridges(Topic) of
[] ->
ok;
Ids ->
{Msg, _} = emqx_rule_events:eventmsg_publish(Message),
send_to_matched_egress_bridges_loop(Topic, Msg, Ids)
end.
send_to_matched_egress_bridges_loop(_Topic, _Msg, []) ->
ok;
send_to_matched_egress_bridges_loop(Topic, Msg, [Id | Ids]) ->
try send_message(Id, Msg) of
{error, Reason} ->
?SLOG(error, #{
msg => "send_message_to_bridge_failed",
bridge => Id,
error => Reason
});
_ ->
ok
catch
throw:Reason ->
?SLOG(error, #{
msg => "send_message_to_bridge_exception",
bridge => Id,
reason => emqx_utils:redact(Reason)
});
Err:Reason:ST ->
?SLOG(error, #{
msg => "send_message_to_bridge_exception",
bridge => Id,
error => Err,
reason => emqx_utils:redact(Reason),
stacktrace => emqx_utils:redact(ST)
})
end,
send_to_matched_egress_bridges_loop(Topic, Msg, Ids).
send_message(BridgeId, Message) ->
{BridgeV1Type, BridgeName} = emqx_bridge_resource:parse_bridge_id(BridgeId),
case emqx_bridge_v2:is_bridge_v2_type(BridgeV1Type) of
true ->
ActionType = emqx_action_info:bridge_v1_type_to_action_type(BridgeV1Type),
emqx_bridge_v2:send_message(ActionType, BridgeName, Message, #{});
false ->
ResId = emqx_bridge_resource:resource_id(BridgeV1Type, BridgeName),
send_message(BridgeV1Type, BridgeName, ResId, Message, #{})
end.
send_message(BridgeType, BridgeName, ResId, Message, QueryOpts0) ->
case emqx:get_config([?ROOT_KEY, BridgeType, BridgeName], not_found) of
not_found ->
{error, bridge_not_found};
#{enable := true} = Config ->
QueryOpts = maps:merge(query_opts(Config), QueryOpts0),
emqx_resource:query(ResId, {send_message, Message}, QueryOpts);
#{enable := false} ->
{error, bridge_stopped}
end.
query_opts(Config) ->
case emqx_utils_maps:deep_get([resource_opts, request_ttl], Config, false) of
Timeout when is_integer(Timeout) orelse Timeout =:= infinity ->
%% request_ttl is configured
#{timeout => Timeout};
_ ->
%% emqx_resource has a default value (15s)
#{}
end.
config_key_path() ->
[?ROOT_KEY].
pre_config_update([?ROOT_KEY], RawConf, RawConf) ->
{ok, RawConf};
pre_config_update([?ROOT_KEY], NewConf, _RawConf) ->
case multi_validate_bridge_names(NewConf) of
ok ->
{ok, convert_certs(NewConf)};
Error ->
Error
end.
post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) ->
#{added := Added, removed := Removed, changed := Updated} =
diff_confs(NewConf, OldConf),
Result = perform_bridge_changes([
#{action => fun emqx_bridge_resource:remove/4, action_name => remove, data => Removed},
#{
action => fun emqx_bridge_resource:create/4,
action_name => create,
data => Added,
on_exception_fn => fun emqx_bridge_resource:remove/4
},
#{action => fun emqx_bridge_resource:update/4, action_name => update, data => Updated}
]),
ok = unload_hook(),
ok = load_hook(NewConf),
?tp(bridge_post_config_update_done, #{}),
Result.
list() ->
BridgeV1Bridges =
maps:fold(
fun(Type, NameAndConf, Bridges) ->
maps:fold(
fun(Name, RawConf, Acc) ->
case lookup(Type, Name, RawConf) of
{error, not_found} -> Acc;
{ok, Res} -> [Res | Acc]
end
end,
Bridges,
NameAndConf
)
end,
[],
emqx:get_raw_config([bridges], #{})
),
BridgeV2Bridges =
emqx_bridge_v2:bridge_v1_list_and_transform(),
BridgeV1Bridges ++ BridgeV2Bridges.
lookup(Id) ->
{Type, Name} = emqx_bridge_resource:parse_bridge_id(Id),
lookup(Type, Name).
lookup(Type, Name) ->
case emqx_bridge_v2:is_bridge_v2_type(Type) of
true ->
emqx_bridge_v2:bridge_v1_lookup_and_transform(Type, Name);
false ->
RawConf = emqx:get_raw_config([bridges, Type, Name], #{}),
lookup(Type, Name, RawConf)
end.
lookup(Type, Name, RawConf) ->
case emqx_resource:get_instance(emqx_bridge_resource:resource_id(Type, Name)) of
{error, not_found} ->
{error, not_found};
{ok, _, Data} ->
{ok, #{
type => Type,
name => Name,
resource_data => Data,
raw_config => maybe_upgrade(Type, RawConf)
}}
end.
get_metrics(ActionType, Name) ->
case emqx_bridge_v2:is_bridge_v2_type(ActionType) of
true ->
case emqx_bridge_v2:bridge_v1_is_valid(ActionType, Name) of
true ->
BridgeV2Type = emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(ActionType),
try
ConfRootKey = emqx_bridge_v2:get_conf_root_key_if_only_one(
BridgeV2Type, Name
),
emqx_bridge_v2:get_metrics(ConfRootKey, BridgeV2Type, Name)
catch
error:Reason ->
{error, Reason}
end;
false ->
{error, not_bridge_v1_compatible}
end;
false ->
emqx_resource:get_metrics(emqx_bridge_resource:resource_id(ActionType, Name))
end.
maybe_upgrade(mqtt, Config) ->
emqx_bridge_compatible_config:maybe_upgrade(Config);
maybe_upgrade(webhook, Config) ->
emqx_bridge_compatible_config:http_maybe_upgrade(Config);
maybe_upgrade(_Other, Config) ->
Config.
disable_enable(Action, BridgeType0, BridgeName) when
Action =:= disable; Action =:= enable
->
BridgeType = upgrade_type(BridgeType0),
case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of
true ->
emqx_bridge_v2:bridge_v1_enable_disable(Action, BridgeType, BridgeName);
false ->
emqx_conf:update(
config_key_path() ++ [BridgeType, BridgeName],
{Action, BridgeType, BridgeName},
#{override_to => cluster}
)
end.
create(BridgeV1Type, BridgeName, RawConf) ->
BridgeType = upgrade_type(BridgeV1Type),
?SLOG(debug, #{
bridge_action => create,
bridge_type => BridgeType,
bridge_name => BridgeName,
bridge_raw_config => emqx_utils:redact(RawConf)
}),
case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of
true ->
emqx_bridge_v2:bridge_v1_split_config_and_create(BridgeV1Type, BridgeName, RawConf);
false ->
emqx_conf:update(
emqx_bridge:config_key_path() ++ [BridgeType, BridgeName],
RawConf,
#{override_to => cluster}
)
end.
%% NOTE: This function can cause broken references but it is only called from
%% test cases.
-spec remove(atom() | binary(), binary()) -> ok | {error, any()}.
remove(BridgeType0, BridgeName) ->
BridgeType = upgrade_type(BridgeType0),
?SLOG(debug, #{
bridge_action => remove,
bridge_type => BridgeType,
bridge_name => BridgeName
}),
case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of
true ->
emqx_bridge_v2:bridge_v1_remove(BridgeType0, BridgeName);
false ->
remove_v1(BridgeType, BridgeName)
end.
remove_v1(BridgeType0, BridgeName) ->
BridgeType = upgrade_type(BridgeType0),
case
emqx_conf:remove(
emqx_bridge:config_key_path() ++ [BridgeType, BridgeName],
#{override_to => cluster}
)
of
{ok, _} ->
ok;
{error, Reason} ->
{error, Reason}
end.
check_deps_and_remove(BridgeType0, BridgeName, RemoveDeps) ->
BridgeType = upgrade_type(BridgeType0),
case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of
true ->
emqx_bridge_v2:bridge_v1_check_deps_and_remove(
BridgeType,
BridgeName,
RemoveDeps
);
false ->
do_check_deps_and_remove(BridgeType, BridgeName, RemoveDeps)
end.
do_check_deps_and_remove(BridgeType, BridgeName, RemoveDeps) ->
case emqx_bridge_lib:maybe_withdraw_rule_action(BridgeType, BridgeName, RemoveDeps) of
ok ->
remove(BridgeType, BridgeName);
{error, Reason} ->
{error, Reason}
end.
%%----------------------------------------------------------------------------------------
%% Data backup
%%----------------------------------------------------------------------------------------
import_config(RawConf) ->
import_config(RawConf, <<"bridges">>, ?ROOT_KEY, config_key_path()).
%% Used in emqx_bridge_v2
import_config(RawConf, RawConfKey, RootKey, RootKeyPath) ->
BridgesConf = maps:get(RawConfKey, RawConf, #{}),
OldBridgesConf = emqx:get_raw_config(RootKeyPath, #{}),
MergedConf = merge_confs(OldBridgesConf, BridgesConf),
case emqx_conf:update(RootKeyPath, MergedConf, #{override_to => cluster}) of
{ok, #{raw_config := NewRawConf}} ->
{ok, #{root_key => RootKey, changed => changed_paths(OldBridgesConf, NewRawConf)}};
Error ->
{error, #{root_key => RootKey, reason => Error}}
end.
merge_confs(OldConf, NewConf) ->
AllTypes = maps:keys(maps:merge(OldConf, NewConf)),
lists:foldr(
fun(Type, Acc) ->
NewBridges = maps:get(Type, NewConf, #{}),
OldBridges = maps:get(Type, OldConf, #{}),
Acc#{Type => maps:merge(OldBridges, NewBridges)}
end,
#{},
AllTypes
).
changed_paths(OldRawConf, NewRawConf) ->
maps:fold(
fun(Type, Bridges, ChangedAcc) ->
OldBridges = maps:get(Type, OldRawConf, #{}),
Changed = maps:get(changed, emqx_utils_maps:diff_maps(Bridges, OldBridges)),
[[?ROOT_KEY, Type, K] || K <- maps:keys(Changed)] ++ ChangedAcc
end,
[],
NewRawConf
).
%%========================================================================================
%% Helper functions
%%========================================================================================
convert_certs(BridgesConf) ->
maps:map(
fun(Type, Bridges) ->
maps:map(
fun(Name, BridgeConf) ->
Path = filename:join([?ROOT_KEY, Type, Name]),
case emqx_connector_ssl:convert_certs(Path, BridgeConf) of
{error, Reason} ->
?SLOG(error, #{
msg => "bad_ssl_config",
type => Type,
name => Name,
reason => Reason
}),
throw({bad_ssl_config, Reason});
{ok, BridgeConf1} ->
BridgeConf1
end
end,
Bridges
)
end,
BridgesConf
).
perform_bridge_changes(Tasks) ->
perform_bridge_changes(Tasks, []).
perform_bridge_changes([], Errors) ->
case Errors of
[] -> ok;
_ -> {error, Errors}
end;
perform_bridge_changes([#{action := Action, data := MapConfs} = Task | Tasks], Errors0) ->
OnException = maps:get(on_exception_fn, Task, fun(_Type, _Name, _Conf, _Opts) -> ok end),
Results = emqx_utils:pmap(
fun({{Type, Name}, Conf}) ->
ResOpts = creation_opts(Conf),
Res =
try
Action(Type, Name, Conf, ResOpts)
catch
Kind:Error:Stacktrace ->
?SLOG(error, #{
msg => "bridge_config_update_exception",
kind => Kind,
error => Error,
type => Type,
name => Name,
stacktrace => Stacktrace
}),
OnException(Type, Name, Conf, ResOpts),
{error, Error}
end,
{{Type, Name}, Res}
end,
maps:to_list(MapConfs),
infinity
),
Errs = lists:filter(
fun
({_TypeName, {error, _}}) -> true;
(_) -> false
end,
Results
),
Errors =
case Errs of
[] ->
Errors0;
_ ->
#{action_name := ActionName} = Task,
[#{action => ActionName, errors => Errs} | Errors0]
end,
perform_bridge_changes(Tasks, Errors).
creation_opts({_OldConf, Conf}) ->
emqx_resource:fetch_creation_opts(Conf);
creation_opts(Conf) ->
emqx_resource:fetch_creation_opts(Conf).
diff_confs(NewConfs, OldConfs) ->
emqx_utils_maps:diff_maps(
flatten_confs(NewConfs),
flatten_confs(OldConfs)
).
flatten_confs(Conf0) ->
maps:from_list(
lists:flatmap(
fun({Type, Conf}) ->
do_flatten_confs(Type, Conf)
end,
maps:to_list(Conf0)
)
).
do_flatten_confs(Type, Conf0) ->
[{{Type, Name}, Conf} || {Name, Conf} <- maps:to_list(Conf0)].
%% TODO: create a topic index for this
get_matched_egress_bridges(Topic) ->
Bridges = emqx:get_config([bridges], #{}),
maps:fold(
fun(BType, Conf, Acc0) ->
maps:fold(
fun
(BName, #{egress := _} = BConf, Acc1) when BType =:= mqtt ->
get_matched_bridge_id(BType, BConf, Topic, BName, Acc1);
(_BName, #{ingress := _}, Acc1) when BType =:= mqtt ->
%% ignore ingress only bridge
Acc1;
(BName, BConf, Acc1) ->
get_matched_bridge_id(BType, BConf, Topic, BName, Acc1)
end,
Acc0,
Conf
)
end,
[],
Bridges
).
get_matched_bridge_id(_BType, #{enable := false}, _Topic, _BName, Acc) ->
Acc;
get_matched_bridge_id(BType, Conf, Topic, BName, Acc) when ?EGRESS_DIR_BRIDGES(BType) ->
case maps:get(local_topic, Conf, undefined) of
undefined ->
Acc;
Filter ->
do_get_matched_bridge_id(Topic, Filter, BType, BName, Acc)
end;
get_matched_bridge_id(mqtt, #{egress := #{local := #{topic := Filter}}}, Topic, BName, Acc) ->
do_get_matched_bridge_id(Topic, Filter, mqtt, BName, Acc);
get_matched_bridge_id(_BType, _Conf, _Topic, _BName, Acc) ->
Acc.
do_get_matched_bridge_id(Topic, Filter, BType, BName, Acc) ->
case emqx_topic:match(Topic, Filter) of
true -> [emqx_bridge_resource:bridge_id(BType, BName) | Acc];
false -> Acc
end.
-spec get_basic_usage_info() ->
#{
num_bridges => non_neg_integer(),
count_by_type =>
#{BridgeType => non_neg_integer()}
}
when
BridgeType :: atom().
get_basic_usage_info() ->
InitialAcc = #{num_bridges => 0, count_by_type => #{}},
try
lists:foldl(
fun
(#{resource_data := #{config := #{enable := false}}}, Acc) ->
Acc;
(#{type := BridgeType}, Acc) ->
NumBridges = maps:get(num_bridges, Acc),
CountByType0 = maps:get(count_by_type, Acc),
CountByType = maps:update_with(
binary_to_atom(BridgeType, utf8),
fun(X) -> X + 1 end,
1,
CountByType0
),
Acc#{
num_bridges => NumBridges + 1,
count_by_type => CountByType
}
end,
InitialAcc,
list()
)
catch
%% for instance, when the bridge app is not ready yet.
_:_ ->
InitialAcc
end.
validate_bridge_name(BridgeName) ->
try
_ = emqx_resource:validate_name(to_bin(BridgeName)),
ok
catch
throw:Error ->
{error, Error}
end.
to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
to_bin(B) when is_binary(B) -> B.
upgrade_type(Type) ->
emqx_bridge_lib:upgrade_type(Type).
multi_validate_bridge_names(Conf) ->
BridgeTypeAndNames =
[
{Type, Name}
|| {Type, NameToConf} <- maps:to_list(Conf),
{Name, _Conf} <- maps:to_list(NameToConf)
],
BadBridges =
lists:filtermap(
fun({Type, Name}) ->
case validate_bridge_name(Name) of
ok -> false;
_Error -> {true, #{type => Type, name => Name}}
end
end,
BridgeTypeAndNames
),
case BadBridges of
[] ->
ok;
[_ | _] ->
{error, #{
kind => validation_error,
reason => bad_bridge_names,
bad_bridges => BadBridges
}}
end.