fix: hstream db connector & bridge, TODO: SUITE

This commit is contained in:
DDDHuang 2022-07-26 15:28:56 +08:00
parent 98b36c4681
commit fa4bc921ac
4 changed files with 56 additions and 11 deletions

View File

@ -29,6 +29,7 @@
start(_StartType, _StartArgs) -> start(_StartType, _StartArgs) ->
{ok, Sup} = emqx_bridge_sup:start_link(), {ok, Sup} = emqx_bridge_sup:start_link(),
ok = start_ee_apps(),
ok = emqx_bridge:load(), ok = emqx_bridge:load(),
ok = emqx_bridge:load_hook(), ok = emqx_bridge:load_hook(),
ok = emqx_config_handler:add_handler(?LEAF_NODE_HDLR_PATH, ?MODULE), ok = emqx_config_handler:add_handler(?LEAF_NODE_HDLR_PATH, ?MODULE),
@ -41,6 +42,16 @@ stop(_State) ->
ok = emqx_bridge:unload_hook(), ok = emqx_bridge:unload_hook(),
ok. ok.
-ifdef(EMQX_RELEASE_EDITION).
start_ee_apps() ->
{ok, _} = application:ensure_all_started(emqx_ee_bridge),
{ok, _} = application:ensure_all_started(emqx_ee_connector),
ok.
-else.
start_ee_apps() ->
ok.
-endif.
%% NOTE: We depends on the `emqx_bridge:pre_config_update/3` to restart/stop the %% NOTE: We depends on the `emqx_bridge:pre_config_update/3` to restart/stop the
%% underlying resources. %% underlying resources.
pre_config_update(_, {_Oper, _, _}, undefined) -> pre_config_update(_, {_Oper, _, _}, undefined) ->

View File

@ -35,6 +35,6 @@ fields(bridges) ->
{hstream, {hstream,
mk( mk(
hoconsc:map(name, ref(emqx_ee_bridge_hstream, "config")), hoconsc:map(name, ref(emqx_ee_bridge_hstream, "config")),
#{desc => <<"hstream_webhook">>} #{desc => <<"emqx enterprise config">>}
)} )}
]. ].

View File

@ -36,10 +36,10 @@ values(get) ->
values(post) -> values(post) ->
#{ #{
type => hstream, type => hstream,
name => <<"hstream_bridge_demo">>, name => <<"demo">>,
url => <<"http://127.0.0.1:6570">>, url => <<"http://127.0.0.1:6570">>,
stream => <<"stream1">>, stream => <<"stream1">>,
ordering_key => <<"${topic}">>, ordering_key => <<"some_key">>,
pool_size => 8, pool_size => 8,
enable => true, enable => true,
direction => egress, direction => egress,

View File

@ -46,9 +46,14 @@ on_stop(InstId, #{client := Client, producer := Producer}) ->
stop_producer => StopProducerRes stop_producer => StopProducerRes
}). }).
on_query(_InstId, {OrderingKey, Payload, Record}, AfterQuery, #{producer := Producer}) -> on_query(
Record = hstreamdb:to_record(OrderingKey, raw, Payload), _InstId,
do_append(AfterQuery, false, Producer, Record). {send_message, Data},
AfterQuery,
#{producer := Producer, ordering_key := OrderingKey, payload := Payload}
) ->
Record = to_record(OrderingKey, Payload, Data),
do_append(AfterQuery, Producer, Record).
on_get_status(_InstId, #{client := Client}) -> on_get_status(_InstId, #{client := Client}) ->
case is_alive(Client) of case is_alive(Client) of
@ -88,8 +93,13 @@ start_client(InstId, Config) ->
do_start_client(InstId, Config) do_start_client(InstId, Config)
catch catch
E:R:S -> E:R:S ->
io:format("E:R:S ~p:~p ~n~p~n", [E, R, S]), ?SLOG(error, #{
error(E) msg => "start hstream connector error",
connector => InstId,
error => E,
reason => R,
stack => S
})
end. end.
do_start_client(InstId, Config = #{url := Server, pool_size := PoolSize}) -> do_start_client(InstId, Config = #{url := Server, pool_size := PoolSize}) ->
@ -167,7 +177,18 @@ start_producer(InstId, Client, Options = #{stream := Stream, pool_size := PoolSi
msg => "hstream connector: producer started" msg => "hstream connector: producer started"
}), }),
EnableBatch = maps:get(enable_batch, Options, false), EnableBatch = maps:get(enable_batch, Options, false),
{ok, #{client => Client, producer => Producer, enable_batch => EnableBatch}}; PayloadBin = maps:get(payload, Options, <<"">>),
Payload = emqx_plugin_libs_rule:preproc_tmpl(PayloadBin),
OrderingKeyBin = maps:get(ordering_key, Options, <<"">>),
OrderingKey = emqx_plugin_libs_rule:preproc_tmpl(OrderingKeyBin),
State = #{
client => Client,
producer => Producer,
enable_batch => EnableBatch,
ordering_key => OrderingKey,
payload => Payload
},
{ok, State};
{error, {already_started, Pid}} -> {error, {already_started, Pid}} ->
?SLOG(info, #{ ?SLOG(info, #{
msg => "starting hstream connector: producer, find old producer. restart producer", msg => "starting hstream connector: producer, find old producer. restart producer",
@ -184,6 +205,19 @@ start_producer(InstId, Client, Options = #{stream := Stream, pool_size := PoolSi
{error, Reason} {error, Reason}
end. end.
to_record(OrderingKeyTmpl, PayloadTmpl, Data) ->
OrderingKey = emqx_plugin_libs_rule:proc_tmpl(OrderingKeyTmpl, Data),
Payload = emqx_plugin_libs_rule:proc_tmpl(PayloadTmpl, Data),
to_record(OrderingKey, Payload).
to_record(OrderingKey, Payload) when is_binary(OrderingKey) ->
to_record(binary_to_list(OrderingKey), Payload);
to_record(OrderingKey, Payload) ->
hstreamdb:to_record(OrderingKey, raw, Payload).
do_append(AfterQuery, Producer, Record) ->
do_append(AfterQuery, false, Producer, Record).
%% TODO: this append is async, remove or change it after we have better disk cache. %% TODO: this append is async, remove or change it after we have better disk cache.
% do_append(AfterQuery, true, Producer, Record) -> % do_append(AfterQuery, true, Producer, Record) ->
% case hstreamdb:append(Producer, Record) of % case hstreamdb:append(Producer, Record) of
@ -221,10 +255,10 @@ do_append(AfterQuery, false, Producer, Record) ->
end. end.
client_name(InstId) -> client_name(InstId) ->
"backend_hstream_client:" ++ to_string(InstId). "client:" ++ to_string(InstId).
produce_name(ActionId) -> produce_name(ActionId) ->
list_to_atom("backend_hstream_producer:" ++ to_string(ActionId)). list_to_atom("producer:" ++ to_string(ActionId)).
to_string(List) when is_list(List) -> List; to_string(List) when is_list(List) -> List;
to_string(Bin) when is_binary(Bin) -> binary_to_list(Bin); to_string(Bin) when is_binary(Bin) -> binary_to_list(Bin);