diff --git a/apps/emqx/src/emqx_config_handler.erl b/apps/emqx/src/emqx_config_handler.erl index f38c5563a..bf11c17e8 100644 --- a/apps/emqx/src/emqx_config_handler.erl +++ b/apps/emqx/src/emqx_config_handler.erl @@ -19,7 +19,7 @@ -include("logger.hrl"). -include("emqx_schema.hrl"). --include_lib("hocon/include/hoconsc.hrl"). +-include_lib("hocon/include/hocon_types.hrl"). -behaviour(gen_server). @@ -736,7 +736,7 @@ remove_empty_leaf(KeyPath, Handlers) -> end. assert_callback_function(Mod) -> - _ = Mod:module_info(), + _ = apply(Mod, module_info, []), case erlang:function_exported(Mod, pre_config_update, 3) orelse erlang:function_exported(Mod, post_config_update, 5) diff --git a/apps/emqx_connector/src/emqx_connector.erl b/apps/emqx_connector/src/emqx_connector.erl index 579ab1568..a5daacafb 100644 --- a/apps/emqx_connector/src/emqx_connector.erl +++ b/apps/emqx_connector/src/emqx_connector.erl @@ -18,9 +18,7 @@ -behaviour(emqx_config_handler). -behaviour(emqx_config_backup). --include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). --include_lib("emqx/include/emqx_hooks.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -export([ @@ -29,25 +27,16 @@ ]). -export([ - % load_hook/0, - % unload_hook/0 -]). - -% -export([on_message_publish/1]). - --export([ - load/0, - unload/0, - lookup/1, - lookup/2, - get_metrics/2, + check_deps_and_remove/3, create/3, disable_enable/3, + get_metrics/2, + list/0, + load/0, + lookup/1, + lookup/2, remove/2, - check_deps_and_remove/3, - list/0 - % , - % reload_hook/1 + unload/0 ]). -export([config_key_path/0]). @@ -77,7 +66,6 @@ load() -> ). unload() -> - %% unload_hook(), Connectors = emqx:get_config([?ROOT_KEY], #{}), lists:foreach( fun({Type, NamedConf}) -> @@ -114,102 +102,6 @@ safe_load_connector(Type, Name, Conf) -> }) end. -% reload_hook(Connectors) -> -% ok = unload_hook(), -% ok = load_hook(Connectors). - -% load_hook() -> -% Connectors = emqx:get_config([?ROOT_KEY], #{}), -% load_hook(Connectors). - -% load_hook(Connectors) -> -% lists:foreach( -% fun({Type, Connector}) -> -% lists:foreach( -% fun({_Name, ConnectorConf}) -> -% do_load_hook(Type, ConnectorConf) -% end, -% maps:to_list(Connector) -% ) -% end, -% maps:to_list(Connectors) -% ). - -% do_load_hook(Type, #{local_topic := LocalTopic}) when -% ?EGRESS_DIR_BRIDGES(Type) andalso is_binary(LocalTopic) -% -> -% emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_BRIDGE); -% do_load_hook(mqtt, #{egress := #{local := #{topic := _}}}) -> -% emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_BRIDGE); -% do_load_hook(_Type, _Conf) -> -% ok. - -% unload_hook() -> -% ok = emqx_hooks:del('message.publish', {?MODULE, on_message_publish}). - -% on_message_publish(Message = #message{topic = Topic, flags = Flags}) -> -% case maps:get(sys, Flags, false) of -% false -> -% {Msg, _} = emqx_rule_events:eventmsg_publish(Message), -% send_to_matched_egress_connectors(Topic, Msg); -% true -> -% ok -% end, -% {ok, Message}. - -% send_to_matched_egress_connectors(Topic, Msg) -> -% MatchedConnectorIds = get_matched_egress_connectors(Topic), -% lists:foreach( -% fun(Id) -> -% try send_message(Id, Msg) of -% {error, Reason} -> -% ?SLOG(error, #{ -% msg => "send_message_to_connector_failed", -% connector => Id, -% error => Reason -% }); -% _ -> -% ok -% catch -% Err:Reason:ST -> -% ?SLOG(error, #{ -% msg => "send_message_to_connector_exception", -% connector => Id, -% error => Err, -% reason => Reason, -% stacktrace => ST -% }) -% end -% end, -% MatchedConnectorIds -% ). - -% send_message(ConnectorId, Message) -> -% {ConnectorType, ConnectorName} = emqx_connector_resource:parse_connector_id(ConnectorId), -% ResId = emqx_connector_resource:resource_id(ConnectorType, ConnectorName), -% send_message(ConnectorType, ConnectorName, ResId, Message, #{}). - -% send_message(ConnectorType, ConnectorName, ResId, Message, QueryOpts0) -> -% case emqx:get_config([?ROOT_KEY, ConnectorType, ConnectorName], not_found) of -% not_found -> -% {error, connector_not_found}; -% #{enable := true} = Config -> -% QueryOpts = maps:merge(query_opts(Config), QueryOpts0), -% emqx_resource:query(ResId, {send_message, Message}, QueryOpts); -% #{enable := false} -> -% {error, connector_stopped} -% end. - -% query_opts(Config) -> -% case emqx_utils_maps:deep_get([resource_opts, request_ttl], Config, false) of -% Timeout when is_integer(Timeout) orelse Timeout =:= infinity -> -% %% request_ttl is configured -% #{timeout => Timeout}; -% _ -> -% %% emqx_resource has a default value (15s) -% #{} -% end. - config_key_path() -> [?ROOT_KEY]. @@ -240,33 +132,25 @@ post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) -> Result = perform_connector_changes([ #{action => fun emqx_connector_resource:remove/4, data => Removed}, #{ - action => fun emqx_connector_resource:create/4, + action => fun emqx_connector_resource:create/3, data => Added, on_exception_fn => fun emqx_connector_resource:remove/4 }, #{action => fun emqx_connector_resource:update/4, data => Updated} ]), - % ok = unload_hook(), - % ok = load_hook(NewConf), ?tp(connector_post_config_update_done, #{}), Result; -post_config_update([?ROOT_KEY, BridgeType, BridgeName], '$remove', _, _OldConf, _AppEnvs) -> - ok = emqx_connector_resource:remove(BridgeType, BridgeName), - Bridges = emqx_utils_maps:deep_remove([BridgeType, BridgeName], emqx:get_config([connectors])), - emqx_connector:reload_hook(Bridges), +post_config_update([?ROOT_KEY, Type, Name], '$remove', _, _OldConf, _AppEnvs) -> + ok = emqx_connector_resource:remove(Type, Name), ?tp(connector_post_config_update_done, #{}), ok; -post_config_update([?ROOT_KEY, BridgeType, BridgeName], _Req, NewConf, undefined, _AppEnvs) -> - ok = emqx_connector_resource:create(BridgeType, BridgeName, NewConf), +post_config_update([?ROOT_KEY, Type, Name], _Req, NewConf, undefined, _AppEnvs) -> + ok = emqx_connector_resource:create(Type, Name, NewConf), ?tp(connector_post_config_update_done, #{}), ok; -post_config_update([connectors, BridgeType, BridgeName], _Req, NewConf, OldConf, _AppEnvs) -> +post_config_update([connectors, Type, Name], _Req, NewConf, OldConf, _AppEnvs) -> ResOpts = emqx_resource:fetch_creation_opts(NewConf), - ok = emqx_connector_resource:update(BridgeType, BridgeName, {OldConf, NewConf}, ResOpts), - Bridges = emqx_utils_maps:deep_put( - [BridgeType, BridgeName], emqx:get_config([connectors]), NewConf - ), - emqx_connector:reload_hook(Bridges), + ok = emqx_connector_resource:update(Type, Name, {OldConf, NewConf}, ResOpts), ?tp(connector_post_config_update_done, #{}), ok. @@ -312,13 +196,6 @@ lookup(Type, Name, RawConf) -> get_metrics(Type, Name) -> emqx_resource:get_metrics(emqx_connector_resource:resource_id(Type, Name)). -% maybe_upgrade(mqtt, Config) -> -% emqx_connector_compatible_config:maybe_upgrade(Config); -% maybe_upgrade(webhook, Config) -> -% emqx_connector_compatible_config:webhook_maybe_upgrade(Config); -% maybe_upgrade(_Other, Config) -> -% Config. - disable_enable(Action, ConnectorType, ConnectorName) when Action =:= disable; Action =:= enable -> @@ -499,48 +376,6 @@ flatten_confs(Conf0) -> do_flatten_confs(Type, Conf0) -> [{{Type, Name}, Conf} || {Name, Conf} <- maps:to_list(Conf0)]. -% get_matched_egress_connectors(Topic) -> -% Connectors = emqx:get_config([connectors], #{}), -% maps:fold( -% fun(BType, Conf, Acc0) -> -% maps:fold( -% fun -% (BName, #{egress := _} = BConf, Acc1) when BType =:= mqtt -> -% get_matched_connector_id(BType, BConf, Topic, BName, Acc1); -% (_BName, #{ingress := _}, Acc1) when BType =:= mqtt -> -% %% ignore ingress only connector -% Acc1; -% (BName, BConf, Acc1) -> -% get_matched_connector_id(BType, BConf, Topic, BName, Acc1) -% end, -% Acc0, -% Conf -% ) -% end, -% [], -% Connectors -% ). - -% get_matched_connector_id(_BType, #{enable := false}, _Topic, _BName, Acc) -> -% Acc; -% get_matched_connector_id(BType, Conf, Topic, BName, Acc) when ?EGRESS_DIR_BRIDGES(BType) -> -% case maps:get(local_topic, Conf, undefined) of -% undefined -> -% Acc; -% Filter -> -% do_get_matched_connector_id(Topic, Filter, BType, BName, Acc) -% end; -% get_matched_connector_id(mqtt, #{egress := #{local := #{topic := Filter}}}, Topic, BName, Acc) -> -% do_get_matched_connector_id(Topic, Filter, mqtt, BName, Acc); -% get_matched_connector_id(_BType, _Conf, _Topic, _BName, Acc) -> -% Acc. - -% do_get_matched_connector_id(Topic, Filter, BType, BName, Acc) -> -% case emqx_topic:match(Topic, Filter) of -% true -> [emqx_connector_resource:connector_id(BType, BName) | Acc]; -% false -> Acc -% end. - -spec get_basic_usage_info() -> #{ num_connectors => non_neg_integer(), diff --git a/apps/emqx_connector/src/emqx_connector_resource.erl b/apps/emqx_connector/src/emqx_connector_resource.erl index 0a67cf740..561ae0085 100644 --- a/apps/emqx_connector/src/emqx_connector_resource.erl +++ b/apps/emqx_connector/src/emqx_connector_resource.erl @@ -32,6 +32,7 @@ -export([ create/3, + create/4, create_dry_run/2, recreate/2, recreate/3, @@ -41,10 +42,10 @@ reset_metrics/1, restart/2, start/2, - stop/2 - % update/2, - % update/3%, - % update/4 + stop/2, + update/2, + update/3, + update/4 ]). -callback connector_config(ParsedConfig, ConnectorName :: atom() | binary()) -> @@ -54,20 +55,20 @@ when -optional_callbacks([connector_config/2]). %% bi-directional connector with producer/consumer or ingress/egress configs --define(IS_BI_DIR_BRIDGE(TYPE), +-define(IS_BI_DIR_CONNECTOR(TYPE), (TYPE) =:= <<"mqtt">> ). --define(IS_INGRESS_BRIDGE(TYPE), +-define(IS_INGRESS_CONNECTOR(TYPE), (TYPE) =:= <<"kafka_consumer">> orelse (TYPE) =:= <<"gcp_pubsub_consumer">> orelse - ?IS_BI_DIR_BRIDGE(TYPE) + ?IS_BI_DIR_CONNECTOR(TYPE) ). -if(?EMQX_RELEASE_EDITION == ee). -connector_to_resource_type(ConnectorType) -> emqx_connector_enterprise:resource_type(ConnectorType). +connector_to_resource_type(ConnectorType) -> emqx_connector_ee_schema:resource_type(ConnectorType). connector_impl_module(ConnectorType) -> - emqx_connector_enterprise:connector_impl_module(ConnectorType). + emqx_connector_ee_schema:connector_impl_module(ConnectorType). -else. connector_to_resource_type(_) -> undefined. @@ -168,6 +169,9 @@ start(Type, Name) -> % create(ConnectorType, ConnectorName, Conf). create(Type, Name, Conf) -> + create(Type, Name, Conf, #{}). + +create(Type, Name, Conf, Opts) -> ?SLOG(info, #{ msg => "create connector", type => Type, @@ -180,60 +184,60 @@ create(Type, Name, Conf) -> <<"emqx_connector">>, connector_to_resource_type(Type), parse_confs(TypeBin, Name, Conf), - parse_opts(Conf, #{}) + parse_opts(Conf, Opts) ), ok. -% update(ConnectorId, {OldConf, Conf}) -> -% {ConnectorType, ConnectorName} = parse_connector_id(ConnectorId), -% update(ConnectorType, ConnectorName, {OldConf, Conf}). +update(ConnectorId, {OldConf, Conf}) -> + {ConnectorType, ConnectorName} = parse_connector_id(ConnectorId), + update(ConnectorType, ConnectorName, {OldConf, Conf}). -% update(Type, Name, {OldConf, Conf}) -> -% update(Type, Name, {OldConf, Conf}, #{}). +update(Type, Name, {OldConf, Conf}) -> + update(Type, Name, {OldConf, Conf}, #{}). -%update(Type, Name, {OldConf, Conf}, Opts) -> -% %% TODO: sometimes its not necessary to restart the connector connection. -% %% -% %% - if the connection related configs like `servers` is updated, we should restart/start -% %% or stop connectors according to the change. -% %% - if the connection related configs are not update, only non-connection configs like -% %% the `method` or `headers` of a WebHook is changed, then the connector can be updated -% %% without restarting the connector. -% %% -% case emqx_utils_maps:if_only_to_toggle_enable(OldConf, Conf) of -% false -> -% ?SLOG(info, #{ -% msg => "update connector", -% type => Type, -% name => Name, -% config => emqx_utils:redact(Conf) -% }), -% case recreate(Type, Name, Conf, Opts) of -% {ok, _} -> -% ok; -% {error, not_found} -> -% ?SLOG(warning, #{ -% msg => "updating_a_non_existing_connector", -% type => Type, -% name => Name, -% config => emqx_utils:redact(Conf) -% }), -% create(Type, Name, Conf, Opts); -% {error, Reason} -> -% {error, {update_connector_failed, Reason}} -% end; -% true -> -% %% we don't need to recreate the connector if this config change is only to -% %% toggole the config 'connector.{type}.{name}.enable' -% _ = -% case maps:get(enable, Conf, true) of -% true -> -% restart(Type, Name); -% false -> -% stop(Type, Name) -% end, -% ok -% end. +update(Type, Name, {OldConf, Conf}, Opts) -> + %% TODO: sometimes its not necessary to restart the connector connection. + %% + %% - if the connection related configs like `servers` is updated, we should restart/start + %% or stop connectors according to the change. + %% - if the connection related configs are not update, only non-connection configs like + %% the `method` or `headers` of a WebHook is changed, then the connector can be updated + %% without restarting the connector. + %% + case emqx_utils_maps:if_only_to_toggle_enable(OldConf, Conf) of + false -> + ?SLOG(info, #{ + msg => "update connector", + type => Type, + name => Name, + config => emqx_utils:redact(Conf) + }), + case recreate(Type, Name, Conf, Opts) of + {ok, _} -> + ok; + {error, not_found} -> + ?SLOG(warning, #{ + msg => "updating_a_non_existing_connector", + type => Type, + name => Name, + config => emqx_utils:redact(Conf) + }), + create(Type, Name, Conf, Opts); + {error, Reason} -> + {error, {update_connector_failed, Reason}} + end; + true -> + %% we don't need to recreate the connector if this config change is only to + %% toggole the config 'connector.{type}.{name}.enable' + _ = + case maps:get(enable, Conf, true) of + true -> + restart(Type, Name); + false -> + stop(Type, Name) + end, + ok + end. recreate(Type, Name) -> recreate(Type, Name, emqx:get_config([connectors, Type, Name])). @@ -373,7 +377,7 @@ parse_confs(<<"iotdb">>, Name, Conf) -> Name, WebhookConfig ); -parse_confs(Type, Name, Conf) when ?IS_INGRESS_BRIDGE(Type) -> +parse_confs(Type, Name, Conf) when ?IS_INGRESS_CONNECTOR(Type) -> %% For some drivers that can be used as data-sources, we need to provide a %% hookpoint. The underlying driver will run `emqx_hooks:run/3` when it %% receives a message from the external database. diff --git a/apps/emqx_connector/src/schema/emqx_connector_enterprise.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl similarity index 93% rename from apps/emqx_connector/src/schema/emqx_connector_enterprise.erl rename to apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl index a4441bd07..0bfd7e602 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_enterprise.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl @@ -1,7 +1,7 @@ %%-------------------------------------------------------------------- %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_connector_enterprise). +-module(emqx_connector_ee_schema). -if(?EMQX_RELEASE_EDITION == ee). @@ -10,7 +10,6 @@ connector_impl_module/1 ]). --include_lib("hocon/include/hoconsc.hrl"). -import(hoconsc, [mk/2, enum/1, ref/2]). -export([ diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index 59278da24..954235a2a 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -30,10 +30,10 @@ enterprise_fields_connectors() -> %% We *must* do this to ensure the module is really loaded, especially when we use %% `call_hocon' from `nodetool' to generate initial configurations. - _ = emqx_connector_enterprise:module_info(), - case erlang:function_exported(emqx_connector_enterprise, fields, 1) of + _ = emqx_connector_ee_schema:module_info(), + case erlang:function_exported(emqx_connector_ee_schema, fields, 1) of true -> - emqx_connector_enterprise:fields(connectors); + emqx_connector_ee_schema:fields(connectors); false -> [] end.