fix(rocketmq action): make sure that topic template is respected
This commit is contained in:
parent
2fe36776b5
commit
1fe92bddd0
|
@ -139,14 +139,13 @@ on_add_channel(
|
||||||
ChannelId,
|
ChannelId,
|
||||||
ChannelConfig
|
ChannelConfig
|
||||||
) ->
|
) ->
|
||||||
{ok, ChannelState} = create_channel_state(ChannelId, ChannelConfig, ACLInfo),
|
{ok, ChannelState} = create_channel_state(ChannelConfig, ACLInfo),
|
||||||
NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels),
|
NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels),
|
||||||
%% Update state
|
%% Update state
|
||||||
NewState = OldState#{installed_channels => NewInstalledChannels},
|
NewState = OldState#{installed_channels => NewInstalledChannels},
|
||||||
{ok, NewState}.
|
{ok, NewState}.
|
||||||
|
|
||||||
create_channel_state(
|
create_channel_state(
|
||||||
ChannelId,
|
|
||||||
#{parameters := Conf} = _ChannelConfig,
|
#{parameters := Conf} = _ChannelConfig,
|
||||||
ACLInfo
|
ACLInfo
|
||||||
) ->
|
) ->
|
||||||
|
@ -155,7 +154,7 @@ create_channel_state(
|
||||||
sync_timeout := SyncTimeout
|
sync_timeout := SyncTimeout
|
||||||
} = Conf,
|
} = Conf,
|
||||||
TopicTks = emqx_placeholder:preproc_tmpl(Topic),
|
TopicTks = emqx_placeholder:preproc_tmpl(Topic),
|
||||||
ProducerOpts = make_producer_opts(ChannelId, Conf, ACLInfo),
|
ProducerOpts = make_producer_opts(Conf, ACLInfo),
|
||||||
Templates = parse_template(Conf),
|
Templates = parse_template(Conf),
|
||||||
State = #{
|
State = #{
|
||||||
topic => Topic,
|
topic => Topic,
|
||||||
|
@ -203,7 +202,7 @@ on_stop(InstanceId, _State) ->
|
||||||
({_, client_id, ClientId}) ->
|
({_, client_id, ClientId}) ->
|
||||||
destory_producers_map(ClientId),
|
destory_producers_map(ClientId),
|
||||||
ok = rocketmq:stop_and_delete_supervised_client(ClientId);
|
ok = rocketmq:stop_and_delete_supervised_client(ClientId);
|
||||||
({_, _Topic, Producer}) ->
|
({_, _ChannelId, Producer}) ->
|
||||||
_ = rocketmq:stop_and_delete_supervised_producers(Producer)
|
_ = rocketmq:stop_and_delete_supervised_producers(Producer)
|
||||||
end,
|
end,
|
||||||
emqx_resource:get_allocated_resources_list(InstanceId)
|
emqx_resource:get_allocated_resources_list(InstanceId)
|
||||||
|
@ -259,7 +258,7 @@ do_query(
|
||||||
Data = apply_template(Query, Templates),
|
Data = apply_template(Query, Templates),
|
||||||
|
|
||||||
Result = safe_do_produce(
|
Result = safe_do_produce(
|
||||||
InstanceId, QueryFunc, ClientId, TopicKey, Data, ProducerOpts, RequestTimeout
|
ChannelId, InstanceId, QueryFunc, ClientId, TopicKey, Data, ProducerOpts, RequestTimeout
|
||||||
),
|
),
|
||||||
case Result of
|
case Result of
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
|
@ -285,9 +284,11 @@ do_query(
|
||||||
get_channel_id({ChannelId, _}) -> ChannelId;
|
get_channel_id({ChannelId, _}) -> ChannelId;
|
||||||
get_channel_id([{ChannelId, _} | _]) -> ChannelId.
|
get_channel_id([{ChannelId, _} | _]) -> ChannelId.
|
||||||
|
|
||||||
safe_do_produce(InstanceId, QueryFunc, ClientId, TopicKey, Data, ProducerOpts, RequestTimeout) ->
|
safe_do_produce(
|
||||||
|
ChannelId, InstanceId, QueryFunc, ClientId, TopicKey, Data, ProducerOpts, RequestTimeout
|
||||||
|
) ->
|
||||||
try
|
try
|
||||||
Producers = get_producers(InstanceId, ClientId, TopicKey, ProducerOpts),
|
Producers = get_producers(ChannelId, InstanceId, ClientId, TopicKey, ProducerOpts),
|
||||||
produce(InstanceId, QueryFunc, Producers, Data, RequestTimeout)
|
produce(InstanceId, QueryFunc, Producers, Data, RequestTimeout)
|
||||||
catch
|
catch
|
||||||
_Type:Reason ->
|
_Type:Reason ->
|
||||||
|
@ -350,7 +351,6 @@ is_sensitive_key(_) ->
|
||||||
false.
|
false.
|
||||||
|
|
||||||
make_producer_opts(
|
make_producer_opts(
|
||||||
ChannelId,
|
|
||||||
#{
|
#{
|
||||||
send_buffer := SendBuff,
|
send_buffer := SendBuff,
|
||||||
refresh_interval := RefreshInterval
|
refresh_interval := RefreshInterval
|
||||||
|
@ -358,9 +358,6 @@ make_producer_opts(
|
||||||
ACLInfo
|
ACLInfo
|
||||||
) ->
|
) ->
|
||||||
#{
|
#{
|
||||||
%% TODO: the name needs to be an atom but this may cause atom leak so we
|
|
||||||
%% should figure out a way to avoid this
|
|
||||||
name => binary_to_atom(ChannelId),
|
|
||||||
tcp_opts => [{sndbuf, SendBuff}],
|
tcp_opts => [{sndbuf, SendBuff}],
|
||||||
ref_topic_route_interval => RefreshInterval,
|
ref_topic_route_interval => RefreshInterval,
|
||||||
acl_info => emqx_secret:wrap(ACLInfo)
|
acl_info => emqx_secret:wrap(ACLInfo)
|
||||||
|
@ -396,16 +393,19 @@ destory_producers_map(ClientId) ->
|
||||||
ets:delete(Tid)
|
ets:delete(Tid)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
get_producers(InstanceId, ClientId, Topic, ProducerOpts) ->
|
get_producers(ChannelId, InstanceId, ClientId, Topic, ProducerOpts) ->
|
||||||
case ets:lookup(ClientId, Topic) of
|
case ets:lookup(ClientId, ChannelId) of
|
||||||
[{_, Producers}] ->
|
[{_, Producers}] ->
|
||||||
Producers;
|
Producers;
|
||||||
_ ->
|
_ ->
|
||||||
ProducerGroup = iolist_to_binary([atom_to_list(ClientId), "_", Topic]),
|
ProducerGroup = ChannelId,
|
||||||
|
%% TODO: the name needs to be an atom but this may cause atom leak so we
|
||||||
|
%% should figure out a way to avoid this
|
||||||
|
ProducerOpts2 = ProducerOpts#{name => binary_to_atom(ChannelId)},
|
||||||
{ok, Producers} = rocketmq:ensure_supervised_producers(
|
{ok, Producers} = rocketmq:ensure_supervised_producers(
|
||||||
ClientId, ProducerGroup, Topic, ProducerOpts
|
ClientId, ProducerGroup, Topic, ProducerOpts2
|
||||||
),
|
),
|
||||||
ok = emqx_resource:allocate_resource(InstanceId, Topic, Producers),
|
ok = emqx_resource:allocate_resource(InstanceId, ChannelId, Producers),
|
||||||
ets:insert(ClientId, {Topic, Producers}),
|
ets:insert(ClientId, {ChannelId, Producers}),
|
||||||
Producers
|
Producers
|
||||||
end.
|
end.
|
||||||
|
|
Loading…
Reference in New Issue