738 lines
22 KiB
Erlang
738 lines
22 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
-module(emqx_bridge).
|
|
|
|
-behaviour(emqx_config_handler).
|
|
-behaviour(emqx_config_backup).
|
|
|
|
-include_lib("emqx/include/emqx.hrl").
|
|
-include_lib("emqx/include/logger.hrl").
|
|
-include_lib("emqx/include/emqx_hooks.hrl").
|
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
|
|
|
-export([
|
|
pre_config_update/3,
|
|
post_config_update/5
|
|
]).
|
|
|
|
-export([
|
|
load_hook/0,
|
|
unload_hook/0
|
|
]).
|
|
|
|
-export([on_message_publish/1]).
|
|
|
|
-export([
|
|
load/0,
|
|
unload/0,
|
|
lookup/1,
|
|
lookup/2,
|
|
get_metrics/2,
|
|
create/3,
|
|
disable_enable/3,
|
|
remove/2,
|
|
check_deps_and_remove/3,
|
|
list/0,
|
|
reload_hook/1
|
|
]).
|
|
|
|
-export([
|
|
send_message/2,
|
|
send_message/5
|
|
]).
|
|
|
|
-export([config_key_path/0]).
|
|
|
|
%% exported for `emqx_telemetry'
|
|
-export([get_basic_usage_info/0]).
|
|
|
|
%% Data backup
|
|
-export([
|
|
import_config/1,
|
|
%% exported for emqx_bridge_v2
|
|
import_config/4
|
|
]).
|
|
|
|
-export([query_opts/1]).
|
|
|
|
%% Guard-safe predicate: true when `T' is one of the bridge types that this
%% module matches against a `local_topic' (see do_load_hook/2 and
%% get_matched_bridge_id/5).
%%
%% The alternatives are joined with `orelse' inside ONE pair of parentheses.
%% A `;'-separated guard sequence must not be used here: `;' binds weaker than
%% `andalso' and `,', so `?EGRESS_DIR_BRIDGES(T) andalso Extra' would attach
%% `Extra' to the last alternative only.
-define(EGRESS_DIR_BRIDGES(T),
    (T == webhook orelse
        T == mysql orelse
        T == gcp_pubsub orelse
        T == influxdb_api_v1 orelse
        T == influxdb_api_v2 orelse
        T == kafka_producer orelse
        T == redis_single orelse
        T == redis_sentinel orelse
        T == redis_cluster orelse
        T == clickhouse orelse
        T == pgsql orelse
        T == timescale orelse
        T == matrix orelse
        T == tdengine orelse
        T == dynamo orelse
        T == rocketmq orelse
        T == cassandra orelse
        T == sqlserver orelse
        T == pulsar_producer orelse
        T == oracle orelse
        T == iotdb orelse
        T == kinesis_producer orelse
        T == greptimedb orelse
        T == azure_event_hub_producer orelse
        T == syskeeper_forwarder)
).
|
|
|
|
-define(ROOT_KEY, bridges).
|
|
|
|
%% See `hocon_tconf`
|
|
-define(MAP_KEY_RE, <<"^[A-Za-z0-9]+[A-Za-z0-9-_]*$">>).
|
|
|
|
%% Create the resources of every bridge configured under `?ROOT_KEY'.
%% Both levels (bridge types, then the named bridges of one type) are walked
%% with emqx_utils:pforeach/3, passing `infinity' as its third argument
%% (presumably a timeout -- confirm against emqx_utils).
%% Each bridge goes through safe_load_bridge/4, which logs exceptions instead
%% of propagating them.
load() ->
    LoadBridge = fun(Type) ->
        fun({Name, Conf}) ->
            %% fetch opts for `emqx_resource_buffer_worker`
            CreationOpts = emqx_resource:fetch_creation_opts(Conf),
            safe_load_bridge(Type, Name, Conf, CreationOpts)
        end
    end,
    LoadType = fun({Type, NamedConf}) ->
        emqx_utils:pforeach(LoadBridge(Type), maps:to_list(NamedConf), infinity)
    end,
    AllBridges = emqx:get_config([?ROOT_KEY], #{}),
    emqx_utils:pforeach(LoadType, maps:to_list(AllBridges), infinity).
|
|
|
|
%% Remove the publish hook, then stop every bridge configured under
%% `?ROOT_KEY' via emqx_bridge_resource:stop/2. The result of each stop call
%% is deliberately ignored.
unload() ->
    unload_hook(),
    StopBridge = fun(Type) ->
        fun({Name, _Conf}) ->
            _ = emqx_bridge_resource:stop(Type, Name)
        end
    end,
    StopType = fun({Type, NamedConf}) ->
        emqx_utils:pforeach(StopBridge(Type), maps:to_list(NamedConf), infinity)
    end,
    AllBridges = emqx:get_config([?ROOT_KEY], #{}),
    emqx_utils:pforeach(StopType, maps:to_list(AllBridges), infinity).
|
|
|
|
%% Create one bridge resource and emit the `emqx_bridge_loaded' trace point.
%% Any exception (of any class) is logged as "load_bridge_failed" and not
%% re-raised, so one failing bridge does not abort the caller.
safe_load_bridge(Type, Name, Conf, Opts) ->
    try
        %% underscore-prefixed: only referenced from inside the ?tp macro
        _Created = emqx_bridge_resource:create(Type, Name, Conf, Opts),
        ?tp(emqx_bridge_loaded, #{type => Type, name => Name, res => _Created})
    catch
        Class:Reason:Stacktrace ->
            ?SLOG(error, #{
                msg => "load_bridge_failed",
                type => Type,
                name => Name,
                error => Class,
                reason => Reason,
                stacktrace => Stacktrace
            })
    end.
|
|
|
|
%% Drop the publish hook and install it again according to `Bridges'
%% (a map of bridge type => named bridge configs). Returns `ok'; crashes with
%% `badmatch' if either step returns something else.
reload_hook(Bridges) ->
    ok = unload_hook(),
    ok = load_hook(Bridges),
    ok.
|
|
|
|
%% Install the publish hook based on the bridges currently configured under
%% `?ROOT_KEY' (an empty map is used when nothing is configured).
load_hook() ->
    load_hook(emqx:get_config([?ROOT_KEY], #{})).
|
|
|
|
%% Visit every bridge config in `Bridges' (type => #{name => config}) and let
%% do_load_hook/2 decide whether the publish hook is needed for it.
load_hook(Bridges) ->
    LoadForType = fun({Type, NamedConfs}) ->
        lists:foreach(
            fun({_Name, BridgeConf}) -> do_load_hook(Type, BridgeConf) end,
            maps:to_list(NamedConfs)
        )
    end,
    lists:foreach(LoadForType, maps:to_list(Bridges)).
|
|
|
|
%% Register on_message_publish/1 on the 'message.publish' hook point when the
%% given bridge config consumes locally published messages:
%%   * an egress-type bridge (see ?EGRESS_DIR_BRIDGES) whose `local_topic' is
%%     a binary;
%%   * an `mqtt' bridge that has `egress.local.topic' configured.
%% Any other config is a no-op returning `ok'.
%%
%% The binary check lives in the head pattern on purpose. When
%% ?EGRESS_DIR_BRIDGES expands to a `;'-separated guard sequence, writing
%% `?EGRESS_DIR_BRIDGES(Type) andalso is_binary(LocalTopic)' applies the
%% is_binary/1 test to the last alternative only, so a non-binary
%% `local_topic' would still install the hook for every other type.
do_load_hook(Type, #{local_topic := <<_/binary>>}) when ?EGRESS_DIR_BRIDGES(Type) ->
    emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_BRIDGE);
do_load_hook(mqtt, #{egress := #{local := #{topic := _}}}) ->
    emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_BRIDGE);
do_load_hook(_Type, _Conf) ->
    ok.
|
|
|
|
%% Remove this module's on_message_publish callback from the
%% 'message.publish' hook point. Asserts that emqx_hooks:del/2 returns `ok'.
unload_hook() ->
    Callback = {?MODULE, on_message_publish},
    ok = emqx_hooks:del('message.publish', Callback).
|
|
|
|
%% 'message.publish' hook callback. Messages whose `sys' flag is `true' are
%% skipped; every other message is offered to the egress bridges matching its
%% topic. The message itself is always handed back unchanged as
%% `{ok, Message}'.
on_message_publish(#message{topic = Topic, flags = Flags} = Message) ->
    case maps:get(sys, Flags, false) of
        true ->
            ok;
        false ->
            send_to_matched_egress_bridges(Topic, Message)
    end,
    {ok, Message}.
|
|
|
|
%% Look up the bridges whose topic filter matches `Topic' and forward the
%% message to each of them. The rule-engine event message is only built when
%% at least one bridge matched.
send_to_matched_egress_bridges(Topic, Message) ->
    case get_matched_egress_bridges(Topic) of
        [] ->
            ok;
        BridgeIds ->
            {EventMsg, _} = emqx_rule_events:eventmsg_publish(Message),
            send_to_matched_egress_bridges_loop(Topic, EventMsg, BridgeIds)
    end.
|
|
|
|
%% Send `Msg' to every bridge id in `Ids', one after another.
%% A failure for one bridge (an `{error, _}' return or any exception) is
%% logged and does not stop delivery to the remaining bridges.
%% Returns `ok'.
send_to_matched_egress_bridges_loop(_Topic, Msg, Ids) ->
    SendOne = fun(Id) ->
        try send_message(Id, Msg) of
            {error, Reason} ->
                ?SLOG(error, #{
                    msg => "send_message_to_bridge_failed",
                    bridge => Id,
                    error => Reason
                });
            _ ->
                ok
        catch
            %% throws are logged without a stacktrace
            throw:Reason ->
                ?SLOG(error, #{
                    msg => "send_message_to_bridge_exception",
                    bridge => Id,
                    reason => emqx_utils:redact(Reason)
                });
            Class:Reason:Stacktrace ->
                ?SLOG(error, #{
                    msg => "send_message_to_bridge_exception",
                    bridge => Id,
                    error => Class,
                    reason => emqx_utils:redact(Reason),
                    stacktrace => emqx_utils:redact(Stacktrace)
                })
        end
    end,
    lists:foreach(SendOne, Ids).
