Merge pull request #11174 from thalesmg/make-rule-server-a-binary
fix(mqtt_bridge): ensure `server` key is a binary
This commit is contained in:
commit
2cc0ba1217
|
@ -1,7 +1,7 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
{application, emqx_bridge_mqtt, [
|
{application, emqx_bridge_mqtt, [
|
||||||
{description, "EMQX MQTT Broker Bridge"},
|
{description, "EMQX MQTT Broker Bridge"},
|
||||||
{vsn, "0.1.1"},
|
{vsn, "0.1.2"},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{applications, [
|
{applications, [
|
||||||
kernel,
|
kernel,
|
||||||
|
|
|
@ -221,7 +221,7 @@ import_msg(
|
||||||
) ->
|
) ->
|
||||||
#{
|
#{
|
||||||
id => emqx_guid:to_hexstr(emqx_guid:gen()),
|
id => emqx_guid:to_hexstr(emqx_guid:gen()),
|
||||||
server => Server,
|
server => to_bin(Server),
|
||||||
payload => Payload,
|
payload => Payload,
|
||||||
topic => Topic,
|
topic => Topic,
|
||||||
qos => QoS,
|
qos => QoS,
|
||||||
|
@ -272,3 +272,6 @@ to_broker_msg(#{dup := Dup} = Msg, Local, Props) ->
|
||||||
emqx_message:make(bridge, QoS, Topic, Payload)
|
emqx_message:make(bridge, QoS, Topic, Payload)
|
||||||
)
|
)
|
||||||
).
|
).
|
||||||
|
|
||||||
|
to_bin(B) when is_binary(B) -> B;
|
||||||
|
to_bin(Str) when is_list(Str) -> iolist_to_binary(Str).
|
||||||
|
|
|
@ -248,6 +248,50 @@ t_mqtt_conn_bridge_ingress(_) ->
|
||||||
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_mqtt_conn_bridge_ingress_full_context(_Config) ->
|
||||||
|
User1 = <<"user1">>,
|
||||||
|
IngressConf =
|
||||||
|
emqx_utils_maps:deep_merge(
|
||||||
|
?INGRESS_CONF,
|
||||||
|
#{<<"local">> => #{<<"payload">> => <<"${.}">>}}
|
||||||
|
),
|
||||||
|
{ok, 201, _Bridge} = request(
|
||||||
|
post,
|
||||||
|
uri(["bridges"]),
|
||||||
|
?SERVER_CONF(User1)#{
|
||||||
|
<<"type">> => ?TYPE_MQTT,
|
||||||
|
<<"name">> => ?BRIDGE_NAME_INGRESS,
|
||||||
|
<<"ingress">> => IngressConf
|
||||||
|
}
|
||||||
|
),
|
||||||
|
|
||||||
|
RemoteTopic = <<?INGRESS_REMOTE_TOPIC, "/1">>,
|
||||||
|
LocalTopic = <<?INGRESS_LOCAL_TOPIC, "/", RemoteTopic/binary>>,
|
||||||
|
Payload = <<"hello">>,
|
||||||
|
emqx:subscribe(LocalTopic),
|
||||||
|
timer:sleep(100),
|
||||||
|
%% PUBLISH a message to the 'remote' broker, as we have only one broker,
|
||||||
|
%% the remote broker is also the local one.
|
||||||
|
emqx:publish(emqx_message:make(RemoteTopic, Payload)),
|
||||||
|
%% we should receive a message on the local broker, with specified topic
|
||||||
|
#message{payload = EncodedPayload} = assert_mqtt_msg_received(LocalTopic),
|
||||||
|
?assertMatch(
|
||||||
|
#{
|
||||||
|
<<"dup">> := false,
|
||||||
|
<<"id">> := _,
|
||||||
|
<<"message_received_at">> := _,
|
||||||
|
<<"payload">> := <<"hello">>,
|
||||||
|
<<"pub_props">> := #{},
|
||||||
|
<<"qos">> := 0,
|
||||||
|
<<"retain">> := false,
|
||||||
|
<<"server">> := <<"127.0.0.1:1883">>,
|
||||||
|
<<"topic">> := <<"ingress_remote_topic/1">>
|
||||||
|
},
|
||||||
|
emqx_utils_json:decode(EncodedPayload, [return_maps])
|
||||||
|
),
|
||||||
|
|
||||||
|
ok.
|
||||||
|
|
||||||
t_mqtt_conn_bridge_ingress_shared_subscription(_) ->
|
t_mqtt_conn_bridge_ingress_shared_subscription(_) ->
|
||||||
PoolSize = 4,
|
PoolSize = 4,
|
||||||
Ns = lists:seq(1, 10),
|
Ns = lists:seq(1, 10),
|
||||||
|
|
|
@ -0,0 +1,3 @@
|
||||||
|
Fixed the encoding of the `server` key coming from an ingress MQTT bridge.
|
||||||
|
|
||||||
|
Before the fix, it was being encoded as a list of integers corresponding to the ASCII characters of the server string.
|
Loading…
Reference in New Issue