feat(emqx_bridge_mqtt): ${node} in topic config
Adds ${node} interpolation in topic option of configuration Just like the clientid already works Closes: #6431 feat(emqx_bridge_mqtt): bumped versions the right way chore(appup): appups for mqtt bridge feat(mqtt_bridge): test for bridge config And also bumped one small version chore(mqtt_bridge): updated CHANGES-4.3.md fix(mqtt_bridge): conditional export chore(mqtt_bridge): appup
This commit is contained in:
parent
cb5fe77706
commit
0bd080c063
|
@ -17,6 +17,8 @@ File format:
|
||||||
* CLI `emqx_ctl pem_cache clean` to force purge x509 certificate cache,
|
* CLI `emqx_ctl pem_cache clean` to force purge x509 certificate cache,
|
||||||
to force an immediate reload of all certificates after the files are updated on disk.
|
to force an immediate reload of all certificates after the files are updated on disk.
|
||||||
|
|
||||||
|
* `topic` parameter in bridge configuration can have `${node}` substitution (just like in `clientid` parameter)
|
||||||
|
|
||||||
### Bug fixes
|
### Bug fixes
|
||||||
|
|
||||||
* Fix case where publishing to a non-existent topic alias would crash the connection [#6979]
|
* Fix case where publishing to a non-existent topic alias would crash the connection [#6979]
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_bridge_mqtt,
|
{application, emqx_bridge_mqtt,
|
||||||
[{description, "EMQ X Bridge to MQTT Broker"},
|
[{description, "EMQ X Bridge to MQTT Broker"},
|
||||||
{vsn, "4.3.3"}, % strict semver, bump manually!
|
{vsn, "4.3.4"}, % strict semver, bump manually!
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{applications, [kernel,stdlib,replayq,emqtt]},
|
{applications, [kernel,stdlib,replayq,emqtt]},
|
||||||
|
|
|
@ -1,24 +1,20 @@
|
||||||
%% -*-: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
|
{VSN,
|
||||||
{"4.3.3",
|
[{"4.3.3",[{load_module,emqx_bridge_mqtt,brutal_purge,soft_purge,[]}]},
|
||||||
[
|
{<<"4\\.3\\.[1-2]">>,
|
||||||
{<<"4.3.[1-2]">>, [
|
[{load_module,emqx_bridge_mqtt,brutal_purge,soft_purge,[]},
|
||||||
{load_module, emqx_bridge_mqtt_actions, brutal_purge, soft_purge, []}
|
{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]},
|
||||||
]},
|
{"4.3.0",
|
||||||
{"4.3.0", [
|
[{load_module,emqx_bridge_mqtt,brutal_purge,soft_purge,[]},
|
||||||
{load_module, emqx_bridge_worker, brutal_purge, soft_purge, []},
|
{load_module,emqx_bridge_worker,brutal_purge,soft_purge,[]},
|
||||||
{load_module, emqx_bridge_mqtt_actions, brutal_purge, soft_purge, []}
|
{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]},
|
||||||
]},
|
{<<".*">>,[]}],
|
||||||
{<<".*">>, []}
|
[{"4.3.3",[{load_module,emqx_bridge_mqtt,brutal_purge,soft_purge,[]}]},
|
||||||
],
|
{<<"4\\.3\\.[1-2]">>,
|
||||||
[
|
[{load_module,emqx_bridge_mqtt,brutal_purge,soft_purge,[]},
|
||||||
{<<"4.3.[1-2]">>, [
|
{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]},
|
||||||
{load_module, emqx_bridge_mqtt_actions, brutal_purge, soft_purge, []}
|
{"4.3.0",
|
||||||
]},
|
[{load_module,emqx_bridge_mqtt,brutal_purge,soft_purge,[]},
|
||||||
{"4.3.0", [
|
{load_module,emqx_bridge_worker,brutal_purge,soft_purge,[]},
|
||||||
{load_module, emqx_bridge_worker, brutal_purge, soft_purge, []},
|
{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]},
|
||||||
{load_module, emqx_bridge_mqtt_actions, brutal_purge, soft_purge, []}
|
{<<".*">>,[]}]}.
|
||||||
]},
|
|
||||||
{<<".*">>, []}
|
|
||||||
]
|
|
||||||
}.
|
|
||||||
|
|
|
@ -37,6 +37,11 @@
|
||||||
, handle_disconnected/2
|
, handle_disconnected/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
%% for testing
|
||||||
|
-ifdef(TEST).
|
||||||
|
-export([ replvar/1 ]).
|
||||||
|
-endif.
|
||||||
|
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
-include_lib("emqx/include/emqx_mqtt.hrl").
|
-include_lib("emqx/include/emqx_mqtt.hrl").
|
||||||
|
|
||||||
|
@ -176,13 +181,13 @@ subscribe_remote_topics(ClientPid, Subscriptions) ->
|
||||||
end
|
end
|
||||||
end, Subscriptions).
|
end, Subscriptions).
|
||||||
|
|
||||||
|
replvar(Options) ->
|
||||||
|
replvar([topic, clientid, max_inflight], Options).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Internal funcs
|
%% Internal funcs
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
replvar(Options) ->
|
|
||||||
replvar([clientid, max_inflight], Options).
|
|
||||||
|
|
||||||
replvar([], Options) ->
|
replvar([], Options) ->
|
||||||
Options;
|
Options;
|
||||||
replvar([Key|More], Options) ->
|
replvar([Key|More], Options) ->
|
||||||
|
@ -194,8 +199,8 @@ replvar([Key|More], Options) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% ${node} => node()
|
%% ${node} => node()
|
||||||
feedvar(clientid, ClientId, _) ->
|
feedvar(Key, Value, _) when Key =:= topic; Key =:= clientid ->
|
||||||
iolist_to_binary(re:replace(ClientId, "\\${node}", atom_to_list(node())));
|
iolist_to_binary(re:replace(Value, "\\${node}", atom_to_list(node())));
|
||||||
|
|
||||||
feedvar(max_inflight, 0, _) ->
|
feedvar(max_inflight, 0, _) ->
|
||||||
infinity;
|
infinity;
|
||||||
|
|
|
@ -44,4 +44,15 @@ send_and_ack_test() ->
|
||||||
ok = emqx_bridge_mqtt:stop(Conn)
|
ok = emqx_bridge_mqtt:stop(Conn)
|
||||||
after
|
after
|
||||||
meck:unload(emqtt)
|
meck:unload(emqtt)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
replvar_test() ->
|
||||||
|
Node = atom_to_list(node()),
|
||||||
|
Config = #{clientid => <<"Hey ${node}">>, topic => <<"topic ${node}">>, other => <<"other">>},
|
||||||
|
|
||||||
|
ReplacedConfig = emqx_bridge_mqtt:replvar(Config),
|
||||||
|
|
||||||
|
ExpectedConfig = #{clientid => iolist_to_binary("Hey " ++ Node),
|
||||||
|
topic => iolist_to_binary("topic " ++ Node),
|
||||||
|
other => <<"other">>},
|
||||||
|
?assertEqual(ExpectedConfig, ReplacedConfig).
|
||||||
|
|
Loading…
Reference in New Issue