style: cleanup emqx_connector

This commit is contained in:
Stefan Strigler 2023-09-28 15:44:04 +02:00 committed by Zaiming (Stone) Shi
parent f7984be946
commit 8f109da62b
5 changed files with 83 additions and 245 deletions

View File

@ -19,7 +19,7 @@
-include("logger.hrl").
-include("emqx_schema.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("hocon/include/hocon_types.hrl").
-behaviour(gen_server).
@ -736,7 +736,7 @@ remove_empty_leaf(KeyPath, Handlers) ->
end.
assert_callback_function(Mod) ->
_ = Mod:module_info(),
_ = apply(Mod, module_info, []),
case
erlang:function_exported(Mod, pre_config_update, 3) orelse
erlang:function_exported(Mod, post_config_update, 5)

View File

@ -18,9 +18,7 @@
-behaviour(emqx_config_handler).
-behaviour(emqx_config_backup).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx_hooks.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-export([
@ -29,25 +27,16 @@
]).
-export([
% load_hook/0,
% unload_hook/0
]).
% -export([on_message_publish/1]).
-export([
load/0,
unload/0,
lookup/1,
lookup/2,
get_metrics/2,
check_deps_and_remove/3,
create/3,
disable_enable/3,
get_metrics/2,
list/0,
load/0,
lookup/1,
lookup/2,
remove/2,
check_deps_and_remove/3,
list/0
% ,
% reload_hook/1
unload/0
]).
-export([config_key_path/0]).
@ -77,7 +66,6 @@ load() ->
).
unload() ->
%% unload_hook(),
Connectors = emqx:get_config([?ROOT_KEY], #{}),
lists:foreach(
fun({Type, NamedConf}) ->
@ -114,102 +102,6 @@ safe_load_connector(Type, Name, Conf) ->
})
end.
% reload_hook(Connectors) ->
% ok = unload_hook(),
% ok = load_hook(Connectors).
% load_hook() ->
% Connectors = emqx:get_config([?ROOT_KEY], #{}),
% load_hook(Connectors).
% load_hook(Connectors) ->
% lists:foreach(
% fun({Type, Connector}) ->
% lists:foreach(
% fun({_Name, ConnectorConf}) ->
% do_load_hook(Type, ConnectorConf)
% end,
% maps:to_list(Connector)
% )
% end,
% maps:to_list(Connectors)
% ).
% do_load_hook(Type, #{local_topic := LocalTopic}) when
% ?EGRESS_DIR_BRIDGES(Type) andalso is_binary(LocalTopic)
% ->
% emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_BRIDGE);
% do_load_hook(mqtt, #{egress := #{local := #{topic := _}}}) ->
% emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_BRIDGE);
% do_load_hook(_Type, _Conf) ->
% ok.
% unload_hook() ->
% ok = emqx_hooks:del('message.publish', {?MODULE, on_message_publish}).
% on_message_publish(Message = #message{topic = Topic, flags = Flags}) ->
% case maps:get(sys, Flags, false) of
% false ->
% {Msg, _} = emqx_rule_events:eventmsg_publish(Message),
% send_to_matched_egress_connectors(Topic, Msg);
% true ->
% ok
% end,
% {ok, Message}.
% send_to_matched_egress_connectors(Topic, Msg) ->
% MatchedConnectorIds = get_matched_egress_connectors(Topic),
% lists:foreach(
% fun(Id) ->
% try send_message(Id, Msg) of
% {error, Reason} ->
% ?SLOG(error, #{
% msg => "send_message_to_connector_failed",
% connector => Id,
% error => Reason
% });
% _ ->
% ok
% catch
% Err:Reason:ST ->
% ?SLOG(error, #{
% msg => "send_message_to_connector_exception",
% connector => Id,
% error => Err,
% reason => Reason,
% stacktrace => ST
% })
% end
% end,
% MatchedConnectorIds
% ).
% send_message(ConnectorId, Message) ->
% {ConnectorType, ConnectorName} = emqx_connector_resource:parse_connector_id(ConnectorId),
% ResId = emqx_connector_resource:resource_id(ConnectorType, ConnectorName),
% send_message(ConnectorType, ConnectorName, ResId, Message, #{}).
% send_message(ConnectorType, ConnectorName, ResId, Message, QueryOpts0) ->
% case emqx:get_config([?ROOT_KEY, ConnectorType, ConnectorName], not_found) of
% not_found ->
% {error, connector_not_found};
% #{enable := true} = Config ->
% QueryOpts = maps:merge(query_opts(Config), QueryOpts0),
% emqx_resource:query(ResId, {send_message, Message}, QueryOpts);
% #{enable := false} ->
% {error, connector_stopped}
% end.
% query_opts(Config) ->
% case emqx_utils_maps:deep_get([resource_opts, request_ttl], Config, false) of
% Timeout when is_integer(Timeout) orelse Timeout =:= infinity ->
% %% request_ttl is configured
% #{timeout => Timeout};
% _ ->
% %% emqx_resource has a default value (15s)
% #{}
% end.
config_key_path() ->
[?ROOT_KEY].
@ -240,33 +132,25 @@ post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) ->
Result = perform_connector_changes([
#{action => fun emqx_connector_resource:remove/4, data => Removed},
#{
action => fun emqx_connector_resource:create/4,
action => fun emqx_connector_resource:create/3,
data => Added,
on_exception_fn => fun emqx_connector_resource:remove/4
},
#{action => fun emqx_connector_resource:update/4, data => Updated}
]),
% ok = unload_hook(),
% ok = load_hook(NewConf),
?tp(connector_post_config_update_done, #{}),
Result;
post_config_update([?ROOT_KEY, BridgeType, BridgeName], '$remove', _, _OldConf, _AppEnvs) ->
ok = emqx_connector_resource:remove(BridgeType, BridgeName),
Bridges = emqx_utils_maps:deep_remove([BridgeType, BridgeName], emqx:get_config([connectors])),
emqx_connector:reload_hook(Bridges),
post_config_update([?ROOT_KEY, Type, Name], '$remove', _, _OldConf, _AppEnvs) ->
ok = emqx_connector_resource:remove(Type, Name),
?tp(connector_post_config_update_done, #{}),
ok;
post_config_update([?ROOT_KEY, BridgeType, BridgeName], _Req, NewConf, undefined, _AppEnvs) ->
ok = emqx_connector_resource:create(BridgeType, BridgeName, NewConf),
post_config_update([?ROOT_KEY, Type, Name], _Req, NewConf, undefined, _AppEnvs) ->
ok = emqx_connector_resource:create(Type, Name, NewConf),
?tp(connector_post_config_update_done, #{}),
ok;
post_config_update([connectors, BridgeType, BridgeName], _Req, NewConf, OldConf, _AppEnvs) ->
post_config_update([connectors, Type, Name], _Req, NewConf, OldConf, _AppEnvs) ->
ResOpts = emqx_resource:fetch_creation_opts(NewConf),
ok = emqx_connector_resource:update(BridgeType, BridgeName, {OldConf, NewConf}, ResOpts),
Bridges = emqx_utils_maps:deep_put(
[BridgeType, BridgeName], emqx:get_config([connectors]), NewConf
),
emqx_connector:reload_hook(Bridges),
ok = emqx_connector_resource:update(Type, Name, {OldConf, NewConf}, ResOpts),
?tp(connector_post_config_update_done, #{}),
ok.
@ -312,13 +196,6 @@ lookup(Type, Name, RawConf) ->
get_metrics(Type, Name) ->
emqx_resource:get_metrics(emqx_connector_resource:resource_id(Type, Name)).
% maybe_upgrade(mqtt, Config) ->
% emqx_connector_compatible_config:maybe_upgrade(Config);
% maybe_upgrade(webhook, Config) ->
% emqx_connector_compatible_config:webhook_maybe_upgrade(Config);
% maybe_upgrade(_Other, Config) ->
% Config.
disable_enable(Action, ConnectorType, ConnectorName) when
Action =:= disable; Action =:= enable
->
@ -499,48 +376,6 @@ flatten_confs(Conf0) ->
do_flatten_confs(Type, Conf0) ->
[{{Type, Name}, Conf} || {Name, Conf} <- maps:to_list(Conf0)].
% get_matched_egress_connectors(Topic) ->
% Connectors = emqx:get_config([connectors], #{}),
% maps:fold(
% fun(BType, Conf, Acc0) ->
% maps:fold(
% fun
% (BName, #{egress := _} = BConf, Acc1) when BType =:= mqtt ->
% get_matched_connector_id(BType, BConf, Topic, BName, Acc1);
% (_BName, #{ingress := _}, Acc1) when BType =:= mqtt ->
% %% ignore ingress only connector
% Acc1;
% (BName, BConf, Acc1) ->
% get_matched_connector_id(BType, BConf, Topic, BName, Acc1)
% end,
% Acc0,
% Conf
% )
% end,
% [],
% Connectors
% ).
% get_matched_connector_id(_BType, #{enable := false}, _Topic, _BName, Acc) ->
% Acc;
% get_matched_connector_id(BType, Conf, Topic, BName, Acc) when ?EGRESS_DIR_BRIDGES(BType) ->
% case maps:get(local_topic, Conf, undefined) of
% undefined ->
% Acc;
% Filter ->
% do_get_matched_connector_id(Topic, Filter, BType, BName, Acc)
% end;
% get_matched_connector_id(mqtt, #{egress := #{local := #{topic := Filter}}}, Topic, BName, Acc) ->
% do_get_matched_connector_id(Topic, Filter, mqtt, BName, Acc);
% get_matched_connector_id(_BType, _Conf, _Topic, _BName, Acc) ->
% Acc.
% do_get_matched_connector_id(Topic, Filter, BType, BName, Acc) ->
% case emqx_topic:match(Topic, Filter) of
% true -> [emqx_connector_resource:connector_id(BType, BName) | Acc];
% false -> Acc
% end.
-spec get_basic_usage_info() ->
#{
num_connectors => non_neg_integer(),

View File

@ -32,6 +32,7 @@
-export([
create/3,
create/4,
create_dry_run/2,
recreate/2,
recreate/3,
@ -41,10 +42,10 @@
reset_metrics/1,
restart/2,
start/2,
stop/2
% update/2,
% update/3%,
% update/4
stop/2,
update/2,
update/3,
update/4
]).
-callback connector_config(ParsedConfig, ConnectorName :: atom() | binary()) ->
@ -54,20 +55,20 @@ when
-optional_callbacks([connector_config/2]).
%% bi-directional connector with producer/consumer or ingress/egress configs
-define(IS_BI_DIR_BRIDGE(TYPE),
-define(IS_BI_DIR_CONNECTOR(TYPE),
(TYPE) =:= <<"mqtt">>
).
-define(IS_INGRESS_BRIDGE(TYPE),
-define(IS_INGRESS_CONNECTOR(TYPE),
(TYPE) =:= <<"kafka_consumer">> orelse
(TYPE) =:= <<"gcp_pubsub_consumer">> orelse
?IS_BI_DIR_BRIDGE(TYPE)
?IS_BI_DIR_CONNECTOR(TYPE)
).
-if(?EMQX_RELEASE_EDITION == ee).
connector_to_resource_type(ConnectorType) -> emqx_connector_enterprise:resource_type(ConnectorType).
connector_to_resource_type(ConnectorType) -> emqx_connector_ee_schema:resource_type(ConnectorType).
connector_impl_module(ConnectorType) ->
emqx_connector_enterprise:connector_impl_module(ConnectorType).
emqx_connector_ee_schema:connector_impl_module(ConnectorType).
-else.
connector_to_resource_type(_) -> undefined.
@ -168,6 +169,9 @@ start(Type, Name) ->
% create(ConnectorType, ConnectorName, Conf).
create(Type, Name, Conf) ->
create(Type, Name, Conf, #{}).
create(Type, Name, Conf, Opts) ->
?SLOG(info, #{
msg => "create connector",
type => Type,
@ -180,60 +184,60 @@ create(Type, Name, Conf) ->
<<"emqx_connector">>,
connector_to_resource_type(Type),
parse_confs(TypeBin, Name, Conf),
parse_opts(Conf, #{})
parse_opts(Conf, Opts)
),
ok.
% update(ConnectorId, {OldConf, Conf}) ->
% {ConnectorType, ConnectorName} = parse_connector_id(ConnectorId),
% update(ConnectorType, ConnectorName, {OldConf, Conf}).
update(ConnectorId, {OldConf, Conf}) ->
{ConnectorType, ConnectorName} = parse_connector_id(ConnectorId),
update(ConnectorType, ConnectorName, {OldConf, Conf}).
% update(Type, Name, {OldConf, Conf}) ->
% update(Type, Name, {OldConf, Conf}, #{}).
update(Type, Name, {OldConf, Conf}) ->
update(Type, Name, {OldConf, Conf}, #{}).
%update(Type, Name, {OldConf, Conf}, Opts) ->
% %% TODO: sometimes its not necessary to restart the connector connection.
% %%
% %% - if the connection related configs like `servers` is updated, we should restart/start
% %% or stop connectors according to the change.
% %% - if the connection related configs are not update, only non-connection configs like
% %% the `method` or `headers` of a WebHook is changed, then the connector can be updated
% %% without restarting the connector.
% %%
% case emqx_utils_maps:if_only_to_toggle_enable(OldConf, Conf) of
% false ->
% ?SLOG(info, #{
% msg => "update connector",
% type => Type,
% name => Name,
% config => emqx_utils:redact(Conf)
% }),
% case recreate(Type, Name, Conf, Opts) of
% {ok, _} ->
% ok;
% {error, not_found} ->
% ?SLOG(warning, #{
% msg => "updating_a_non_existing_connector",
% type => Type,
% name => Name,
% config => emqx_utils:redact(Conf)
% }),
% create(Type, Name, Conf, Opts);
% {error, Reason} ->
% {error, {update_connector_failed, Reason}}
% end;
% true ->
% %% we don't need to recreate the connector if this config change is only to
% %% toggole the config 'connector.{type}.{name}.enable'
% _ =
% case maps:get(enable, Conf, true) of
% true ->
% restart(Type, Name);
% false ->
% stop(Type, Name)
% end,
% ok
% end.
update(Type, Name, {OldConf, Conf}, Opts) ->
%% TODO: sometimes its not necessary to restart the connector connection.
%%
%% - if the connection related configs like `servers` is updated, we should restart/start
%% or stop connectors according to the change.
%% - if the connection related configs are not update, only non-connection configs like
%% the `method` or `headers` of a WebHook is changed, then the connector can be updated
%% without restarting the connector.
%%
case emqx_utils_maps:if_only_to_toggle_enable(OldConf, Conf) of
false ->
?SLOG(info, #{
msg => "update connector",
type => Type,
name => Name,
config => emqx_utils:redact(Conf)
}),
case recreate(Type, Name, Conf, Opts) of
{ok, _} ->
ok;
{error, not_found} ->
?SLOG(warning, #{
msg => "updating_a_non_existing_connector",
type => Type,
name => Name,
config => emqx_utils:redact(Conf)
}),
create(Type, Name, Conf, Opts);
{error, Reason} ->
{error, {update_connector_failed, Reason}}
end;
true ->
%% we don't need to recreate the connector if this config change is only to
%% toggole the config 'connector.{type}.{name}.enable'
_ =
case maps:get(enable, Conf, true) of
true ->
restart(Type, Name);
false ->
stop(Type, Name)
end,
ok
end.
recreate(Type, Name) ->
recreate(Type, Name, emqx:get_config([connectors, Type, Name])).
@ -373,7 +377,7 @@ parse_confs(<<"iotdb">>, Name, Conf) ->
Name,
WebhookConfig
);
parse_confs(Type, Name, Conf) when ?IS_INGRESS_BRIDGE(Type) ->
parse_confs(Type, Name, Conf) when ?IS_INGRESS_CONNECTOR(Type) ->
%% For some drivers that can be used as data-sources, we need to provide a
%% hookpoint. The underlying driver will run `emqx_hooks:run/3` when it
%% receives a message from the external database.

View File

@ -1,7 +1,7 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_connector_enterprise).
-module(emqx_connector_ee_schema).
-if(?EMQX_RELEASE_EDITION == ee).
@ -10,7 +10,6 @@
connector_impl_module/1
]).
-include_lib("hocon/include/hoconsc.hrl").
-import(hoconsc, [mk/2, enum/1, ref/2]).
-export([

View File

@ -30,10 +30,10 @@
enterprise_fields_connectors() ->
%% We *must* do this to ensure the module is really loaded, especially when we use
%% `call_hocon' from `nodetool' to generate initial configurations.
_ = emqx_connector_enterprise:module_info(),
case erlang:function_exported(emqx_connector_enterprise, fields, 1) of
_ = emqx_connector_ee_schema:module_info(),
case erlang:function_exported(emqx_connector_ee_schema, fields, 1) of
true ->
emqx_connector_enterprise:fields(connectors);
emqx_connector_ee_schema:fields(connectors);
false ->
[]
end.