From 145ff66a9add21898ee6f6805273d72cb57ab58d Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 9 Aug 2022 08:58:42 +0800 Subject: [PATCH] fix: issues found by dialyzer and elvis --- apps/emqx_resource/include/emqx_resource.hrl | 3 +- apps/emqx_resource/src/emqx_resource.erl | 8 ++++- .../src/emqx_resource_manager.erl | 5 ++-- .../src/emqx_ee_connector_hstreamdb.erl | 29 +++++++++---------- .../src/emqx_ee_connector_influxdb.erl | 17 ++++++----- 5 files changed, 35 insertions(+), 27 deletions(-) diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index c691789c2..a59877a30 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -22,10 +22,11 @@ -type resource_spec() :: map(). -type resource_state() :: term(). -type resource_status() :: connected | disconnected | connecting | stopped. +-type callback_mode() :: always_sync | async_if_possible. -type resource_data() :: #{ id := resource_id(), mod := module(), - callback_mode := always_sync | async_if_possible, + callback_mode := callback_mode(), config := resource_config(), state := resource_state(), status := resource_status(), diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index a54a77e19..d17f4ce19 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -80,8 +80,10 @@ %% Direct calls to the callback module -%% start the instance -export([ + %% get the callback mode of a specific module + get_callback_mode/1, + %% start the instance call_start/3, %% verify if the resource is working normally call_health_check/3, @@ -285,6 +287,10 @@ generate_id(Name) when is_binary(Name) -> -spec list_group_instances(resource_group()) -> [resource_id()]. list_group_instances(Group) -> emqx_resource_manager:list_group(Group). +-spec get_callback_mode(module()) -> callback_mode(). +get_callback_mode(Mod) -> + Mod:callback_mode(). + -spec call_start(manager_id(), module(), resource_config()) -> {ok, resource_state()} | {error, Reason :: term()}. call_start(MgrId, Mod, Config) -> diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index d6e5a1493..3310555d1 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -54,6 +54,7 @@ % State record -record(data, {id, manager_id, group, mod, callback_mode, config, opts, status, state, error}). +-type data() :: #data{}. -define(SHORT_HEALTHCHECK_INTERVAL, 1000). -define(HEALTHCHECK_INTERVAL, 15000). @@ -259,7 +260,7 @@ start_link(MgrId, ResId, Group, ResourceType, Config, Opts) -> manager_id = MgrId, group = Group, mod = ResourceType, - callback_mode = ResourceType:callback_mode(), + callback_mode = emqx_resource:get_callback_mode(ResourceType), config = Config, opts = Opts, status = connecting, @@ -560,7 +561,7 @@ maybe_reply(Actions, undefined, _Reply) -> maybe_reply(Actions, From, Reply) -> [{reply, From, Reply} | Actions]. --spec data_record_to_external_map_with_metrics(#data{}) -> resource_data(). +-spec data_record_to_external_map_with_metrics(data()) -> resource_data(). data_record_to_external_map_with_metrics(Data) -> #{ id => Data#data.id, diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_hstreamdb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_hstreamdb.erl index 8ee37cd8a..3892b7fc0 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_hstreamdb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_hstreamdb.erl @@ -13,9 +13,10 @@ %% callbacks of behaviour emqx_resource -export([ + callback_mode/0, on_start/2, on_stop/2, - on_query/4, + on_query/3, on_get_status/2 ]). @@ -33,6 +34,7 @@ %% ------------------------------------------------------------------------------------------------- %% resource callback +callback_mode() -> always_sync. on_start(InstId, Config) -> start_client(InstId, Config). @@ -52,11 +54,10 @@ on_stop(InstId, #{client := Client, producer := Producer}) -> on_query( _InstId, {send_message, Data}, - AfterQuery, #{producer := Producer, ordering_key := OrderingKey, payload := Payload} ) -> Record = to_record(OrderingKey, Payload, Data), - do_append(AfterQuery, Producer, Record). + do_append(Producer, Record). on_get_status(_InstId, #{client := Client}) -> case is_alive(Client) of @@ -260,27 +261,26 @@ to_record(OrderingKey, Payload) when is_binary(OrderingKey) -> to_record(OrderingKey, Payload) -> hstreamdb:to_record(OrderingKey, raw, Payload). -do_append(AfterQuery, Producer, Record) -> - do_append(AfterQuery, false, Producer, Record). +do_append(Producer, Record) -> + do_append(false, Producer, Record). %% TODO: this append is async, remove or change it after we have better disk cache. -% do_append(AfterQuery, true, Producer, Record) -> +% do_append(true, Producer, Record) -> % case hstreamdb:append(Producer, Record) of % ok -> % ?SLOG(debug, #{ % msg => "hstreamdb producer async append success", % record => Record -% }), -% emqx_resource:query_success(AfterQuery); -% {error, Reason} -> +% }); +% {error, Reason} = Err -> % ?SLOG(error, #{ % msg => "hstreamdb producer async append failed", % reason => Reason, % record => Record % }), -% emqx_resource:query_failed(AfterQuery) +% Err % end; -do_append(AfterQuery, false, Producer, Record) -> +do_append(false, Producer, Record) -> %% TODO: this append is sync, but it does not support [Record], can only append one Record. %% Change it after we have better dick cache. case hstreamdb:append_flush(Producer, Record) of @@ -288,15 +288,14 @@ do_append(AfterQuery, false, Producer, Record) -> ?SLOG(debug, #{ msg => "hstreamdb producer sync append success", record => Record - }), - emqx_resource:query_success(AfterQuery); - {error, Reason} -> + }); + {error, Reason} = Err -> ?SLOG(error, #{ msg => "hstreamdb producer sync append failed", reason => Reason, record => Record }), - emqx_resource:query_failed(AfterQuery) + Err end. client_name(InstId) -> diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl index 9582f1729..09b3d7350 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl @@ -13,9 +13,10 @@ %% callbacks of behaviour emqx_resource -export([ + callback_mode/0, on_start/2, on_stop/2, - on_query/4, + on_query/3, on_get_status/2 ]). @@ -28,6 +29,7 @@ %% ------------------------------------------------------------------------------------------------- %% resource callback +callback_mode() -> always_sync. on_start(InstId, Config) -> start_client(InstId, Config). @@ -35,8 +37,8 @@ on_start(InstId, Config) -> on_stop(_InstId, #{client := Client}) -> influxdb:stop_client(Client). -on_query(InstId, {send_message, Data}, AfterQuery, State) -> - do_query(InstId, {send_message, Data}, AfterQuery, State). +on_query(InstId, {send_message, Data}, State) -> + do_query(InstId, {send_message, Data}, State). on_get_status(_InstId, #{client := Client}) -> case influxdb:is_alive(Client) of @@ -308,7 +310,7 @@ ssl_config(SSL = #{enable := true}) -> %% ------------------------------------------------------------------------------------------------- %% Query -do_query(InstId, {send_message, Data}, AfterQuery, State = #{client := Client}) -> +do_query(InstId, {send_message, Data}, State = #{client := Client}) -> {Points, Errs} = data_to_points(Data, State), lists:foreach( fun({error, Reason}) -> @@ -326,15 +328,14 @@ do_query(InstId, {send_message, Data}, AfterQuery, State = #{client := Client}) msg => "influxdb write point success", connector => InstId, points => Points - }), - emqx_resource:query_success(AfterQuery); - {error, Reason} -> + }); + {error, Reason} = Err -> ?SLOG(error, #{ msg => "influxdb write point failed", connector => InstId, reason => Reason }), - emqx_resource:query_failed(AfterQuery) + Err end. %% -------------------------------------------------------------------------------------------------