refactor(bridge): rename message_in/out to ingress/egress_channels
This commit is contained in:
parent
c1ff8778e1
commit
7058b83760
|
@ -25,8 +25,8 @@ bridges.mqtt.my_mqtt_bridge_to_aws {
|
||||||
certfile = "{{ platform_etc_dir }}/certs/client-cert.pem"
|
certfile = "{{ platform_etc_dir }}/certs/client-cert.pem"
|
||||||
cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
|
cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
|
||||||
}
|
}
|
||||||
## we will create one MQTT connection for each element of the `message_in`
|
## we will create one MQTT connection for each element of the `ingress_channels`
|
||||||
message_in: [{
|
ingress_channels: [{
|
||||||
## the `id` will be used as part of the clientid
|
## the `id` will be used as part of the clientid
|
||||||
id = "pull_msgs_from_aws"
|
id = "pull_msgs_from_aws"
|
||||||
subscribe_remote_topic = "aws/#"
|
subscribe_remote_topic = "aws/#"
|
||||||
|
@ -36,8 +36,8 @@ bridges.mqtt.my_mqtt_bridge_to_aws {
|
||||||
qos = "${qos}"
|
qos = "${qos}"
|
||||||
retain = "${retain}"
|
retain = "${retain}"
|
||||||
}]
|
}]
|
||||||
## we will create one MQTT connection for each element of the `message_out`
|
## we will create one MQTT connection for each element of the `egress_channels`
|
||||||
message_out: [{
|
egress_channels: [{
|
||||||
## the `id` will be used as part of the clientid
|
## the `id` will be used as part of the clientid
|
||||||
id = "push_msgs_to_aws"
|
id = "push_msgs_to_aws"
|
||||||
subscribe_local_topic = "emqx/#"
|
subscribe_local_topic = "emqx/#"
|
||||||
|
|
|
@ -154,8 +154,8 @@ update_bridge(Type, Name, Conf) ->
|
||||||
%%
|
%%
|
||||||
%% - if the connection related configs like `username` is updated, we should restart/start
|
%% - if the connection related configs like `username` is updated, we should restart/start
|
||||||
%% or stop bridges according to the change.
|
%% or stop bridges according to the change.
|
||||||
%% - if the connection related configs are not update, but channel configs `message_in` or
|
%% - if the connection related configs are not update, but channel configs `ingress_channels` or
|
||||||
%% `message_out` are changed, then we should not restart the bridge, we only restart/start
|
%% `egress_channels` are changed, then we should not restart the bridge, we only restart/start
|
||||||
%% the channels.
|
%% the channels.
|
||||||
%%
|
%%
|
||||||
logger:info("update ~p bridge ~p use config: ~p", [Type, Name, Conf]),
|
logger:info("update ~p bridge ~p use config: ~p", [Type, Name, Conf]),
|
||||||
|
|
|
@ -89,8 +89,8 @@ on_start(InstId, Conf) ->
|
||||||
NamePrefix = binary_to_list(InstId),
|
NamePrefix = binary_to_list(InstId),
|
||||||
BasicConf = basic_config(Conf),
|
BasicConf = basic_config(Conf),
|
||||||
InitRes = {ok, #{name_prefix => NamePrefix, baisc_conf => BasicConf, sub_bridges => []}},
|
InitRes = {ok, #{name_prefix => NamePrefix, baisc_conf => BasicConf, sub_bridges => []}},
|
||||||
InOutConfigs = check_channel_id_dup(maps:get(message_in, Conf, [])
|
InOutConfigs = check_channel_id_dup(maps:get(ingress_channels, Conf, [])
|
||||||
++ maps:get(message_out, Conf, [])),
|
++ maps:get(egress_channels, Conf, [])),
|
||||||
lists:foldl(fun
|
lists:foldl(fun
|
||||||
(_InOutConf, {error, Reason}) ->
|
(_InOutConf, {error, Reason}) ->
|
||||||
{error, Reason};
|
{error, Reason};
|
||||||
|
@ -108,7 +108,7 @@ on_stop(InstId, #{sub_bridges := NameList}) ->
|
||||||
end, NameList).
|
end, NameList).
|
||||||
|
|
||||||
%% TODO: let the emqx_resource trigger on_query/4 automatically according to the
|
%% TODO: let the emqx_resource trigger on_query/4 automatically according to the
|
||||||
%% `message_in` and `message_out` config
|
%% `ingress_channels` and `egress_channels` config
|
||||||
on_query(_InstId, {create_channel, Conf}, _AfterQuery, #{name_prefix := Prefix,
|
on_query(_InstId, {create_channel, Conf}, _AfterQuery, #{name_prefix := Prefix,
|
||||||
baisc_conf := BasicConf}) ->
|
baisc_conf := BasicConf}) ->
|
||||||
create_channel(Conf, Prefix, BasicConf);
|
create_channel(Conf, Prefix, BasicConf);
|
||||||
|
@ -133,21 +133,21 @@ check_channel_id_dup(Confs) ->
|
||||||
end, Confs),
|
end, Confs),
|
||||||
Confs.
|
Confs.
|
||||||
|
|
||||||
%% this is an `message_in` bridge
|
%% this is an `ingress_channels` bridge
|
||||||
create_channel(#{subscribe_remote_topic := RemoteT, local_topic := LocalT, id := Id} = InConf,
|
create_channel(#{subscribe_remote_topic := RemoteT, local_topic := LocalT, id := Id} = InConf,
|
||||||
NamePrefix, BasicConf) ->
|
NamePrefix, BasicConf) ->
|
||||||
Name = bridge_name(NamePrefix, Id),
|
Name = bridge_name(NamePrefix, Id),
|
||||||
logger:info("creating 'message_in' channel ~p, remote ~s -> local ~s",
|
logger:info("creating ingress channel ~p, remote ~s -> local ~s",
|
||||||
[Name, RemoteT, LocalT]),
|
[Name, RemoteT, LocalT]),
|
||||||
create_sub_bridge(BasicConf#{
|
create_sub_bridge(BasicConf#{
|
||||||
name => Name,
|
name => Name,
|
||||||
clientid => clientid(Id),
|
clientid => clientid(Id),
|
||||||
subscriptions => InConf, forwards => undefined});
|
subscriptions => InConf, forwards => undefined});
|
||||||
%% this is an `message_out` bridge
|
%% this is an `egress_channels` bridge
|
||||||
create_channel(#{subscribe_local_topic := LocalT, remote_topic := RemoteT, id := Id} = OutConf,
|
create_channel(#{subscribe_local_topic := LocalT, remote_topic := RemoteT, id := Id} = OutConf,
|
||||||
NamePrefix, BasicConf) ->
|
NamePrefix, BasicConf) ->
|
||||||
Name = bridge_name(NamePrefix, Id),
|
Name = bridge_name(NamePrefix, Id),
|
||||||
logger:info("creating 'message_out' channel ~p, local ~s -> remote ~s",
|
logger:info("creating egress channel ~p, local ~s -> remote ~s",
|
||||||
[Name, LocalT, RemoteT]),
|
[Name, LocalT, RemoteT]),
|
||||||
create_sub_bridge(BasicConf#{
|
create_sub_bridge(BasicConf#{
|
||||||
name => bridge_name(NamePrefix, Id),
|
name => bridge_name(NamePrefix, Id),
|
||||||
|
|
|
@ -38,17 +38,17 @@ fields("config") ->
|
||||||
, {retry_interval, hoconsc:mk(emqx_schema:duration_ms(), #{default => "30s"})}
|
, {retry_interval, hoconsc:mk(emqx_schema:duration_ms(), #{default => "30s"})}
|
||||||
, {max_inflight, hoconsc:mk(integer(), #{default => 32})}
|
, {max_inflight, hoconsc:mk(integer(), #{default => 32})}
|
||||||
, {replayq, hoconsc:mk(hoconsc:ref(?MODULE, "replayq"))}
|
, {replayq, hoconsc:mk(hoconsc:ref(?MODULE, "replayq"))}
|
||||||
, {message_in, hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, "message_in")), #{default => []})}
|
, {ingress_channels, hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, "ingress_channels")), #{default => []})}
|
||||||
, {message_out, hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, "message_out")), #{default => []})}
|
, {egress_channels, hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, "egress_channels")), #{default => []})}
|
||||||
] ++ emqx_connector_schema_lib:ssl_fields();
|
] ++ emqx_connector_schema_lib:ssl_fields();
|
||||||
|
|
||||||
fields("message_in") ->
|
fields("ingress_channels") ->
|
||||||
[ {subscribe_remote_topic, #{type => binary(), nullable => false}}
|
[ {subscribe_remote_topic, #{type => binary(), nullable => false}}
|
||||||
, {local_topic, hoconsc:mk(binary(), #{default => <<"${topic}">>})}
|
, {local_topic, hoconsc:mk(binary(), #{default => <<"${topic}">>})}
|
||||||
, {subscribe_qos, hoconsc:mk(qos(), #{default => 1})}
|
, {subscribe_qos, hoconsc:mk(qos(), #{default => 1})}
|
||||||
] ++ common_inout_confs();
|
] ++ common_inout_confs();
|
||||||
|
|
||||||
fields("message_out") ->
|
fields("egress_channels") ->
|
||||||
[ {subscribe_local_topic, #{type => binary(), nullable => false}}
|
[ {subscribe_local_topic, #{type => binary(), nullable => false}}
|
||||||
, {remote_topic, hoconsc:mk(binary(), #{default => <<"${topic}">>})}
|
, {remote_topic, hoconsc:mk(binary(), #{default => <<"${topic}">>})}
|
||||||
] ++ common_inout_confs();
|
] ++ common_inout_confs();
|
||||||
|
|
Loading…
Reference in New Issue