diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 7f1e689ee..b07460643 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -256,15 +256,11 @@ query(ResId, Request) -> Result :: term(). query(ResId, Request, Opts) -> case emqx_resource_manager:ets_lookup(ResId) of - {ok, _Group, #{query_mode := QM, status := connected}} -> + {ok, _Group, #{query_mode := QM}} -> case QM of sync -> emqx_resource_worker:sync_query(ResId, Request, Opts); async -> emqx_resource_worker:async_query(ResId, Request, Opts) end; - {ok, _Group, #{status := stopped}} -> - ?RESOURCE_ERROR(stopped, "resource stopped or disabled"); - {ok, _Group, #{status := S}} when S == connecting; S == disconnected -> - ?RESOURCE_ERROR(not_connected, "resource not connected"); {error, not_found} -> ?RESOURCE_ERROR(not_found, "resource not found") end. diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index e1b51ff12..9c632b662 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -526,7 +526,8 @@ name(Id, Index) -> list_to_atom(lists:concat([Mod, ":", Id1, ":", Index1])). disk_queue_dir(Id, Index) -> - filename:join([node(), emqx:data_dir(), Id, "queue:" ++ integer_to_list(Index)]). + QDir = binary_to_list(Id) ++ ":" ++ integer_to_list(Index), + filename:join([emqx:data_dir(), "resource_worker", node(), QDir]). ensure_flush_timer(St = #{tref := undefined, batch_time := T}) -> Ref = make_ref(), diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index 2c2bfad4f..d7fa65829 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -509,11 +509,9 @@ inc_action_metrics(ok, RuleId) -> inc_action_metrics({ok, _}, RuleId) -> emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.success'); inc_action_metrics({resource_down, _}, RuleId) -> - emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service'), - emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown'); + emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service'); inc_action_metrics(?RESOURCE_ERROR_M(R, _), RuleId) when ?IS_RES_DOWN(R) -> - emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service'), - emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown'); + emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service'); inc_action_metrics(_, RuleId) -> emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'), emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown'). diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl index e78d77fdb..6a5d4a3a2 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl @@ -55,6 +55,7 @@ values(post) -> sql => ?DEFAULT_SQL, local_topic => <<"local/topic/#">>, resource_opts => #{ + worker_pool_size => 1, health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW, enable_batch => false,