fix(mqtt_bridge): fixes after rebasing onto current `master`
Rebased on top of 7f57ec47d5
This commit is contained in:
parent
6511693b2e
commit
e6ccfa5b39
|
@ -1,7 +1,7 @@
|
|||
%% -*- mode: erlang -*-
|
||||
{application, emqx_bridge_mqtt, [
|
||||
{description, "EMQX MQTT Broker Bridge"},
|
||||
{vsn, "0.1.7"},
|
||||
{vsn, "0.1.8"},
|
||||
{registered, []},
|
||||
{applications, [
|
||||
kernel,
|
||||
|
|
|
@ -414,10 +414,8 @@ mk_client_opts(
|
|||
ssl_opts => maps:to_list(maps:remove(enable, Ssl))
|
||||
}).
|
||||
|
||||
parse_id_to_name(<<?TEST_ID_PREFIX, Name/binary>>) ->
|
||||
Name;
|
||||
parse_id_to_name(Id) ->
|
||||
{_Type, Name} = emqx_bridge_resource:parse_bridge_id(Id, #{atom_name => false}),
|
||||
{_Type, Name} = emqx_connector_resource:parse_connector_id(Id, #{atom_name => false}),
|
||||
Name.
|
||||
|
||||
mk_client_opt_password(Options = #{password := Secret}) ->
|
||||
|
@ -447,7 +445,6 @@ connect(Options) ->
|
|||
}),
|
||||
Name = proplists:get_value(name, Options),
|
||||
WorkerId = proplists:get_value(ecpool_worker_id, Options),
|
||||
WorkerId = proplists:get_value(ecpool_worker_id, Options),
|
||||
ClientOpts = proplists:get_value(client_opts, Options),
|
||||
case emqtt:start_link(mk_client_opts(Name, WorkerId, ClientOpts)) of
|
||||
{ok, Pid} ->
|
||||
|
@ -475,7 +472,7 @@ mk_client_opts(
|
|||
}.
|
||||
|
||||
mk_clientid(WorkerId, ClientId) ->
|
||||
iolist_to_binary([ClientId, $: | integer_to_list(WorkerId)]).
|
||||
emqx_bridge_mqtt_lib:bytes23([ClientId], WorkerId).
|
||||
|
||||
mk_client_event_handler(Name, TopicToHandlerIndex) ->
|
||||
#{
|
||||
|
|
|
@ -38,6 +38,7 @@
|
|||
|
||||
-import(hoconsc, [mk/2, ref/2]).
|
||||
|
||||
-define(CONNECTOR_TYPE, mqtt).
|
||||
-define(MQTT_HOST_OPTS, #{default_port => 1883}).
|
||||
|
||||
namespace() -> "connector_mqtt".
|
||||
|
@ -66,28 +67,10 @@ fields("config") ->
|
|||
)}
|
||||
];
|
||||
fields("config_connector") ->
|
||||
[
|
||||
{enable,
|
||||
mk(
|
||||
boolean(),
|
||||
#{
|
||||
desc => <<"Enable or disable this connector">>,
|
||||
default => true
|
||||
}
|
||||
)},
|
||||
{description, emqx_schema:description_schema()},
|
||||
{pool_size, fun egress_pool_size/1}
|
||||
% {ingress,
|
||||
% mk(
|
||||
% hoconsc:array(
|
||||
% hoconsc:ref(connector_ingress)
|
||||
% ),
|
||||
% #{
|
||||
% required => {false, recursively},
|
||||
% desc => ?DESC("ingress_desc")
|
||||
% }
|
||||
% )}
|
||||
] ++ emqx_connector_schema:resource_opts_ref(?MODULE, resource_opts) ++
|
||||
emqx_connector_schema:common_fields() ++ fields("specific_connector_config");
|
||||
fields("specific_connector_config") ->
|
||||
[{pool_size, fun egress_pool_size/1}] ++
|
||||
emqx_connector_schema:resource_opts_ref(?MODULE, resource_opts) ++
|
||||
fields("server_configs");
|
||||
fields(resource_opts) ->
|
||||
emqx_connector_schema:resource_opts_fields();
|
||||
|
@ -317,12 +300,13 @@ fields("egress_remote") ->
|
|||
}
|
||||
)}
|
||||
];
|
||||
fields("get_connector") ->
|
||||
fields("config_connector");
|
||||
fields("post_connector") ->
|
||||
fields("config_connector");
|
||||
fields("put_connector") ->
|
||||
fields("config_connector");
|
||||
fields(Field) when
|
||||
Field == "get_connector";
|
||||
Field == "put_connector";
|
||||
Field == "post_connector"
|
||||
->
|
||||
Fields = fields("specific_connector_config"),
|
||||
emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, Fields);
|
||||
fields(What) ->
|
||||
error({emqx_bridge_mqtt_connector_schema, missing_field_handler, What}).
|
||||
|
||||
|
|
|
@ -94,7 +94,7 @@ bridge_v1_config_to_action_config_helper(
|
|||
),
|
||||
LocalTopicMap = maps:get(<<"local">>, EgressMap0, #{}),
|
||||
LocalTopic = maps:get(<<"topic">>, LocalTopicMap, undefined),
|
||||
EgressMap1 = maps:remove(<<"local">>, EgressMap0),
|
||||
EgressMap1 = maps:without([<<"local">>, <<"pool_size">>], EgressMap0),
|
||||
%% Add parameters field (Egress map) to the action config
|
||||
ConfigMap2 = maps:put(<<"parameters">>, EgressMap1, ConfigMap1),
|
||||
ConfigMap3 =
|
||||
|
|
|
@ -564,17 +564,17 @@ t_mqtt_conn_bridge_egress_no_payload_template(_) ->
|
|||
|
||||
t_egress_short_clientid(_Config) ->
|
||||
%% Name is short, expect the actual client ID in use is hashed from
|
||||
%% <name>E<nodename-hash>:<pool_worker_id>
|
||||
Name = "abc01234",
|
||||
BaseId = emqx_bridge_mqtt_lib:clientid_base([Name, "E"]),
|
||||
%% <name><nodename-hash>:<pool_worker_id>
|
||||
Name = <<"abc01234">>,
|
||||
BaseId = emqx_bridge_mqtt_lib:clientid_base([Name]),
|
||||
ExpectedClientId = iolist_to_binary([BaseId, $:, "1"]),
|
||||
test_egress_clientid(Name, ExpectedClientId).
|
||||
|
||||
t_egress_long_clientid(_Config) ->
|
||||
%% Expect the actual client ID in use is hashed from
|
||||
%% <name>E<nodename-hash>:<pool_worker_id>
|
||||
Name = "abc01234567890123456789",
|
||||
BaseId = emqx_bridge_mqtt_lib:clientid_base([Name, "E"]),
|
||||
%% <name><nodename-hash>:<pool_worker_id>
|
||||
Name = <<"abc012345678901234567890">>,
|
||||
BaseId = emqx_bridge_mqtt_lib:clientid_base([Name]),
|
||||
ExpectedClientId = emqx_bridge_mqtt_lib:bytes23(BaseId, 1),
|
||||
test_egress_clientid(Name, ExpectedClientId).
|
||||
|
||||
|
@ -1049,7 +1049,8 @@ create_bridge(Config = #{<<"type">> := Type, <<"name">> := Name}) ->
|
|||
<<"type">> := Type,
|
||||
<<"name">> := Name
|
||||
},
|
||||
emqx_utils_json:decode(Bridge)
|
||||
emqx_utils_json:decode(Bridge),
|
||||
#{expected_type => Type, expected_name => Name}
|
||||
),
|
||||
emqx_bridge_resource:bridge_id(Type, Name).
|
||||
|
||||
|
|
Loading…
Reference in New Issue