fix: create pulsar producer when on_add_channel

This commit is contained in:
zhongwencool 2024-02-27 13:32:30 +08:00
parent 7f1b4cef27
commit 650f9659a4
6 changed files with 183 additions and 169 deletions

View File

@ -33,7 +33,6 @@ roots() ->
fields(pulsar_producer) ->
fields(config) ++
emqx_bridge_pulsar_pubsub_schema:fields(action_parameters) ++
fields(producer_opts) ++
[
{local_topic,
mk(binary(), #{required => false, desc => ?DESC("producer_local_topic")})},
@ -85,6 +84,7 @@ fields(config) ->
] ++ emqx_connector_schema_lib:ssl_fields();
fields(producer_opts) ->
[
{pulsar_topic, mk(string(), #{required => true, desc => ?DESC("producer_pulsar_topic")})},
{batch_size,
mk(
pos_integer(),
@ -110,7 +110,6 @@ fields(producer_opts) ->
emqx_schema:bytesize(),
#{default => <<"900KB">>, desc => ?DESC("producer_max_batch_bytes")}
)},
{pulsar_topic, mk(binary(), #{required => true, desc => ?DESC("producer_pulsar_topic")})},
{strategy,
mk(
hoconsc:enum([random, roundrobin, key_dispatch]),
@ -202,7 +201,37 @@ conn_bridge_examples(_Method) ->
#{
<<"pulsar_producer">> => #{
summary => <<"Pulsar Producer Bridge">>,
value => #{todo => true}
value => #{
<<"authentication">> => <<"none">>,
<<"batch_size">> => 1,
<<"buffer">> =>
#{
<<"memory_overload_protection">> => true,
<<"mode">> => <<"memory">>,
<<"per_partition_limit">> => <<"10MB">>,
<<"segment_bytes">> => <<"5MB">>
},
<<"compression">> => <<"no_compression">>,
<<"enable">> => true,
<<"local_topic">> => <<"mqtt/topic/-576460752303423482">>,
<<"max_batch_bytes">> => <<"900KB">>,
<<"message">> =>
#{<<"key">> => <<"${.clientid}">>, <<"value">> => <<"${.}">>},
<<"name">> => <<"pulsar_example_name">>,
<<"pulsar_topic">> => <<"pulsar_example_topic">>,
<<"retention_period">> => <<"infinity">>,
<<"send_buffer">> => <<"1MB">>,
<<"servers">> => <<"pulsar://127.0.0.1:6650">>,
<<"ssl">> =>
#{
<<"enable">> => false,
<<"server_name_indication">> => <<"auto">>,
<<"verify">> => <<"verify_none">>
},
<<"strategy">> => <<"key_dispatch">>,
<<"sync_timeout">> => <<"5s">>,
<<"type">> => <<"pulsar_producer">>
}
}
}
].

View File

@ -11,8 +11,7 @@
action_type_name/0,
connector_type_name/0,
schema_module/0,
is_action/1,
action_convert_from_connector/2
is_action/1
]).
is_action(_) -> true.
@ -24,31 +23,3 @@ action_type_name() -> pulsar.
connector_type_name() -> pulsar.
schema_module() -> emqx_bridge_pulsar_pubsub_schema.
action_convert_from_connector(ConnectorConfig, ActionConfig) ->
Dispatch = emqx_utils_conv:bin(maps:get(<<"strategy">>, ConnectorConfig, <<>>)),
case Dispatch of
<<"key_dispatch">> ->
case emqx_utils_maps:deep_find([<<"parameters">>, <<"message">>], ActionConfig) of
{ok, Message} ->
Validator =
#{
<<"strategy">> => key_dispatch,
<<"message">> => emqx_utils_maps:binary_key_map(Message)
},
case emqx_bridge_pulsar:producer_strategy_key_validator(Validator) of
ok ->
ActionConfig;
{error, Reason} ->
throw(#{
reason => Reason,
kind => validation_error
})
end;
{not_found, _, _} ->
%% no message field, use the default message template
ActionConfig
end;
_ ->
ActionConfig
end.

