fix: problems reported by dialyzer

This commit is contained in:
Kjell Winblad 2023-10-24 09:35:58 +02:00 committed by Zaiming (Stone) Shi
parent 7822d7db76
commit e72b952138
8 changed files with 56 additions and 47 deletions

View File

@ -441,20 +441,20 @@ create_dry_run(Type, Conf0) ->
emqx_bridge_v2_schema,
RawConf,
#{atom_key => true, required => false}
)
),
#{<<"connector">> := ConnectorName} = Conf1,
%% Check that the connector exists and do the dry run if it exists
ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(Type),
case emqx:get_raw_config([connectors, ConnectorType, ConnectorName], not_found) of
not_found ->
{error, iolist_to_binary(io_lib:format("Connector ~p not found", [ConnectorName]))};
ConnectorRawConf ->
create_dry_run_helper(Type, ConnectorRawConf, Conf1)
end
catch
%% validation errors
throw:Reason1 ->
{error, Reason1}
end,
#{<<"connector">> := ConnectorName} = Conf1,
%% Check that the connector exists and do the dry run if it exists
ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(Type),
case emqx:get_raw_config([connectors, ConnectorType, ConnectorName], not_found) of
not_found ->
{error, iolist_to_binary(io_lib:format("Connector ~p not found", [ConnectorName]))};
ConnectorRawConf ->
create_dry_run_helper(Type, ConnectorRawConf, Conf1)
end.
get_metrics(Type, Name) ->
@ -874,8 +874,21 @@ split_bridge_v1_config_and_create(BridgeV1Type, BridgeName, RawConf) ->
{ok, _} = Result ->
Result;
Error ->
emqx_connector:remove(ConnectorType, ConnectorNameAtom),
Error
case emqx_connector:remove(ConnectorType, ConnectorNameAtom) of
{ok, _} ->
Error;
Error ->
%% TODO log error
?SLOG(warning, #{
message =>
<<"Failed to remove connector after bridge creation failed">>,
bridge_version => 2,
bridge_type => BridgeType,
bridge_name => BridgeName,
bridge_raw_config => emqx_utils:redact(RawConf)
}),
Error
end
end;
Error ->
Error
@ -999,7 +1012,7 @@ bridge_v1_check_deps_and_remove(
BridgeType,
BridgeName,
RemoveDeps,
#{connector := ConnectorName}
#{connector := ConnectorName} = Conf
) ->
case check_deps_and_remove(BridgeType, BridgeName, RemoveDeps) of
{error, _} = Error ->
@ -1009,7 +1022,20 @@ bridge_v1_check_deps_and_remove(
case connector_has_channels(BridgeType, ConnectorName) of
false ->
ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(BridgeType),
emqx_connector:remove(ConnectorType, ConnectorName);
case emqx_connector:remove(ConnectorType, ConnectorName) of
{ok, _} ->
ok;
Error ->
?SLOG(warning, #{
message => <<"Failed to remove connector after bridge removal">>,
bridge_version => 2,
bridge_type => BridgeType,
bridge_name => BridgeName,
error => Error,
bridge_raw_config => emqx_utils:redact(Conf)
}),
ok
end;
true ->
ok
end,

View File

@ -70,15 +70,8 @@ post_request() ->
api_schema("post").
api_schema(Method) ->
Broker = [
{Type, ref(Mod, Method)}
|| {Type, Mod} <- [
% {<<"webhook">>, emqx_bridge_http_schema},
% {<<"mqtt">>, emqx_bridge_mqtt_schema}
]
],
EE = enterprise_api_schemas(Method),
hoconsc:union(bridge_api_union(Broker ++ EE)).
hoconsc:union(bridge_api_union(EE)).
bridge_api_union(Refs) ->
Index = maps:from_list(Refs),

View File

