fix: incorrect replayq dir for the emqx_resource

This commit is contained in:
Shawn 2022-08-25 16:06:18 +08:00
parent ff60019ffe
commit a896aa8b27
4 changed files with 6 additions and 10 deletions

View File

@ -256,15 +256,11 @@ query(ResId, Request) ->
Result :: term(). Result :: term().
query(ResId, Request, Opts) -> query(ResId, Request, Opts) ->
case emqx_resource_manager:ets_lookup(ResId) of case emqx_resource_manager:ets_lookup(ResId) of
{ok, _Group, #{query_mode := QM, status := connected}} -> {ok, _Group, #{query_mode := QM}} ->
case QM of case QM of
sync -> emqx_resource_worker:sync_query(ResId, Request, Opts); sync -> emqx_resource_worker:sync_query(ResId, Request, Opts);
async -> emqx_resource_worker:async_query(ResId, Request, Opts) async -> emqx_resource_worker:async_query(ResId, Request, Opts)
end; end;
{ok, _Group, #{status := stopped}} ->
?RESOURCE_ERROR(stopped, "resource stopped or disabled");
{ok, _Group, #{status := S}} when S == connecting; S == disconnected ->
?RESOURCE_ERROR(not_connected, "resource not connected");
{error, not_found} -> {error, not_found} ->
?RESOURCE_ERROR(not_found, "resource not found") ?RESOURCE_ERROR(not_found, "resource not found")
end. end.

View File

@ -526,7 +526,8 @@ name(Id, Index) ->
list_to_atom(lists:concat([Mod, ":", Id1, ":", Index1])). list_to_atom(lists:concat([Mod, ":", Id1, ":", Index1])).
disk_queue_dir(Id, Index) -> disk_queue_dir(Id, Index) ->
filename:join([node(), emqx:data_dir(), Id, "queue:" ++ integer_to_list(Index)]). QDir = binary_to_list(Id) ++ ":" ++ integer_to_list(Index),
filename:join([emqx:data_dir(), "resource_worker", node(), QDir]).
ensure_flush_timer(St = #{tref := undefined, batch_time := T}) -> ensure_flush_timer(St = #{tref := undefined, batch_time := T}) ->
Ref = make_ref(), Ref = make_ref(),

View File

@ -509,11 +509,9 @@ inc_action_metrics(ok, RuleId) ->
inc_action_metrics({ok, _}, RuleId) -> inc_action_metrics({ok, _}, RuleId) ->
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.success'); emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.success');
inc_action_metrics({resource_down, _}, RuleId) -> inc_action_metrics({resource_down, _}, RuleId) ->
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service'), emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service');
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown');
inc_action_metrics(?RESOURCE_ERROR_M(R, _), RuleId) when ?IS_RES_DOWN(R) -> inc_action_metrics(?RESOURCE_ERROR_M(R, _), RuleId) when ?IS_RES_DOWN(R) ->
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service'), emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service');
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown');
inc_action_metrics(_, RuleId) -> inc_action_metrics(_, RuleId) ->
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'), emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'),
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown'). emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown').

View File

@ -55,6 +55,7 @@ values(post) ->
sql => ?DEFAULT_SQL, sql => ?DEFAULT_SQL,
local_topic => <<"local/topic/#">>, local_topic => <<"local/topic/#">>,
resource_opts => #{ resource_opts => #{
worker_pool_size => 1,
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW, auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
enable_batch => false, enable_batch => false,