View File

@ -25,13 +25,11 @@
-type pulsar_client_id() :: atom().
-type state() :: #{
pulsar_client_id := pulsar_client_id(),
producers := pulsar_producers:producers(),
channels := map()
client_id := pulsar_client_id(),
channels := map(),
client_opts := map()
}.
-type buffer_mode() :: memory | disk | hybrid.
-type compression_mode() :: no_compression | snappy | zlib.
-type partition_strategy() :: random | roundrobin | key_dispatch.
-type message_template_raw() :: #{
key := binary(),
value := binary()
@ -42,25 +40,9 @@
}.
-type config() :: #{
authentication := _,
batch_size := pos_integer(),
bridge_name := atom(),
buffer := #{
mode := buffer_mode(),
per_partition_limit := emqx_schema:bytesize(),
segment_bytes := emqx_schema:bytesize(),
memory_overload_protection := boolean()
},
compression := compression_mode(),
connect_timeout := emqx_schema:duration_ms(),
max_batch_bytes := emqx_schema:bytesize(),
message := message_template_raw(),
pulsar_topic := binary(),
retention_period := infinity | emqx_schema:duration_ms(),
send_buffer := emqx_schema:bytesize(),
servers := binary(),
ssl := _,
strategy := partition_strategy(),
sync_timeout := emqx_schema:duration_ms()
ssl := _
}.
%% Allocatable resources
@ -116,54 +98,57 @@ on_start(InstanceId, Config) ->
end,
throw(Message)
end,
start_producer(Config, InstanceId, ClientId, ClientOpts).
{ok, #{channels => #{}, client_id => ClientId, client_opts => ClientOpts}}.
on_add_channel(
_InstanceId,
#{channels := Channels} = State,
InstanceId,
#{channels := Channels, client_id := ClientId, client_opts := ClientOpts} = State,
ChannelId,
#{parameters := #{message := Message, sync_timeout := SyncTimeout}}
#{parameters := #{message := Message, sync_timeout := SyncTimeout} = Params}
) ->
case maps:is_key(ChannelId, Channels) of
true ->
{error, already_exists};
{error, channel_already_exists};
false ->
{ok, Producers} = start_producer(InstanceId, ChannelId, ClientId, ClientOpts, Params),
Parameters = #{
message => compile_message_template(Message),
sync_timeout => SyncTimeout
sync_timeout => SyncTimeout,
producers => Producers
},
NewChannels = maps:put(ChannelId, Parameters, Channels),
{ok, State#{channels => NewChannels}}
end.
on_remove_channel(_InstanceId, #{channels := Channels} = State, ChannelId) ->
{ok, State#{channels => maps:remove(ChannelId, Channels)}}.
on_remove_channel(InstanceId, State, ChannelId) ->
#{channels := Channels, client_id := ClientId} = State,
case maps:find(ChannelId, Channels) of
{ok, #{producers := Producers}} ->
stop_producers(ClientId, Producers),
emqx_resource:deallocate_resource(InstanceId, {?pulsar_producers, ChannelId}),
{ok, State#{channels => maps:remove(ChannelId, Channels)}};
error ->
{ok, State}
end.
on_get_channels(InstanceId) ->
emqx_bridge_v2:get_channels_for_connector(InstanceId).
-spec on_stop(resource_id(), state()) -> ok.
on_stop(InstanceId, _State) ->
case emqx_resource:get_allocated_resources(InstanceId) of
#{?pulsar_client_id := ClientId, ?pulsar_producers := Producers} ->
stop_producers(ClientId, Producers),
Resources0 = emqx_resource:get_allocated_resources(InstanceId),
case maps:take(?pulsar_client_id, Resources0) of
{ClientId, Resources} ->
maps:foreach(
fun({?pulsar_producers, _BridgeV2Id}, Producers) ->
stop_producers(ClientId, Producers)
end,
Resources
),
stop_client(ClientId),
?tp(pulsar_bridge_stopped, #{
instance_id => InstanceId,
pulsar_client_id => ClientId,
pulsar_producers => Producers
}),
ok;
#{?pulsar_client_id := ClientId} ->
stop_client(ClientId),
?tp(pulsar_bridge_stopped, #{
instance_id => InstanceId,
pulsar_client_id => ClientId,
pulsar_producers => undefined
}),
ok;
_ ->
?tp(pulsar_bridge_stopped, #{instance_id => InstanceId}),
ok;
error ->
ok
end.
@ -172,35 +157,32 @@ on_stop(InstanceId, _State) ->
%% `emqx_resource_manager' will kill the Pulsar producers and messages might be lost.
-spec on_get_status(resource_id(), state()) -> connected | connecting.
on_get_status(_InstanceId, State = #{}) ->
#{
pulsar_client_id := ClientId,
producers := Producers
} = State,
#{client_id := ClientId} = State,
case pulsar_client_sup:find_client(ClientId) of
{ok, Pid} ->
try pulsar_client:get_status(Pid) of
true ->
get_producer_status(Producers);
false ->
connecting
true -> ?status_connected;
false -> ?status_connecting
catch
error:timeout ->
connecting;
?status_connecting;
exit:{noproc, _} ->
connecting
?status_connecting
end;
{error, _} ->
connecting
?status_connecting
end;
on_get_status(_InstanceId, _State) ->
%% If a health check happens just after a concurrent request to
%% create the bridge is not quite finished, `State = undefined'.
connecting.
?status_connecting.
on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) ->
case maps:is_key(ChannelId, Channels) of
true -> connected;
false -> {error, channel_not_exists}
case maps:find(ChannelId, Channels) of
{ok, #{producers := Producers}} ->
get_producer_status(Producers);
error ->
{error, channel_not_exists}
end.
-spec on_query(resource_id(), tuple(), state()) ->
@ -208,11 +190,11 @@ on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) ->
| {error, timeout}
| {error, term()}.
on_query(_InstanceId, {ChannelId, Message}, State) ->
#{producers := Producers, channels := Channels} = State,
#{channels := Channels} = State,
case maps:find(ChannelId, Channels) of
error ->
{error, channel_not_exists};
{ok, #{message := MessageTmpl, sync_timeout := SyncTimeout}} ->
{ok, #{message := MessageTmpl, sync_timeout := SyncTimeout, producers := Producers}} ->
PulsarMessage = render_message(Message, MessageTmpl),
try
pulsar:send_sync(Producers, [PulsarMessage], SyncTimeout)
@ -227,11 +209,11 @@ on_query(_InstanceId, {ChannelId, Message}, State) ->
) ->
{ok, pid()}.
on_query_async(_InstanceId, {ChannelId, Message}, AsyncReplyFn, State) ->
#{producers := Producers, channels := Channels} = State,
#{channels := Channels} = State,
case maps:find(ChannelId, Channels) of
error ->
{error, channel_not_exists};
{ok, #{message := MessageTmpl}} ->
{ok, #{message := MessageTmpl, producers := Producers}} ->
?tp_span(
pulsar_producer_on_query_async,
#{instance_id => _InstanceId, message => Message},
@ -299,18 +281,22 @@ conn_opts(#{authentication := #{jwt := JWT}}) ->
replayq_dir(ClientId) ->
filename:join([emqx:data_dir(), "pulsar", emqx_utils_conv:bin(ClientId)]).
-spec producer_name(pulsar_client_id()) -> atom().
producer_name(ClientId) ->
ClientIdBin = emqx_utils_conv:bin(ClientId),
binary_to_atom(
iolist_to_binary([
<<"producer-">>,
ClientIdBin
])
).
producer_name(InstanceId, ChannelId) ->
case is_dry_run(InstanceId) of
%% do not create more atom
true ->
pulsar_producer_probe_worker;
false ->
ChannelIdBin = emqx_utils_conv:bin(ChannelId),
binary_to_atom(
iolist_to_binary([
<<"producer-">>,
ChannelIdBin
])
)
end.
-spec start_producer(config(), resource_id(), pulsar_client_id(), map()) -> {ok, state()}.
start_producer(Config, InstanceId, ClientId, ClientOpts) ->
start_producer(InstanceId, ChannelId, ClientId, ClientOpts, Params) ->
#{
conn_opts := ConnOpts,
ssl_opts := SSLOpts
@ -325,16 +311,16 @@ start_producer(Config, InstanceId, ClientId, ClientOpts) ->
},
compression := Compression,
max_batch_bytes := MaxBatchBytes,
pulsar_topic := PulsarTopic0,
pulsar_topic := PulsarTopic,
retention_period := RetentionPeriod,
send_buffer := SendBuffer,
strategy := Strategy
} = Config,
} = Params,
{OffloadMode, ReplayQDir} =
case BufferMode of
memory -> {false, false};
disk -> {false, replayq_dir(ClientId)};
hybrid -> {true, replayq_dir(ClientId)}
disk -> {false, replayq_dir(ChannelId)};
hybrid -> {true, replayq_dir(ChannelId)}
end,
MemOLP =
case os:type() of
@ -348,7 +334,7 @@ start_producer(Config, InstanceId, ClientId, ClientOpts) ->
replayq_seg_bytes => SegmentBytes,
drop_if_highmem => MemOLP
},
ProducerName = producer_name(ClientId),
ProducerName = producer_name(InstanceId, ChannelId),
?tp(pulsar_producer_capture_name, #{producer_name => ProducerName}),
ProducerOpts0 =
#{
@ -363,19 +349,17 @@ start_producer(Config, InstanceId, ClientId, ClientOpts) ->
tcp_opts => [{sndbuf, SendBuffer}]
},
ProducerOpts = maps:merge(ReplayQOpts, ProducerOpts0),
PulsarTopic = binary_to_list(PulsarTopic0),
?tp(pulsar_producer_about_to_start_producers, #{producer_name => ProducerName}),
try pulsar:ensure_supervised_producers(ClientId, PulsarTopic, ProducerOpts) of
{ok, Producers} ->
ok = emqx_resource:allocate_resource(InstanceId, ?pulsar_producers, Producers),
ok = emqx_resource:allocate_resource(
InstanceId,
{?pulsar_producers, ChannelId},
Producers
),
?tp(pulsar_producer_producers_allocated, #{}),
State = #{
pulsar_client_id => ClientId,
producers => Producers,
channels => #{}
},
?tp(pulsar_producer_bridge_started, #{}),
{ok, State}
{ok, Producers}
catch
Kind:Error:Stacktrace ->
?tp(
@ -388,7 +372,10 @@ start_producer(Config, InstanceId, ClientId, ClientOpts) ->
stacktrace => Stacktrace
}
),
stop_client(ClientId),
?tp(pulsar_bridge_producer_stopped, #{
pulsar_client_id => ClientId,
producers => undefined
}),
throw(failed_to_start_pulsar_producer)
end.
@ -412,7 +399,10 @@ stop_producers(ClientId, Producers) ->
_ = log_when_error(
fun() ->
ok = pulsar:stop_and_delete_supervised_producers(Producers),
?tp(pulsar_bridge_producer_stopped, #{pulsar_client_id => ClientId}),
?tp(pulsar_bridge_producer_stopped, #{
pulsar_client_id => ClientId,
producers => Producers
}),
ok
end,
#{
@ -467,15 +457,19 @@ get_producer_status(Producers) ->
do_get_producer_status(Producers, 0).
do_get_producer_status(_Producers, TimeSpent) when TimeSpent > ?HEALTH_CHECK_RETRY_TIMEOUT ->
connecting;
?status_connecting;
do_get_producer_status(Producers, TimeSpent) ->
case pulsar_producers:all_connected(Producers) of
try pulsar_producers:all_connected(Producers) of
true ->
connected;
?status_connected;
false ->
Sleep = 200,
timer:sleep(Sleep),
do_get_producer_status(Producers, TimeSpent + Sleep)
%% producer crashed with badarg. will recover later
catch
error:badarg ->
?status_connecting
end.
partition_strategy(key_dispatch) -> first_key_dispatch;
@ -485,17 +479,17 @@ is_sensitive_key(auth_data) -> true;
is_sensitive_key(_) -> false.
get_error_message({BrokerErrorMap, _}) when is_map(BrokerErrorMap) ->
Iter = maps:iterator(BrokerErrorMap),
do_get_error_message(Iter);
Iterator = maps:iterator(BrokerErrorMap),
do_get_error_message(Iterator);
get_error_message(_Error) ->
error.
do_get_error_message(Iter) ->
case maps:next(Iter) of
{{_Broker, _Port}, #{message := Message}, _NIter} ->
do_get_error_message(Iterator) ->
case maps:next(Iterator) of
{{_Broker, _Port}, #{message := Message}, _NIterator} ->
{ok, Message};
{_K, _V, NIter} ->
do_get_error_message(NIter);
{_K, _V, NIterator} ->
do_get_error_message(NIterator);
none ->
error
end.

View File

@ -18,9 +18,8 @@ namespace() -> ?TYPE.
roots() -> [].
fields("config_connector") ->
lists:keydelete(enable, 1, emqx_bridge_schema:common_bridge_fields()) ++
emqx_bridge_pulsar:fields(config) ++
emqx_bridge_pulsar:fields(producer_opts) ++
emqx_bridge_schema:common_bridge_fields() ++
lists:keydelete(enable, 1, emqx_bridge_pulsar:fields(config)) ++
emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts);
fields(connector_resource_opts) ->
emqx_connector_schema:resource_opts_fields();
@ -59,13 +58,5 @@ connector_example_values() ->
servers => <<"pulsar://127.0.0.1:6650">>,
authentication => none,
connect_timeout => <<"5s">>,
batch_size => 10,
compression => no_compression,
send_buffer => <<"1MB">>,
retention_period => <<"100s">>,
max_batch_bytes => <<"32MB">>,
pulsar_topic => <<"test_topic">>,
strategy => random,
buffer => #{mode => memory},
ssl => #{enable => false}
}.

View File

@ -32,22 +32,23 @@ fields(publisher_action) ->
?R_REF(action_parameters),
#{
required => true,
desc => ?DESC(action_parameters)
desc => ?DESC(action_parameters),
validator => fun emqx_bridge_pulsar:producer_strategy_key_validator/1
}
),
#{resource_opts_ref => ?R_REF(action_resource_opts)}
);
fields(action_parameters) ->
[
{sync_timeout,
?HOCON(emqx_schema:timeout_duration_ms(), #{
default => <<"3s">>, desc => ?DESC("producer_sync_timeout")
})},
{message,
?HOCON(?R_REF(producer_pulsar_message), #{
required => false, desc => ?DESC("producer_message_opts")
})},
{sync_timeout,
?HOCON(emqx_schema:timeout_duration_ms(), #{
default => <<"3s">>, desc => ?DESC("producer_sync_timeout")
})}
];
] ++ emqx_bridge_pulsar:fields(producer_opts);
fields(producer_pulsar_message) ->
[
{key,
@ -114,7 +115,8 @@ bridge_v2_examples(Method) ->
message => #{
key => <<"${.clientid}">>,
value => <<"${.}">>
}
},
pulsar_topic => <<"test_topic">>
}
}
)

