fix: issues found by dialyzer and elvis
This commit is contained in:
parent
35fe70b887
commit
145ff66a9a
|
@ -22,10 +22,11 @@
|
||||||
-type resource_spec() :: map().
|
-type resource_spec() :: map().
|
||||||
-type resource_state() :: term().
|
-type resource_state() :: term().
|
||||||
-type resource_status() :: connected | disconnected | connecting | stopped.
|
-type resource_status() :: connected | disconnected | connecting | stopped.
|
||||||
|
-type callback_mode() :: always_sync | async_if_possible.
|
||||||
-type resource_data() :: #{
|
-type resource_data() :: #{
|
||||||
id := resource_id(),
|
id := resource_id(),
|
||||||
mod := module(),
|
mod := module(),
|
||||||
callback_mode := always_sync | async_if_possible,
|
callback_mode := callback_mode(),
|
||||||
config := resource_config(),
|
config := resource_config(),
|
||||||
state := resource_state(),
|
state := resource_state(),
|
||||||
status := resource_status(),
|
status := resource_status(),
|
||||||
|
|
|
@ -80,8 +80,10 @@
|
||||||
|
|
||||||
%% Direct calls to the callback module
|
%% Direct calls to the callback module
|
||||||
|
|
||||||
%% start the instance
|
|
||||||
-export([
|
-export([
|
||||||
|
%% get the callback mode of a specific module
|
||||||
|
get_callback_mode/1,
|
||||||
|
%% start the instance
|
||||||
call_start/3,
|
call_start/3,
|
||||||
%% verify if the resource is working normally
|
%% verify if the resource is working normally
|
||||||
call_health_check/3,
|
call_health_check/3,
|
||||||
|
@ -285,6 +287,10 @@ generate_id(Name) when is_binary(Name) ->
|
||||||
-spec list_group_instances(resource_group()) -> [resource_id()].
|
-spec list_group_instances(resource_group()) -> [resource_id()].
|
||||||
list_group_instances(Group) -> emqx_resource_manager:list_group(Group).
|
list_group_instances(Group) -> emqx_resource_manager:list_group(Group).
|
||||||
|
|
||||||
|
-spec get_callback_mode(module()) -> callback_mode().
|
||||||
|
get_callback_mode(Mod) ->
|
||||||
|
Mod:callback_mode().
|
||||||
|
|
||||||
-spec call_start(manager_id(), module(), resource_config()) ->
|
-spec call_start(manager_id(), module(), resource_config()) ->
|
||||||
{ok, resource_state()} | {error, Reason :: term()}.
|
{ok, resource_state()} | {error, Reason :: term()}.
|
||||||
call_start(MgrId, Mod, Config) ->
|
call_start(MgrId, Mod, Config) ->
|
||||||
|
|
|
@ -54,6 +54,7 @@
|
||||||
|
|
||||||
% State record
|
% State record
|
||||||
-record(data, {id, manager_id, group, mod, callback_mode, config, opts, status, state, error}).
|
-record(data, {id, manager_id, group, mod, callback_mode, config, opts, status, state, error}).
|
||||||
|
-type data() :: #data{}.
|
||||||
|
|
||||||
-define(SHORT_HEALTHCHECK_INTERVAL, 1000).
|
-define(SHORT_HEALTHCHECK_INTERVAL, 1000).
|
||||||
-define(HEALTHCHECK_INTERVAL, 15000).
|
-define(HEALTHCHECK_INTERVAL, 15000).
|
||||||
|
@ -259,7 +260,7 @@ start_link(MgrId, ResId, Group, ResourceType, Config, Opts) ->
|
||||||
manager_id = MgrId,
|
manager_id = MgrId,
|
||||||
group = Group,
|
group = Group,
|
||||||
mod = ResourceType,
|
mod = ResourceType,
|
||||||
callback_mode = ResourceType:callback_mode(),
|
callback_mode = emqx_resource:get_callback_mode(ResourceType),
|
||||||
config = Config,
|
config = Config,
|
||||||
opts = Opts,
|
opts = Opts,
|
||||||
status = connecting,
|
status = connecting,
|
||||||
|
@ -560,7 +561,7 @@ maybe_reply(Actions, undefined, _Reply) ->
|
||||||
maybe_reply(Actions, From, Reply) ->
|
maybe_reply(Actions, From, Reply) ->
|
||||||
[{reply, From, Reply} | Actions].
|
[{reply, From, Reply} | Actions].
|
||||||
|
|
||||||
-spec data_record_to_external_map_with_metrics(#data{}) -> resource_data().
|
-spec data_record_to_external_map_with_metrics(data()) -> resource_data().
|
||||||
data_record_to_external_map_with_metrics(Data) ->
|
data_record_to_external_map_with_metrics(Data) ->
|
||||||
#{
|
#{
|
||||||
id => Data#data.id,
|
id => Data#data.id,
|
||||||
|
|
|
@ -13,9 +13,10 @@
|
||||||
|
|
||||||
%% callbacks of behaviour emqx_resource
|
%% callbacks of behaviour emqx_resource
|
||||||
-export([
|
-export([
|
||||||
|
callback_mode/0,
|
||||||
on_start/2,
|
on_start/2,
|
||||||
on_stop/2,
|
on_stop/2,
|
||||||
on_query/4,
|
on_query/3,
|
||||||
on_get_status/2
|
on_get_status/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
@ -33,6 +34,7 @@
|
||||||
|
|
||||||
%% -------------------------------------------------------------------------------------------------
|
%% -------------------------------------------------------------------------------------------------
|
||||||
%% resource callback
|
%% resource callback
|
||||||
|
callback_mode() -> always_sync.
|
||||||
|
|
||||||
on_start(InstId, Config) ->
|
on_start(InstId, Config) ->
|
||||||
start_client(InstId, Config).
|
start_client(InstId, Config).
|
||||||
|
@ -52,11 +54,10 @@ on_stop(InstId, #{client := Client, producer := Producer}) ->
|
||||||
on_query(
|
on_query(
|
||||||
_InstId,
|
_InstId,
|
||||||
{send_message, Data},
|
{send_message, Data},
|
||||||
AfterQuery,
|
|
||||||
#{producer := Producer, ordering_key := OrderingKey, payload := Payload}
|
#{producer := Producer, ordering_key := OrderingKey, payload := Payload}
|
||||||
) ->
|
) ->
|
||||||
Record = to_record(OrderingKey, Payload, Data),
|
Record = to_record(OrderingKey, Payload, Data),
|
||||||
do_append(AfterQuery, Producer, Record).
|
do_append(Producer, Record).
|
||||||
|
|
||||||
on_get_status(_InstId, #{client := Client}) ->
|
on_get_status(_InstId, #{client := Client}) ->
|
||||||
case is_alive(Client) of
|
case is_alive(Client) of
|
||||||
|
@ -260,27 +261,26 @@ to_record(OrderingKey, Payload) when is_binary(OrderingKey) ->
|
||||||
to_record(OrderingKey, Payload) ->
|
to_record(OrderingKey, Payload) ->
|
||||||
hstreamdb:to_record(OrderingKey, raw, Payload).
|
hstreamdb:to_record(OrderingKey, raw, Payload).
|
||||||
|
|
||||||
do_append(AfterQuery, Producer, Record) ->
|
do_append(Producer, Record) ->
|
||||||
do_append(AfterQuery, false, Producer, Record).
|
do_append(false, Producer, Record).
|
||||||
|
|
||||||
%% TODO: this append is async, remove or change it after we have better disk cache.
|
%% TODO: this append is async, remove or change it after we have better disk cache.
|
||||||
% do_append(AfterQuery, true, Producer, Record) ->
|
% do_append(true, Producer, Record) ->
|
||||||
% case hstreamdb:append(Producer, Record) of
|
% case hstreamdb:append(Producer, Record) of
|
||||||
% ok ->
|
% ok ->
|
||||||
% ?SLOG(debug, #{
|
% ?SLOG(debug, #{
|
||||||
% msg => "hstreamdb producer async append success",
|
% msg => "hstreamdb producer async append success",
|
||||||
% record => Record
|
% record => Record
|
||||||
% }),
|
% });
|
||||||
% emqx_resource:query_success(AfterQuery);
|
% {error, Reason} = Err ->
|
||||||
% {error, Reason} ->
|
|
||||||
% ?SLOG(error, #{
|
% ?SLOG(error, #{
|
||||||
% msg => "hstreamdb producer async append failed",
|
% msg => "hstreamdb producer async append failed",
|
||||||
% reason => Reason,
|
% reason => Reason,
|
||||||
% record => Record
|
% record => Record
|
||||||
% }),
|
% }),
|
||||||
% emqx_resource:query_failed(AfterQuery)
|
% Err
|
||||||
% end;
|
% end;
|
||||||
do_append(AfterQuery, false, Producer, Record) ->
|
do_append(false, Producer, Record) ->
|
||||||
%% TODO: this append is sync, but it does not support [Record], can only append one Record.
|
%% TODO: this append is sync, but it does not support [Record], can only append one Record.
|
||||||
%% Change it after we have better dick cache.
|
%% Change it after we have better dick cache.
|
||||||
case hstreamdb:append_flush(Producer, Record) of
|
case hstreamdb:append_flush(Producer, Record) of
|
||||||
|
@ -288,15 +288,14 @@ do_append(AfterQuery, false, Producer, Record) ->
|
||||||
?SLOG(debug, #{
|
?SLOG(debug, #{
|
||||||
msg => "hstreamdb producer sync append success",
|
msg => "hstreamdb producer sync append success",
|
||||||
record => Record
|
record => Record
|
||||||
}),
|
});
|
||||||
emqx_resource:query_success(AfterQuery);
|
{error, Reason} = Err ->
|
||||||
{error, Reason} ->
|
|
||||||
?SLOG(error, #{
|
?SLOG(error, #{
|
||||||
msg => "hstreamdb producer sync append failed",
|
msg => "hstreamdb producer sync append failed",
|
||||||
reason => Reason,
|
reason => Reason,
|
||||||
record => Record
|
record => Record
|
||||||
}),
|
}),
|
||||||
emqx_resource:query_failed(AfterQuery)
|
Err
|
||||||
end.
|
end.
|
||||||
|
|
||||||
client_name(InstId) ->
|
client_name(InstId) ->
|
||||||
|
|
|
@ -13,9 +13,10 @@
|
||||||
|
|
||||||
%% callbacks of behaviour emqx_resource
|
%% callbacks of behaviour emqx_resource
|
||||||
-export([
|
-export([
|
||||||
|
callback_mode/0,
|
||||||
on_start/2,
|
on_start/2,
|
||||||
on_stop/2,
|
on_stop/2,
|
||||||
on_query/4,
|
on_query/3,
|
||||||
on_get_status/2
|
on_get_status/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
@ -28,6 +29,7 @@
|
||||||
|
|
||||||
%% -------------------------------------------------------------------------------------------------
|
%% -------------------------------------------------------------------------------------------------
|
||||||
%% resource callback
|
%% resource callback
|
||||||
|
callback_mode() -> always_sync.
|
||||||
|
|
||||||
on_start(InstId, Config) ->
|
on_start(InstId, Config) ->
|
||||||
start_client(InstId, Config).
|
start_client(InstId, Config).
|
||||||
|
@ -35,8 +37,8 @@ on_start(InstId, Config) ->
|
||||||
on_stop(_InstId, #{client := Client}) ->
|
on_stop(_InstId, #{client := Client}) ->
|
||||||
influxdb:stop_client(Client).
|
influxdb:stop_client(Client).
|
||||||
|
|
||||||
on_query(InstId, {send_message, Data}, AfterQuery, State) ->
|
on_query(InstId, {send_message, Data}, State) ->
|
||||||
do_query(InstId, {send_message, Data}, AfterQuery, State).
|
do_query(InstId, {send_message, Data}, State).
|
||||||
|
|
||||||
on_get_status(_InstId, #{client := Client}) ->
|
on_get_status(_InstId, #{client := Client}) ->
|
||||||
case influxdb:is_alive(Client) of
|
case influxdb:is_alive(Client) of
|
||||||
|
@ -308,7 +310,7 @@ ssl_config(SSL = #{enable := true}) ->
|
||||||
%% -------------------------------------------------------------------------------------------------
|
%% -------------------------------------------------------------------------------------------------
|
||||||
%% Query
|
%% Query
|
||||||
|
|
||||||
do_query(InstId, {send_message, Data}, AfterQuery, State = #{client := Client}) ->
|
do_query(InstId, {send_message, Data}, State = #{client := Client}) ->
|
||||||
{Points, Errs} = data_to_points(Data, State),
|
{Points, Errs} = data_to_points(Data, State),
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
fun({error, Reason}) ->
|
fun({error, Reason}) ->
|
||||||
|
@ -326,15 +328,14 @@ do_query(InstId, {send_message, Data}, AfterQuery, State = #{client := Client})
|
||||||
msg => "influxdb write point success",
|
msg => "influxdb write point success",
|
||||||
connector => InstId,
|
connector => InstId,
|
||||||
points => Points
|
points => Points
|
||||||
}),
|
});
|
||||||
emqx_resource:query_success(AfterQuery);
|
{error, Reason} = Err ->
|
||||||
{error, Reason} ->
|
|
||||||
?SLOG(error, #{
|
?SLOG(error, #{
|
||||||
msg => "influxdb write point failed",
|
msg => "influxdb write point failed",
|
||||||
connector => InstId,
|
connector => InstId,
|
||||||
reason => Reason
|
reason => Reason
|
||||||
}),
|
}),
|
||||||
emqx_resource:query_failed(AfterQuery)
|
Err
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% -------------------------------------------------------------------------------------------------
|
%% -------------------------------------------------------------------------------------------------
|
||||||
|
|
Loading…
Reference in New Issue