diff --git a/apps/emqx_connector/src/emqx_connector.erl b/apps/emqx_connector/src/emqx_connector.erl index 4e7b7d72e..16684466f 100644 --- a/apps/emqx_connector/src/emqx_connector.erl +++ b/apps/emqx_connector/src/emqx_connector.erl @@ -69,20 +69,29 @@ parse_connector_id(ConnectorId) -> end. list_raw() -> - lists:foldl(fun({Type, NameAndConf}, Connectors) -> - lists:foldl(fun({Name, RawConf}, Acc) -> - [RawConf#{<<"type">> => Type, <<"name">> => Name} | Acc] - end, Connectors, maps:to_list(NameAndConf)) - end, [], maps:to_list(emqx:get_raw_config(config_key_path(), #{}))). + case get_raw_connector_conf() of + not_found -> []; + Config -> + lists:foldl(fun({Type, NameAndConf}, Connectors) -> + lists:foldl(fun({Name, RawConf}, Acc) -> + [RawConf#{<<"type">> => Type, <<"name">> => Name} | Acc] + end, Connectors, maps:to_list(NameAndConf)) + end, [], maps:to_list(Config)) + end. lookup_raw(Id) when is_binary(Id) -> {Type, Name} = parse_connector_id(Id), lookup_raw(Type, Name). lookup_raw(Type, Name) -> - case emqx:get_raw_config(config_key_path() ++ [Type, Name], not_found) of + Path = [bin(P) || P <- [Type, Name]], + case get_raw_connector_conf() of not_found -> {error, not_found}; - Conf -> {ok, Conf#{<<"type">> => Type, <<"name">> => Name}} + Conf -> + case emqx_map_lib:deep_get(Path, Conf, not_found) of + not_found -> {error, not_found}; + Conf1 -> {ok, Conf1#{<<"type">> => Type, <<"name">> => Name}} + end end. create_dry_run(Type, Conf) -> @@ -102,6 +111,15 @@ delete(Id) when is_binary(Id) -> delete(Type, Name) -> emqx_conf:remove(config_key_path() ++ [Type, Name], #{override_to => cluster}). +get_raw_connector_conf() -> + case emqx:get_raw_config(config_key_path(), not_found) of + not_found -> not_found; + RawConf -> + #{<<"connectors">> := Conf} = + emqx_config:fill_defaults(#{<<"connectors">> => RawConf}), + Conf + end. + bin(Bin) when is_binary(Bin) -> Bin; bin(Str) when is_list(Str) -> list_to_binary(Str); bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8). diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl index 7fb657ee6..0695eab2b 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl @@ -62,7 +62,10 @@ topic filters for 'remote_topic' of ingress connections. #{ default => "127.0.0.1:1883" , desc => "The host and port of the remote MQTT broker" })} - , {reconnect_interval, mk_duration("Reconnect interval", #{default => "15s"})} + , {reconnect_interval, mk_duration( + "Reconnect interval. Delay for the MQTT bridge to retry establishing the connection " + "in case of transportation failure.", + #{default => "15s"})} , {proto_ver, sc(hoconsc:enum([v3, v4, v5]), #{ default => v4 @@ -83,8 +86,11 @@ topic filters for 'remote_topic' of ingress connections. #{ default => true , desc => "The clean-start or the clean-session of the MQTT protocol" })} - , {keepalive, mk_duration("Keepalive", #{default => "300s"})} - , {retry_interval, mk_duration("Retry interval", #{default => "15s"})} + , {keepalive, mk_duration("MQTT Keepalive.", #{default => "300s"})} + , {retry_interval, mk_duration( + "Message retry interval. Delay for the MQTT bridge to retry sending the QoS1/QoS2 " + "messages in case of ACK not received.", + #{default => "15s"})} , {max_inflight, sc(integer(), #{ default => 32