@ -219,7 +219,8 @@ deallocate_client(ClientId) ->
msg => "failed_to_delete_kafka_client",
client_id => ClientId
}
).
),
ok.
deallocate_producers(ClientId, Producers) ->
_ = with_log_at_error(

View File

@ -296,11 +296,7 @@ remove(Type, Name) ->
%% just for perform_connector_changes/1
remove(Type, Name, _Conf, _Opts) ->
?SLOG(info, #{msg => "remove_connector", type => Type, name => Name}),
case emqx_resource:remove_local(resource_id(Type, Name)) of
ok -> ok;
{error, not_found} -> ok;
{error, Reason} -> {error, Reason}
end.
emqx_resource:remove_local(resource_id(Type, Name)).
%% convert connector configs to what the connector modules want
parse_confs(

View File

@ -222,15 +222,8 @@ post_request() ->
api_schema("post").
api_schema(Method) ->
Broker = [
{Type, ref(Mod, Method)}
|| {Type, Mod} <- [
%% {<<"webhook">>, emqx_bridge_http_schema},
%% {<<"mqtt">>, emqx_bridge_mqtt_schema}
]
],
EE = enterprise_api_schemas(Method),
hoconsc:union(bridge_api_union(Broker ++ EE)).
hoconsc:union(bridge_api_union(EE)).
bridge_api_union(Refs) ->
Index = maps:from_list(Refs),

View File

@ -44,7 +44,9 @@
expire_at => infinity | integer(),
async_reply_fun => reply_fun(),
simple_query => boolean(),
reply_to => reply_fun()
reply_to => reply_fun(),
query_mode => query_mode(),
query_mode_cache_override => boolean()
}.
-type resource_data() :: #{
id := resource_id(),
@ -54,7 +56,8 @@
config := resource_config(),
error := term(),
state := resource_state(),
status := resource_status()
status := resource_status(),
added_channels := term()
}.
-type resource_group() :: binary().
-type creation_opts() :: #{

View File

@ -230,7 +230,7 @@
%% for the resource with the given id.
-callback on_get_channels(
ResId :: term()
) -> {ok, [term()]}.
) -> [term()].
-spec list_types() -> [module()].
list_types() ->
@ -337,8 +337,7 @@ remove_local(ResId) ->
resource_id => ResId
}),
ok
end,
ok.
end.
-spec reset_metrics_local(resource_id()) -> ok.
reset_metrics_local(ResId) ->
@ -412,7 +411,7 @@ get_query_mode_error(ResId, Opts) ->
{ok, _Group, #{query_mode := QM, error := Error}} ->
{QM, Error};
{error, not_found} ->
?RESOURCE_ERROR(not_found, "resource not found")
{error, not_found}
end
end.
@ -449,7 +448,7 @@ health_check(ResId) ->
channel_health_check(ResId, ChannelId) ->
emqx_resource_manager:channel_health_check(ResId, ChannelId).
-spec get_channels(resource_id()) -> {ok, [{binary(), map()}]}.
-spec get_channels(resource_id()) -> {ok, [{binary(), map()}]} | {error, term()}.
get_channels(ResId) ->
emqx_resource_manager:get_channels(ResId).

View File

@ -675,8 +675,7 @@ add_channel_need_insert_update_data(Data, ChannelId, ChannelConfig) ->
state = NewState,
added_channels = NewAddedChannelsMap
},
update_state(UpdatedData, Data),
{ok, UpdatedData};
{ok, update_state(UpdatedData, Data)};
{error, Reason} = Error ->
%% Log the error as a warning
?SLOG(warning, #{
@ -712,8 +711,7 @@ handle_remove_channel_exists(From, ChannelId, Data) ->
state = NewState,
added_channels = NewAddedChannelsMap
},
update_state(UpdatedData, Data),
{keep_state, UpdatedData, [{reply, From, ok}]};
{keep_state, update_state(UpdatedData, Data), [{reply, From, ok}]};
{error, Reason} = Error ->
%% Log the error as a warning
?SLOG(warning, #{