diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 36f8cf0f5..848266b43 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -20,6 +20,7 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). -import(hoconsc, [mk/2, array/1, enum/1]). @@ -237,6 +238,13 @@ mqtt_main_example() -> keepalive => <<"300s">>, retry_interval => <<"15s">>, max_inflight => 100, + resource_opts => #{ + health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, + auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW, + query_mode => sync, + enable_queue => false, + max_queue_bytes => ?DEFAULT_QUEUE_SIZE + }, ssl => #{ enable => false } diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_mqtt_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_mqtt_schema.erl index 973fc8192..8fb6af65c 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_mqtt_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_mqtt_schema.erl @@ -12,16 +12,29 @@ namespace() -> "bridge_mqtt". roots() -> []. + fields("config") -> + %% enable emqx_bridge_schema:common_bridge_fields() ++ + [ + {resource_opts, + mk( + ref(?MODULE, "creation_opts"), + #{ + required => false, + default => #{}, + desc => ?DESC(emqx_resource_schema, <<"resource_opts">>) + } + )} + ] ++ emqx_connector_mqtt_schema:fields("config"); +fields("creation_opts") -> + Opts = emqx_resource_schema:fields("creation_opts"), + [O || {Field, _} = O <- Opts, not is_hidden_opts(Field)]; fields("post") -> - [ - type_field(), - name_field() - ] ++ emqx_connector_mqtt_schema:fields("config"); + [type_field(), name_field() | fields("config")]; fields("put") -> - emqx_connector_mqtt_schema:fields("config"); + fields("config"); fields("get") -> emqx_bridge_schema:metrics_status_fields() ++ fields("config"). @@ -31,22 +44,12 @@ desc(_) -> undefined. %%====================================================================================== +%% internal +is_hidden_opts(Field) -> + lists:member(Field, [enable_batch, batch_size, batch_time]). + type_field() -> - {type, - mk( - mqtt, - #{ - required => true, - desc => ?DESC("desc_type") - } - )}. + {type, mk(mqtt, #{required => true, desc => ?DESC("desc_type")})}. name_field() -> - {name, - mk( - binary(), - #{ - required => true, - desc => ?DESC("desc_name") - } - )}. + {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}. diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index 59eb3f0fb..0b9f7c85c 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -141,7 +141,7 @@ on_message_received(Msg, HookPoint, ResId) -> emqx:run_hook(HookPoint, [Msg]). %% =================================================================== -callback_mode() -> always_sync. +callback_mode() -> async_if_possible. on_start(InstId, Conf) -> InstanceId = binary_to_atom(InstId, utf8), diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl index 7571c59b8..f1ecbf68c 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl @@ -138,7 +138,7 @@ send(#{client_pid := ClientPid}, Msg) -> emqtt:publish(ClientPid, Msg). send_async(#{client_pid := ClientPid}, Msg, Callback) -> - emqtt:publish_async(ClientPid, Msg, Callback). + emqtt:publish_async(ClientPid, Msg, infinity, Callback). handle_publish(Msg, undefined, _Opts) -> ?SLOG(error, #{ diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl index 5e4fa8f72..618361ad3 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl @@ -306,8 +306,8 @@ connected({call, From}, {send_to_remote, Msg}, State) -> {keep_state_and_data, [[reply, From, {error, Reason}]]} end; connected(cast, {send_to_remote_async, Msg, Callback}, State) -> - {_, NewState} = do_send_async(State, Msg, Callback), - {keep_state, NewState}; + _ = do_send_async(State, Msg, Callback), + {keep_state, State}; connected( info, {disconnected, Conn, Reason},