From e72b952138236518291a6e3ce40eb5d028022252 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Tue, 24 Oct 2023 09:35:58 +0200 Subject: [PATCH] fix: problems reported by dialyzer --- apps/emqx_bridge/src/emqx_bridge_v2.erl | 54 ++++++++++++++----- .../src/schema/emqx_bridge_v2_schema.erl | 9 +--- .../src/emqx_bridge_kafka_impl_producer.erl | 3 +- .../src/emqx_connector_resource.erl | 6 +-- .../src/schema/emqx_connector_schema.erl | 9 +--- apps/emqx_resource/include/emqx_resource.hrl | 7 ++- apps/emqx_resource/src/emqx_resource.erl | 9 ++-- .../src/emqx_resource_manager.erl | 6 +-- 8 files changed, 56 insertions(+), 47 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 43609dc27..4fbc3ab2b 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -441,20 +441,20 @@ create_dry_run(Type, Conf0) -> emqx_bridge_v2_schema, RawConf, #{atom_key => true, required => false} - ) + ), + #{<<"connector">> := ConnectorName} = Conf1, + %% Check that the connector exists and do the dry run if it exists + ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(Type), + case emqx:get_raw_config([connectors, ConnectorType, ConnectorName], not_found) of + not_found -> + {error, iolist_to_binary(io_lib:format("Connector ~p not found", [ConnectorName]))}; + ConnectorRawConf -> + create_dry_run_helper(Type, ConnectorRawConf, Conf1) + end catch %% validation errors throw:Reason1 -> {error, Reason1} - end, - #{<<"connector">> := ConnectorName} = Conf1, - %% Check that the connector exists and do the dry run if it exists - ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(Type), - case emqx:get_raw_config([connectors, ConnectorType, ConnectorName], not_found) of - not_found -> - {error, iolist_to_binary(io_lib:format("Connector ~p not found", [ConnectorName]))}; - ConnectorRawConf -> - create_dry_run_helper(Type, ConnectorRawConf, Conf1) end. get_metrics(Type, Name) -> @@ -874,8 +874,21 @@ split_bridge_v1_config_and_create(BridgeV1Type, BridgeName, RawConf) -> {ok, _} = Result -> Result; Error -> - emqx_connector:remove(ConnectorType, ConnectorNameAtom), - Error + case emqx_connector:remove(ConnectorType, ConnectorNameAtom) of + {ok, _} -> + Error; + Error -> + %% TODO log error + ?SLOG(warning, #{ + message => + <<"Failed to remove connector after bridge creation failed">>, + bridge_version => 2, + bridge_type => BridgeType, + bridge_name => BridgeName, + bridge_raw_config => emqx_utils:redact(RawConf) + }), + Error + end end; Error -> Error @@ -999,7 +1012,7 @@ bridge_v1_check_deps_and_remove( BridgeType, BridgeName, RemoveDeps, - #{connector := ConnectorName} + #{connector := ConnectorName} = Conf ) -> case check_deps_and_remove(BridgeType, BridgeName, RemoveDeps) of {error, _} = Error -> @@ -1009,7 +1022,20 @@ bridge_v1_check_deps_and_remove( case connector_has_channels(BridgeType, ConnectorName) of false -> ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(BridgeType), - emqx_connector:remove(ConnectorType, ConnectorName); + case emqx_connector:remove(ConnectorType, ConnectorName) of + {ok, _} -> + ok; + Error -> + ?SLOG(warning, #{ + message => <<"Failed to remove connector after bridge removal">>, + bridge_version => 2, + bridge_type => BridgeType, + bridge_name => BridgeName, + error => Error, + bridge_raw_config => emqx_utils:redact(Conf) + }), + ok + end; true -> ok end, diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl index d6d9cec1d..9903c75e8 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl @@ -70,15 +70,8 @@ post_request() -> api_schema("post"). api_schema(Method) -> - Broker = [ - {Type, ref(Mod, Method)} - || {Type, Mod} <- [ - % {<<"webhook">>, emqx_bridge_http_schema}, - % {<<"mqtt">>, emqx_bridge_mqtt_schema} - ] - ], EE = enterprise_api_schemas(Method), - hoconsc:union(bridge_api_union(Broker ++ EE)). + hoconsc:union(bridge_api_union(EE)). bridge_api_union(Refs) -> Index = maps:from_list(Refs), diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 1b19782a5..e43a872f4 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -219,7 +219,8 @@ deallocate_client(ClientId) -> msg => "failed_to_delete_kafka_client", client_id => ClientId } - ). + ), + ok. deallocate_producers(ClientId, Producers) -> _ = with_log_at_error( diff --git a/apps/emqx_connector/src/emqx_connector_resource.erl b/apps/emqx_connector/src/emqx_connector_resource.erl index 2fd440e59..1e7f32712 100644 --- a/apps/emqx_connector/src/emqx_connector_resource.erl +++ b/apps/emqx_connector/src/emqx_connector_resource.erl @@ -296,11 +296,7 @@ remove(Type, Name) -> %% just for perform_connector_changes/1 remove(Type, Name, _Conf, _Opts) -> ?SLOG(info, #{msg => "remove_connector", type => Type, name => Name}), - case emqx_resource:remove_local(resource_id(Type, Name)) of - ok -> ok; - {error, not_found} -> ok; - {error, Reason} -> {error, Reason} - end. + emqx_resource:remove_local(resource_id(Type, Name)). %% convert connector configs to what the connector modules want parse_confs( diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index 50c83b6fc..2afa04ccf 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -222,15 +222,8 @@ post_request() -> api_schema("post"). api_schema(Method) -> - Broker = [ - {Type, ref(Mod, Method)} - || {Type, Mod} <- [ - %% {<<"webhook">>, emqx_bridge_http_schema}, - %% {<<"mqtt">>, emqx_bridge_mqtt_schema} - ] - ], EE = enterprise_api_schemas(Method), - hoconsc:union(bridge_api_union(Broker ++ EE)). + hoconsc:union(bridge_api_union(EE)). bridge_api_union(Refs) -> Index = maps:from_list(Refs), diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 031fb669e..30f936f8c 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -44,7 +44,9 @@ expire_at => infinity | integer(), async_reply_fun => reply_fun(), simple_query => boolean(), - reply_to => reply_fun() + reply_to => reply_fun(), + query_mode => query_mode(), + query_mode_cache_override => boolean() }. -type resource_data() :: #{ id := resource_id(), @@ -54,7 +56,8 @@ config := resource_config(), error := term(), state := resource_state(), - status := resource_status() + status := resource_status(), + added_channels := term() }. -type resource_group() :: binary(). -type creation_opts() :: #{ diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index af52cf17b..ae5e78cd4 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -230,7 +230,7 @@ %% for the resource with the given id. -callback on_get_channels( ResId :: term() -) -> {ok, [term()]}. +) -> [term()]. -spec list_types() -> [module()]. list_types() -> @@ -337,8 +337,7 @@ remove_local(ResId) -> resource_id => ResId }), ok - end, - ok. + end. -spec reset_metrics_local(resource_id()) -> ok. reset_metrics_local(ResId) -> @@ -412,7 +411,7 @@ get_query_mode_error(ResId, Opts) -> {ok, _Group, #{query_mode := QM, error := Error}} -> {QM, Error}; {error, not_found} -> - ?RESOURCE_ERROR(not_found, "resource not found") + {error, not_found} end end. @@ -449,7 +448,7 @@ health_check(ResId) -> channel_health_check(ResId, ChannelId) -> emqx_resource_manager:channel_health_check(ResId, ChannelId). --spec get_channels(resource_id()) -> {ok, [{binary(), map()}]}. +-spec get_channels(resource_id()) -> {ok, [{binary(), map()}]} | {error, term()}. get_channels(ResId) -> emqx_resource_manager:get_channels(ResId). diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index b8023e6ee..3f0d51b5b 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -675,8 +675,7 @@ add_channel_need_insert_update_data(Data, ChannelId, ChannelConfig) -> state = NewState, added_channels = NewAddedChannelsMap }, - update_state(UpdatedData, Data), - {ok, UpdatedData}; + {ok, update_state(UpdatedData, Data)}; {error, Reason} = Error -> %% Log the error as a warning ?SLOG(warning, #{ @@ -712,8 +711,7 @@ handle_remove_channel_exists(From, ChannelId, Data) -> state = NewState, added_channels = NewAddedChannelsMap }, - update_state(UpdatedData, Data), - {keep_state, UpdatedData, [{reply, From, ok}]}; + {keep_state, update_state(UpdatedData, Data), [{reply, From, ok}]}; {error, Reason} = Error -> %% Log the error as a warning ?SLOG(warning, #{