|
|
|
|
%% Send `Message' to the bridge identified by `BridgeId'.
%% The id is parsed into a (v1) type and a name; bridge-v2 types are routed to
%% emqx_bridge_v2:send_message/4 under their action type, all other types go
%% through send_message/5 with empty query options.
send_message(BridgeId, Message) ->
    {V1Type, Name} = emqx_bridge_resource:parse_bridge_id(BridgeId),
    case emqx_bridge_v2:is_bridge_v2_type(V1Type) of
        false ->
            ResId = emqx_bridge_resource:resource_id(V1Type, Name),
            send_message(V1Type, Name, ResId, Message, #{});
        true ->
            ActionType = emqx_action_info:bridge_v1_type_to_action_type(V1Type),
            emqx_bridge_v2:send_message(ActionType, Name, Message, #{})
    end.
|
|
|
|
%% Query resource `ResId' with `{send_message, Message}' if the bridge is
%% configured and enabled.
%% Returns `{error, bridge_not_found}' when no config exists,
%% `{error, bridge_stopped}' when `enable' is false, otherwise the result of
%% emqx_resource:query/3. Options in `QueryOpts0' override the ones derived
%% from the bridge config by query_opts/1.
send_message(BridgeType, BridgeName, ResId, Message, QueryOpts0) ->
    ConfPath = [?ROOT_KEY, BridgeType, BridgeName],
    case emqx:get_config(ConfPath, not_found) of
        #{enable := false} ->
            {error, bridge_stopped};
        #{enable := true} = Config ->
            DefaultOpts = query_opts(Config),
            emqx_resource:query(
                ResId,
                {send_message, Message},
                maps:merge(DefaultOpts, QueryOpts0)
            );
        not_found ->
            {error, bridge_not_found}
    end.
|
|
|
|
%% Derive resource query options from a bridge config.
%% When `resource_opts.request_ttl' is an integer or `infinity' it is returned
%% as `#{timeout => Value}'; otherwise an empty map is returned.
query_opts(Config) ->
    RequestTTL = emqx_utils_maps:deep_get([resource_opts, request_ttl], Config, false),
    case is_integer(RequestTTL) orelse RequestTTL =:= infinity of
        true ->
            %% request_ttl is configured
            #{timeout => RequestTTL};
        false ->
            %% emqx_resource has a default value (15s)
            #{}
    end.
|
|
|
|
%% Config path of the root under which all bridges handled by this module
%% are stored.
config_key_path() ->
    RootPath = [?ROOT_KEY],
    RootPath.
|
|
|
|
%% emqx_config_handler callback for updates of the whole bridges root.
%% An update identical to the current raw config is accepted as is.
%% Otherwise every bridge name is validated first and, on success, the SSL
%% certificates of the new config are converted via convert_certs/1; a
%% validation error is returned unchanged.
pre_config_update([?ROOT_KEY], NewConf, RawConf) when NewConf =:= RawConf ->
    {ok, RawConf};
pre_config_update([?ROOT_KEY], NewConf, _RawConf) ->
    Validation = multi_validate_bridge_names(NewConf),
    case Validation of
        ok -> {ok, convert_certs(NewConf)};
        _ -> Validation
    end.
|
|
|
|
%% emqx_config_handler callback run after the bridges root has changed.
%% The old and new configs are diffed per {Type, Name}; removed bridges are
%% removed first, then added ones are created, then changed ones are updated.
%% A bridge whose creation raised is removed again (`on_exception_fn').
%% Afterwards the publish hook is re-installed for the new config, whatever
%% the outcome of the changes, and the outcome of perform_bridge_changes/1 is
%% returned.
post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) ->
    #{added := Added, removed := Removed, changed := Updated} = diff_confs(NewConf, OldConf),
    RemoveTask = #{
        action => fun emqx_bridge_resource:remove/4,
        action_name => remove,
        data => Removed
    },
    CreateTask = #{
        action => fun emqx_bridge_resource:create/4,
        action_name => create,
        data => Added,
        on_exception_fn => fun emqx_bridge_resource:remove/4
    },
    UpdateTask = #{
        action => fun emqx_bridge_resource:update/4,
        action_name => update,
        data => Updated
    },
    Result = perform_bridge_changes([RemoveTask, CreateTask, UpdateTask]),
    ok = unload_hook(),
    ok = load_hook(NewConf),
    ?tp(bridge_post_config_update_done, #{}),
    Result.
|
|
|
|
%% List all bridges: the v1 bridges found in the raw `bridges' config that
%% have a resource instance, followed by the entries returned by
%% emqx_bridge_v2:bridge_v1_list_and_transform/0.
%% Bridges without a resource instance (`{error, not_found}') are left out.
list() ->
    CollectBridge = fun(Type) ->
        fun(Name, RawConf, Acc) ->
            case lookup(Type, Name, RawConf) of
                {ok, Res} -> [Res | Acc];
                {error, not_found} -> Acc
            end
        end
    end,
    CollectType = fun(Type, NameAndConf, Acc) ->
        maps:fold(CollectBridge(Type), Acc, NameAndConf)
    end,
    RawBridges = emqx:get_raw_config([bridges], #{}),
    BridgeV1Bridges = maps:fold(CollectType, [], RawBridges),
    BridgeV1Bridges ++ emqx_bridge_v2:bridge_v1_list_and_transform().
|
|
|
|
%% Look a bridge up by its id; the id is split into type and name by
%% emqx_bridge_resource:parse_bridge_id/1 and passed on to lookup/2.
lookup(Id) ->
    {BridgeType, BridgeName} = emqx_bridge_resource:parse_bridge_id(Id),
    lookup(BridgeType, BridgeName).
|
|
|
|
%% Look a bridge up by type and name. Bridge-v2 types are delegated to
%% emqx_bridge_v2:bridge_v1_lookup_and_transform/2; for the others the raw
%% config (an empty map when absent) is read and combined with the resource
%% data by lookup/3.
lookup(Type, Name) ->
    case emqx_bridge_v2:is_bridge_v2_type(Type) of
        false ->
            lookup(Type, Name, emqx:get_raw_config([bridges, Type, Name], #{}));
        true ->
            emqx_bridge_v2:bridge_v1_lookup_and_transform(Type, Name)
    end.
|
|
|
|
%% Build the bridge description for `Type'/`Name' from its resource instance.
%% Returns `{error, not_found}' when the resource does not exist, otherwise
%% `{ok, Map}' with the keys `type', `name', `resource_data' and `raw_config'
%% (the raw config after maybe_upgrade/2).
lookup(Type, Name, RawConf) ->
    ResId = emqx_bridge_resource:resource_id(Type, Name),
    case emqx_resource:get_instance(ResId) of
        {ok, _, Data} ->
            Bridge = #{
                type => Type,
                name => Name,
                resource_data => Data,
                raw_config => maybe_upgrade(Type, RawConf)
            },
            {ok, Bridge};
        {error, not_found} ->
            {error, not_found}
    end.
|
|
|
|
%% Fetch the metrics of a bridge.
%% For non-v2 types the metrics of the bridge's resource are returned.
%% For v2 types see bridge_v2_metrics/2.
get_metrics(ActionType, Name) ->
    case emqx_bridge_v2:is_bridge_v2_type(ActionType) of
        false ->
            ResId = emqx_bridge_resource:resource_id(ActionType, Name),
            emqx_resource:get_metrics(ResId);
        true ->
            bridge_v2_metrics(ActionType, Name)
    end.

%% Metrics of a bridge-v2 type addressed through the v1 API.
%% Returns `{error, not_bridge_v1_compatible}' when the bridge is not valid as
%% a v1 bridge. Exceptions of class `error' raised while resolving the config
%% root key or reading the metrics are turned into `{error, Reason}'.
bridge_v2_metrics(ActionType, Name) ->
    case emqx_bridge_v2:bridge_v1_is_valid(ActionType, Name) of
        false ->
            {error, not_bridge_v1_compatible};
        true ->
            V2Type = emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(ActionType),
            try
                RootKey = emqx_bridge_v2:get_conf_root_key_if_only_one(V2Type, Name),
                emqx_bridge_v2:get_metrics(RootKey, V2Type, Name)
            catch
                error:Reason ->
                    {error, Reason}
            end
    end.
|
|
|
|
%% Upgrade the raw config of `mqtt' and `webhook' bridges through
%% emqx_bridge_compatible_config; configs of any other type are returned
%% unchanged.
%%
%% The type is accepted both as an atom and as a binary: list/0 iterates the
%% raw config, whose type keys get_basic_usage_info/0 treats as binaries, so
%% matching on atoms alone skipped the upgrade for bridges reached through
%% list/0.
maybe_upgrade(mqtt, Config) ->
    emqx_bridge_compatible_config:maybe_upgrade(Config);
maybe_upgrade(<<"mqtt">>, Config) ->
    emqx_bridge_compatible_config:maybe_upgrade(Config);
maybe_upgrade(webhook, Config) ->
    emqx_bridge_compatible_config:http_maybe_upgrade(Config);
maybe_upgrade(<<"webhook">>, Config) ->
    emqx_bridge_compatible_config:http_maybe_upgrade(Config);
maybe_upgrade(_Other, Config) ->
    Config.
|
|
|
|
%% Enable or disable a bridge (`Action' is `enable' or `disable').
%% The type is first passed through upgrade_type/1. Bridge-v2 types are
%% delegated to emqx_bridge_v2; for the others a cluster-wide config update
%% with the request `{Action, Type, Name}' is issued on the bridge's path.
disable_enable(Action, BridgeType0, BridgeName) when
    Action =:= disable; Action =:= enable
->
    BridgeType = upgrade_type(BridgeType0),
    case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of
        false ->
            ConfPath = config_key_path() ++ [BridgeType, BridgeName],
            UpdateReq = {Action, BridgeType, BridgeName},
            emqx_conf:update(ConfPath, UpdateReq, #{override_to => cluster});
        true ->
            emqx_bridge_v2:bridge_v1_enable_disable(Action, BridgeType, BridgeName)
    end.
|
|
|
|
%% Create (or overwrite) a bridge from its raw config.
%% The request is logged at debug level with the config redacted.
%% Bridge-v2 types are handed to emqx_bridge_v2 together with the ORIGINAL
%% (non-upgraded) v1 type; other types are written to the cluster config
%% under their upgraded type.
create(BridgeV1Type, BridgeName, RawConf) ->
    BridgeType = upgrade_type(BridgeV1Type),
    ?SLOG(debug, #{
        bridge_action => create,
        bridge_type => BridgeType,
        bridge_name => BridgeName,
        bridge_raw_config => emqx_utils:redact(RawConf)
    }),
    case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of
        false ->
            ConfPath = emqx_bridge:config_key_path() ++ [BridgeType, BridgeName],
            emqx_conf:update(ConfPath, RawConf, #{override_to => cluster});
        true ->
            emqx_bridge_v2:bridge_v1_split_config_and_create(BridgeV1Type, BridgeName, RawConf)
    end.
|
|
|
|
%% Remove a bridge without checking what depends on it.
%% NOTE: This function can cause broken references but it is only called from
%% test cases.
%% The removal is logged at debug level. Bridge-v2 types are delegated to
%% emqx_bridge_v2:bridge_v1_remove/2 with the type as given by the caller;
%% other types are removed from the cluster config by remove_v1/2.
-spec remove(atom() | binary(), binary()) -> ok | {error, any()}.
remove(BridgeType0, BridgeName) ->
    BridgeType = upgrade_type(BridgeType0),
    ?SLOG(debug, #{
        bridge_action => remove,
        bridge_type => BridgeType,
        bridge_name => BridgeName
    }),
    case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of
        false ->
            remove_v1(BridgeType, BridgeName);
        true ->
            %% NOTE(review): the non-upgraded type is passed here, whereas
            %% check_deps_and_remove/3 passes the upgraded one -- confirm
            %% which form emqx_bridge_v2 expects.
            emqx_bridge_v2:bridge_v1_remove(BridgeType0, BridgeName)
    end.
|
|
|
|
%% Remove a v1 bridge from the cluster config.
%% Returns `ok' on success and `{error, Reason}' otherwise.
%% The type goes through upgrade_type/1 here as well, even though remove/2
%% already passes an upgraded type.
remove_v1(BridgeType0, BridgeName) ->
    BridgeType = upgrade_type(BridgeType0),
    ConfPath = emqx_bridge:config_key_path() ++ [BridgeType, BridgeName],
    case emqx_conf:remove(ConfPath, #{override_to => cluster}) of
        {error, Reason} ->
            {error, Reason};
        {ok, _} ->
            ok
    end.
|
|
|
|
%% Remove a bridge after dealing with its dependencies.
%% Bridge-v2 types are delegated to emqx_bridge_v2; for other types see
%% do_check_deps_and_remove/3. `RemoveDeps' is passed through untouched.
check_deps_and_remove(BridgeType0, BridgeName, RemoveDeps) ->
    BridgeType = upgrade_type(BridgeType0),
    case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of
        false ->
            do_check_deps_and_remove(BridgeType, BridgeName, RemoveDeps);
        true ->
            emqx_bridge_v2:bridge_v1_check_deps_and_remove(BridgeType, BridgeName, RemoveDeps)
    end.
|
|
|
|
%% Withdraw the bridge from rule actions (as decided by
%% emqx_bridge_lib:maybe_withdraw_rule_action/3) and, only if that returned
%% `ok', remove the bridge. An `{error, Reason}' from the withdrawal is
%% returned as is and nothing is removed.
do_check_deps_and_remove(BridgeType, BridgeName, RemoveDeps) ->
    Withdrawn = emqx_bridge_lib:maybe_withdraw_rule_action(BridgeType, BridgeName, RemoveDeps),
    case Withdrawn of
        {error, Reason} ->
            {error, Reason};
        ok ->
            remove(BridgeType, BridgeName)
    end.
|
|
|
|
%%----------------------------------------------------------------------------------------
|
|
%% Data backup
|
|
%%----------------------------------------------------------------------------------------
|
|
|
|
%% emqx_config_backup callback: import the `<<"bridges">>' section of a
%% backed-up raw config into this module's config root.
import_config(RawConf) ->
    RootKeyPath = config_key_path(),
    import_config(RawConf, <<"bridges">>, ?ROOT_KEY, RootKeyPath).
|
|
|
|
%% Used in emqx_bridge_v2
%%
%% Import the section `RawConfKey' of `RawConf' into the config stored at
%% `RootKeyPath'. Imported bridges are merged over the existing ones (see
%% merge_confs/2) and written cluster-wide.
%% Returns `{ok, #{root_key, changed}}' where `changed' lists the config
%% paths of already existing bridges whose config differs after the import,
%% or `{error, #{root_key, reason}}' with the unmodified result of
%% emqx_conf:update/3 as `reason'.
import_config(RawConf, RawConfKey, RootKey, RootKeyPath) ->
    BridgesConf = maps:get(RawConfKey, RawConf, #{}),
    OldBridgesConf = emqx:get_raw_config(RootKeyPath, #{}),
    MergedConf = merge_confs(OldBridgesConf, BridgesConf),
    case emqx_conf:update(RootKeyPath, MergedConf, #{override_to => cluster}) of
        {ok, #{raw_config := NewRawConf}} ->
            %% Report the paths under the root that was actually imported.
            %% This function is also called with roots other than ?ROOT_KEY,
            %% for which a hard-coded ?ROOT_KEY prefix would be wrong.
            Changed = changed_paths(RootKey, OldBridgesConf, NewRawConf),
            {ok, #{root_key => RootKey, changed => Changed}};
        Error ->
            {error, #{root_key => RootKey, reason => Error}}
    end.

%% Merge two `type => #{name => config}' maps. Types present in either map
%% are kept; for a bridge present in both, the config from `NewConf' wins.
merge_confs(OldConf, NewConf) ->
    AllTypes = maps:keys(maps:merge(OldConf, NewConf)),
    lists:foldr(
        fun(Type, Acc) ->
            NewBridges = maps:get(Type, NewConf, #{}),
            OldBridges = maps:get(Type, OldConf, #{}),
            Acc#{Type => maps:merge(OldBridges, NewBridges)}
        end,
        #{},
        AllTypes
    ).

%% Paths `[RootKey, Type, Name]' of the bridges that exist in both configs
%% with different values (the `changed' part of emqx_utils_maps:diff_maps/2).
changed_paths(RootKey, OldRawConf, NewRawConf) ->
    maps:fold(
        fun(Type, Bridges, ChangedAcc) ->
            OldBridges = maps:get(Type, OldRawConf, #{}),
            Changed = maps:get(changed, emqx_utils_maps:diff_maps(Bridges, OldBridges)),
            [[RootKey, Type, K] || K <- maps:keys(Changed)] ++ ChangedAcc
        end,
        [],
        NewRawConf
    ).
|
|
|
|
%%========================================================================================
|
|
%% Helper functions
|
|
%%========================================================================================
|
|
|
|
%% Run emqx_connector_ssl:convert_certs/2 over every bridge config in
%% `BridgesConf' (type => #{name => config}) and return the map with the
%% converted configs. The path handed to the converter is the root key, type
%% and name joined with filename:join/1.
%% Throws `{bad_ssl_config, Reason}' (after logging) on the first failure.
convert_certs(BridgesConf) ->
    ConvertBridge = fun(Type) ->
        fun(Name, BridgeConf) ->
            CertPath = filename:join([?ROOT_KEY, Type, Name]),
            case emqx_connector_ssl:convert_certs(CertPath, BridgeConf) of
                {ok, Converted} ->
                    Converted;
                {error, Reason} ->
                    ?SLOG(error, #{
                        msg => "bad_ssl_config",
                        type => Type,
                        name => Name,
                        reason => Reason
                    }),
                    throw({bad_ssl_config, Reason})
            end
        end
    end,
    maps:map(
        fun(Type, Bridges) -> maps:map(ConvertBridge(Type), Bridges) end,
        BridgesConf
    ).
|
|
|
|
%% Run the given change tasks in order, starting with no recorded errors.
%% See perform_bridge_changes/2 for the task format and the result.
perform_bridge_changes(Tasks) ->
    NoErrors = [],
    perform_bridge_changes(Tasks, NoErrors).
|
|
|
|
%% Execute change tasks one after another and collect their failures.
%%
%% A task is a map with
%%   action          - fun(Type, Name, Conf, ResOpts), applied to every entry
%%                     of `data' through emqx_utils:pmap/3;
%%   action_name     - tag used when reporting failures of this task;
%%   data            - #{{Type, Name} => Conf};
%%   on_exception_fn - optional fun/4 called when `action' raises
%%                     (defaults to a no-op).
%%
%% An exception raised by `action' is logged, passed to `on_exception_fn' and
%% recorded as `{error, Error}'. Entries whose result is `{error, _}' are
%% grouped per task as `#{action => ActionName, errors => [...]}'.
%% Returns `ok' when nothing failed, otherwise `{error, Errors}' with the most
%% recently failed task first.
perform_bridge_changes([], []) ->
    ok;
perform_bridge_changes([], Errors) ->
    {error, Errors};
perform_bridge_changes([#{action := Action, data := MapConfs} = Task | Tasks], Errors0) ->
    NoCleanup = fun(_Type, _Name, _Conf, _Opts) -> ok end,
    OnException = maps:get(on_exception_fn, Task, NoCleanup),
    RunAction = fun({{Type, Name}, Conf}) ->
        ResOpts = creation_opts(Conf),
        Outcome =
            try
                Action(Type, Name, Conf, ResOpts)
            catch
                Kind:Error:Stacktrace ->
                    ?SLOG(error, #{
                        msg => "bridge_config_update_exception",
                        kind => Kind,
                        error => Error,
                        type => Type,
                        name => Name,
                        stacktrace => Stacktrace
                    }),
                    OnException(Type, Name, Conf, ResOpts),
                    {error, Error}
            end,
        {{Type, Name}, Outcome}
    end,
    Results = emqx_utils:pmap(RunAction, maps:to_list(MapConfs), infinity),
    Errors =
        case [Failed || {_TypeName, {error, _}} = Failed <- Results] of
            [] ->
                Errors0;
            Errs ->
                #{action_name := ActionName} = Task,
                [#{action => ActionName, errors => Errs} | Errors0]
        end,
    perform_bridge_changes(Tasks, Errors).
|
|
|
|
%% Resource creation options for a diff entry. The entry is either a plain
%% config or a two-element tuple, in which case the options are taken from
%% the second element (bound as the new config in the original naming).
creation_opts({_Previous, Current}) ->
    emqx_resource:fetch_creation_opts(Current);
creation_opts(Current) ->
    emqx_resource:fetch_creation_opts(Current).
|
|
|
|
%% Diff two bridge config trees on the level of individual bridges: both
%% trees are flattened to `{Type, Name} => Conf' before being handed to
%% emqx_utils_maps:diff_maps/2.
diff_confs(NewConfs, OldConfs) ->
    FlatNew = flatten_confs(NewConfs),
    FlatOld = flatten_confs(OldConfs),
    emqx_utils_maps:diff_maps(FlatNew, FlatOld).
|
|
|
|
%% Turn `#{Type => #{Name => Conf}}' into `#{{Type, Name} => Conf}'.
flatten_confs(Conf0) ->
    Entries = [
        Entry
     || {Type, NamedConfs} <- maps:to_list(Conf0),
        Entry <- do_flatten_confs(Type, NamedConfs)
    ],
    maps:from_list(Entries).
|
|
|
|
%% List the bridges of one type as `{{Type, Name}, Conf}' pairs.
do_flatten_confs(Type, Conf0) ->
    lists:map(
        fun({Name, Conf}) -> {{Type, Name}, Conf} end,
        maps:to_list(Conf0)
    ).
|
|
|
|
%% Collect the ids of all configured bridges whose local topic filter matches
%% `Topic'. Every bridge config is visited on each call.
%% `mqtt' bridges that have an `ingress' section but no `egress' section are
%% skipped; everything else is decided by get_matched_bridge_id/5.
%% TODO: create a topic index for this
get_matched_egress_bridges(Topic) ->
    MatchBridge = fun(BType) ->
        fun
            (BName, #{egress := _} = BConf, Acc) when BType =:= mqtt ->
                get_matched_bridge_id(BType, BConf, Topic, BName, Acc);
            (_BName, #{ingress := _}, Acc) when BType =:= mqtt ->
                %% ignore ingress only bridge
                Acc;
            (BName, BConf, Acc) ->
                get_matched_bridge_id(BType, BConf, Topic, BName, Acc)
        end
    end,
    MatchType = fun(BType, NamedConfs, Acc) ->
        maps:fold(MatchBridge(BType), Acc, NamedConfs)
    end,
    maps:fold(MatchType, [], emqx:get_config([bridges], #{})).
|
|
|
|
%% Prepend the bridge id to `Acc' when the bridge is a match for `Topic'.
%%   * disabled bridges never match;
%%   * egress-type bridges (see ?EGRESS_DIR_BRIDGES) are matched on their
%%     `local_topic', if one is set;
%%   * `mqtt' bridges are matched on `egress.local.topic';
%%   * anything else leaves `Acc' untouched.
get_matched_bridge_id(_BType, #{enable := false}, _Topic, _BName, Acc) ->
    Acc;
get_matched_bridge_id(BType, Conf, Topic, BName, Acc) when ?EGRESS_DIR_BRIDGES(BType) ->
    case maps:find(local_topic, Conf) of
        {ok, Filter} when Filter =/= undefined ->
            do_get_matched_bridge_id(Topic, Filter, BType, BName, Acc);
        _ ->
            %% no local topic configured (missing or `undefined')
            Acc
    end;
get_matched_bridge_id(mqtt, #{egress := #{local := #{topic := Filter}}}, Topic, BName, Acc) ->
    do_get_matched_bridge_id(Topic, Filter, mqtt, BName, Acc);
get_matched_bridge_id(_BType, _Conf, _Topic, _BName, Acc) ->
    Acc.
|
|
|
|
%% Prepend the id of bridge `BType'/`BName' to `Acc' if `Topic' matches the
%% topic filter `Filter'; otherwise return `Acc' unchanged.
do_get_matched_bridge_id(Topic, Filter, BType, BName, Acc) ->
    case emqx_topic:match(Topic, Filter) of
        false ->
            Acc;
        true ->
            BridgeId = emqx_bridge_resource:bridge_id(BType, BName),
            [BridgeId | Acc]
    end.
|
|
|
|
%% Summarise the enabled bridges for telemetry: the total number and a
%% per-type count keyed by the bridge type as an atom.
%% Bridges whose resource config has `enable => false' are not counted.
-spec get_basic_usage_info() ->
    #{
        num_bridges => non_neg_integer(),
        count_by_type =>
            #{BridgeType => non_neg_integer()}
    }
when
    BridgeType :: atom().
get_basic_usage_info() ->
    InitialAcc = #{num_bridges => 0, count_by_type => #{}},
    try
        lists:foldl(fun count_enabled_bridge/2, InitialAcc, list())
    catch
        %% Deliberate best effort: telemetry must never fail,
        %% for instance, when the bridge app is not ready yet.
        _:_ ->
            InitialAcc
    end.

%% Fold step of get_basic_usage_info/0: count one bridge unless it is disabled.
count_enabled_bridge(#{resource_data := #{config := #{enable := false}}}, Acc) ->
    Acc;
count_enabled_bridge(#{type := BridgeType}, Acc) ->
    #{num_bridges := NumBridges, count_by_type := CountByType0} = Acc,
    CountByType = maps:update_with(
        usage_type_key(BridgeType),
        fun(X) -> X + 1 end,
        1,
        CountByType0
    ),
    Acc#{
        num_bridges => NumBridges + 1,
        count_by_type => CountByType
    }.

%% Normalise a bridge type to the atom used as the `count_by_type' key.
%% list/0 also returns entries produced by emqx_bridge_v2, whose `type'
%% representation is not visible from this module. Accepting atoms as well as
%% binaries avoids a `badarg' from binary_to_atom/2 that the catch-all in
%% get_basic_usage_info/0 would silently turn into an empty report.
usage_type_key(Type) when is_atom(Type) ->
    Type;
usage_type_key(Type) when is_binary(Type) ->
    binary_to_atom(Type, utf8).
|
|
|
|
%% Check a bridge name with emqx_resource:validate_name/1.
%% Returns `ok' when the validator returns normally and `{error, Thrown}'
%% when it throws; other exception classes are not caught.
validate_bridge_name(BridgeName) ->
    try emqx_resource:validate_name(to_bin(BridgeName)) of
        _Valid -> ok
    catch
        throw:Error -> {error, Error}
    end.
|
|
|
|
%% Return the argument as a binary: binaries pass through, atoms are
%% converted to their UTF-8 text. Any other term is a `function_clause' error.
to_bin(Bin) when is_binary(Bin) ->
    Bin;
to_bin(Atom) when is_atom(Atom) ->
    atom_to_binary(Atom, utf8).
|
|
|
|
%% Local shorthand for emqx_bridge_lib:upgrade_type/1.
upgrade_type(BridgeType) ->
    emqx_bridge_lib:upgrade_type(BridgeType).
|
|
|
|
%% Validate the name of every bridge in `Conf' (type => #{name => config}).
%% Returns `ok' when all names pass validate_bridge_name/1, otherwise a
%% `validation_error' listing the type and name of each offending bridge.
multi_validate_bridge_names(Conf) ->
    TypeNamePairs = [
        {Type, Name}
     || {Type, NameToConf} <- maps:to_list(Conf),
        {Name, _Conf} <- maps:to_list(NameToConf)
    ],
    BadBridges = [
        #{type => Type, name => Name}
     || {Type, Name} <- TypeNamePairs,
        validate_bridge_name(Name) =/= ok
    ],
    case BadBridges of
        [] ->
            ok;
        [_ | _] ->
            Details = #{
                kind => validation_error,
                reason => bad_bridge_names,
                bad_bridges => BadBridges
            },
            {error, Details}
    end.
|