View File

@ -599,13 +599,7 @@ t_start_and_produce_ok(Config) ->
_Sleep = 100,
_Attempts0 = 20,
begin
BridgeId = emqx_bridge_resource:bridge_id(
<<"pulsar">>, ?config(pulsar_name, Config)
),
ConnectorId = emqx_bridge_resource:resource_id(
<<"pulsar">>, ?config(pulsar_name, Config)
),
Id = <<"action:", BridgeId/binary, ":", ConnectorId/binary>>,
Id = get_channel_id(Config),
?assertMatch(
#{
counters := #{
@ -634,6 +628,15 @@ t_start_and_produce_ok(Config) ->
),
ok.
get_channel_id(Config) ->
BridgeId = emqx_bridge_resource:bridge_id(
<<"pulsar">>, ?config(pulsar_name, Config)
),
ConnectorId = emqx_bridge_resource:resource_id(
<<"pulsar">>, ?config(pulsar_name, Config)
),
<<"action:", BridgeId/binary, ":", ConnectorId/binary>>.
%% Under normal operations, the bridge will be called async via
%% `simple_async_query'.
t_sync_query(Config) ->
@ -900,7 +903,7 @@ t_failure_to_start_producer(Config) ->
{{ok, _}, {ok, _}} =
?wait_async_action(
create_bridge(Config),
#{?snk_kind := pulsar_bridge_client_stopped},
#{?snk_kind := pulsar_bridge_producer_stopped},
20_000
),
ok
@ -928,6 +931,8 @@ t_producer_process_crash(Config) ->
#{?snk_kind := pulsar_producer_bridge_started},
10_000
),
ResourceId = resource_id(Config),
ChannelId = get_channel_id(Config),
[ProducerPid | _] = [
Pid
|| {_Name, PS, _Type, _Mods} <- supervisor:which_children(pulsar_producers_sup),
@ -944,17 +949,23 @@ t_producer_process_crash(Config) ->
ok
after 1_000 -> ct:fail("pid didn't die")
end,
ResourceId = resource_id(Config),
?retry(
_Sleep0 = 50,
_Attempts0 = 50,
?assertEqual({ok, connecting}, emqx_resource_manager:health_check(ResourceId))
?assertEqual(
#{error => <<"Not connected for unknown reason">>, status => connecting},
emqx_resource_manager:channel_health_check(ResourceId, ChannelId)
)
),
?assertMatch({ok, connected}, emqx_resource_manager:health_check(ResourceId)),
%% Should recover given enough time.
?retry(
_Sleep = 1_000,
_Attempts = 20,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
?assertEqual(
#{error => undefined, status => connected},
emqx_resource_manager:channel_health_check(ResourceId, ChannelId)
)
),
{_, {ok, _}} =
?wait_async_action(
@ -1002,8 +1013,8 @@ t_resource_manager_crash_after_producers_started(Config) ->
{{error, {config_update_crashed, {killed, _}}}, {ok, _}} =
?wait_async_action(
create_bridge(Config),
#{?snk_kind := pulsar_bridge_stopped, pulsar_producers := Producers} when
Producers =/= undefined,
#{?snk_kind := pulsar_bridge_stopped, instance_id := InstanceId} when
InstanceId =/= undefined,
10_000
),
?assertEqual([], get_pulsar_producers()),
@ -1036,7 +1047,7 @@ t_resource_manager_crash_before_producers_started(Config) ->
{{error, {config_update_crashed, _}}, {ok, _}} =
?wait_async_action(
create_bridge(Config),
#{?snk_kind := pulsar_bridge_stopped, pulsar_producers := undefined},
#{?snk_kind := pulsar_bridge_stopped},
10_000
),
?assertEqual([], get_pulsar_producers()),
@ -1236,3 +1247,19 @@ t_resilience(Config) ->
[]
),
ok.
get_producers_config(ConnectorId, ChannelId) ->
[
#{
state :=
#{
channels :=
#{ChannelId := #{producers := Producers}}
}
}
] =
lists:filter(
fun(#{id := Id}) -> Id =:= ConnectorId end,
emqx_resource_manager:list_all()
),
Producers.