Merge pull request #8804 from terry-xiaoyu/fix_rule_send_to_disconnected_resources

fix: incorrect replayq dir for the emqx_resource
This commit is contained in:
Xinyu Liu 2022-08-25 18:06:02 +08:00 committed by GitHub
commit 0c9d12fd07
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 6 additions and 10 deletions

View File

@ -256,15 +256,11 @@ query(ResId, Request) ->
Result :: term().
query(ResId, Request, Opts) ->
case emqx_resource_manager:ets_lookup(ResId) of
{ok, _Group, #{query_mode := QM, status := connected}} ->
{ok, _Group, #{query_mode := QM}} ->
case QM of
sync -> emqx_resource_worker:sync_query(ResId, Request, Opts);
async -> emqx_resource_worker:async_query(ResId, Request, Opts)
end;
{ok, _Group, #{status := stopped}} ->
?RESOURCE_ERROR(stopped, "resource stopped or disabled");
{ok, _Group, #{status := S}} when S == connecting; S == disconnected ->
?RESOURCE_ERROR(not_connected, "resource not connected");
{error, not_found} ->
?RESOURCE_ERROR(not_found, "resource not found")
end.

View File

@ -526,7 +526,8 @@ name(Id, Index) ->
list_to_atom(lists:concat([Mod, ":", Id1, ":", Index1])).
disk_queue_dir(Id, Index) ->
filename:join([node(), emqx:data_dir(), Id, "queue:" ++ integer_to_list(Index)]).
QDir = binary_to_list(Id) ++ ":" ++ integer_to_list(Index),
filename:join([emqx:data_dir(), "resource_worker", node(), QDir]).
ensure_flush_timer(St = #{tref := undefined, batch_time := T}) ->
Ref = make_ref(),

View File

@ -509,11 +509,9 @@ inc_action_metrics(ok, RuleId) ->
inc_action_metrics({ok, _}, RuleId) ->
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.success');
inc_action_metrics({resource_down, _}, RuleId) ->
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service'),
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown');
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service');
inc_action_metrics(?RESOURCE_ERROR_M(R, _), RuleId) when ?IS_RES_DOWN(R) ->
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service'),
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown');
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service');
inc_action_metrics(_, RuleId) ->
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'),
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown').

View File

@ -55,6 +55,7 @@ values(post) ->
sql => ?DEFAULT_SQL,
local_topic => <<"local/topic/#">>,
resource_opts => #{
worker_pool_size => 1,
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
enable_batch => false,