diff --git a/apps/emqx_durable_storage/README.md b/apps/emqx_durable_storage/README.md index f67cc3e24..362ad47a3 100644 --- a/apps/emqx_durable_storage/README.md +++ b/apps/emqx_durable_storage/README.md @@ -124,6 +124,8 @@ The following application environment variables are available: - `emqx_durable_storage.egress_flush_interval`: period at which the batches of messages are committed to the durable storage. +- `emqx_durable_storage.reads`: `leader_preferred` | `local_preferred`. + Runtime settings for the durable storages can be modified via CLI as well as the REST API. The following CLI commands are available: diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 9330e0b1a..f3b5fdedf 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -561,12 +561,27 @@ list_nodes() -> %% Too large for normal operation, need better backpressure mechanism. -define(RA_TIMEOUT, 60 * 1000). --define(SAFERPC(EXPR), +-define(SAFE_ERPC(EXPR), try EXPR catch - error:RPCError = {erpc, _} -> - {error, recoverable, RPCError} + error:RPCError__ = {erpc, _} -> + {error, recoverable, RPCError__} + end +). + +-define(SHARD_RPC(DB, SHARD, NODE, BODY), + case + emqx_ds_replication_layer_shard:servers( + DB, SHARD, application:get_env(emqx_durable_storage, reads, leader_preferred) + ) + of + [{_, NODE} | _] -> + begin + BODY + end; + [] -> + {error, recoverable, replica_offline} end ). @@ -623,44 +638,79 @@ ra_drop_generation(DB, Shard, GenId) -> end. ra_get_streams(DB, Shard, TopicFilter, Time) -> - {_, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred), TimestampUs = timestamp_to_timeus(Time), - ?SAFERPC(emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, TimestampUs)). + ?SHARD_RPC( + DB, + Shard, + Node, + ?SAFE_ERPC(emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, TimestampUs)) + ). ra_get_delete_streams(DB, Shard, TopicFilter, Time) -> - {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred), - ?SAFERPC(emqx_ds_proto_v4:get_delete_streams(Node, DB, Shard, TopicFilter, Time)). + ?SHARD_RPC( + DB, + Shard, + Node, + ?SAFE_ERPC(emqx_ds_proto_v4:get_delete_streams(Node, DB, Shard, TopicFilter, Time)) + ). ra_make_iterator(DB, Shard, Stream, TopicFilter, StartTime) -> - {_, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred), TimeUs = timestamp_to_timeus(StartTime), - ?SAFERPC(emqx_ds_proto_v4:make_iterator(Node, DB, Shard, Stream, TopicFilter, TimeUs)). + ?SHARD_RPC( + DB, + Shard, + Node, + ?SAFE_ERPC(emqx_ds_proto_v4:make_iterator(Node, DB, Shard, Stream, TopicFilter, TimeUs)) + ). ra_make_delete_iterator(DB, Shard, Stream, TopicFilter, StartTime) -> - {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred), TimeUs = timestamp_to_timeus(StartTime), - ?SAFERPC(emqx_ds_proto_v4:make_delete_iterator(Node, DB, Shard, Stream, TopicFilter, TimeUs)). + ?SHARD_RPC( + DB, + Shard, + Node, + ?SAFE_ERPC( + emqx_ds_proto_v4:make_delete_iterator(Node, DB, Shard, Stream, TopicFilter, TimeUs) + ) + ). ra_update_iterator(DB, Shard, Iter, DSKey) -> - {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred), - ?SAFERPC(emqx_ds_proto_v4:update_iterator(Node, DB, Shard, Iter, DSKey)). + ?SHARD_RPC( + DB, + Shard, + Node, + ?SAFE_ERPC(emqx_ds_proto_v4:update_iterator(Node, DB, Shard, Iter, DSKey)) + ). ra_next(DB, Shard, Iter, BatchSize) -> - {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred), - case emqx_ds_proto_v4:next(Node, DB, Shard, Iter, BatchSize) of - RPCError = {badrpc, _} -> - {error, recoverable, RPCError}; - Other -> - Other - end. + ?SHARD_RPC( + DB, + Shard, + Node, + case emqx_ds_proto_v4:next(Node, DB, Shard, Iter, BatchSize) of + Err = {badrpc, _} -> + {error, recoverable, Err}; + Ret -> + Ret + end + ). ra_delete_next(DB, Shard, Iter, Selector, BatchSize) -> - {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred), - emqx_ds_proto_v4:delete_next(Node, DB, Shard, Iter, Selector, BatchSize). + ?SHARD_RPC( + DB, + Shard, + Node, + ?SAFE_ERPC(emqx_ds_proto_v4:delete_next(Node, DB, Shard, Iter, Selector, BatchSize)) + ). ra_list_generations_with_lifetimes(DB, Shard) -> - {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred), - case ?SAFERPC(emqx_ds_proto_v4:list_generations_with_lifetimes(Node, DB, Shard)) of + Reply = ?SHARD_RPC( + DB, + Shard, + Node, + ?SAFE_ERPC(emqx_ds_proto_v4:list_generations_with_lifetimes(Node, DB, Shard)) + ), + case Reply of Gens = #{} -> maps:map( fun(_GenId, Data = #{since := Since, until := Until}) -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl index 0bfa89e95..518e1e630 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl @@ -28,8 +28,7 @@ %% Dynamic server location API -export([ - servers/3, - server/3 + servers/3 ]). %% Membership @@ -83,16 +82,15 @@ server_name(DB, Shard, Site) -> %% --spec servers(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), Order) -> [server(), ...] when - Order :: leader_preferred | undefined. -servers(DB, Shard, _Order = leader_preferred) -> +-spec servers(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), Order) -> [server()] when + Order :: leader_preferred | local_preferred | undefined. +servers(DB, Shard, leader_preferred) -> get_servers_leader_preferred(DB, Shard); +servers(DB, Shard, local_preferred) -> + get_servers_local_preferred(DB, Shard); servers(DB, Shard, _Order = undefined) -> get_shard_servers(DB, Shard). -server(DB, Shard, _Which = local_preferred) -> - get_server_local_preferred(DB, Shard). - get_servers_leader_preferred(DB, Shard) -> %% NOTE: Contact last known leader first, then rest of shard servers. ClusterName = get_cluster_name(DB, Shard), @@ -104,17 +102,24 @@ get_servers_leader_preferred(DB, Shard) -> get_online_servers(DB, Shard) end. -get_server_local_preferred(DB, Shard) -> - %% NOTE: Contact either local server or a random replica. +get_servers_local_preferred(DB, Shard) -> + %% Return list of servers, where the local replica (if exists) is + %% the first element. Note: result is _NOT_ shuffled. This can be + %% bad for the load balancing, but it makes results more + %% deterministic. Caller that doesn't care about that can shuffle + %% the results by itself. ClusterName = get_cluster_name(DB, Shard), case ra_leaderboard:lookup_members(ClusterName) of - Servers when is_list(Servers) -> - pick_local(Servers); undefined -> - %% TODO - %% Leader is unkonwn if there are no servers of this group on the - %% local node. We want to pick a replica in that case as well. - pick_random(get_online_servers(DB, Shard)) + Servers = get_online_servers(DB, Shard); + Servers when is_list(Servers) -> + ok + end, + case lists:keyfind(node(), 2, Servers) of + false -> + Servers; + Local when is_tuple(Local) -> + [Local | lists:delete(Local, Servers)] end. lookup_leader(DB, Shard) -> @@ -139,17 +144,6 @@ filter_online(Servers) -> is_server_online({_Name, Node}) -> Node == node() orelse lists:member(Node, nodes()). -pick_local(Servers) -> - case lists:keyfind(node(), 2, Servers) of - Local when is_tuple(Local) -> - Local; - false -> - pick_random(Servers) - end. - -pick_random(Servers) -> - lists:nth(rand:uniform(length(Servers)), Servers). - get_cluster_name(DB, Shard) -> memoize(fun cluster_name/2, [DB, Shard]). diff --git a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl index 8303ff861..eacf7e301 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl @@ -479,11 +479,13 @@ t_rebalance_offline_restarts(Config) -> %% shard_server_info(Node, DB, Shard, Site, Info) -> - Server = shard_server(Node, DB, Shard, Site), - {Server, ds_repl_shard(Node, server_info, [Info, Server])}. - -shard_server(Node, DB, Shard, Site) -> - ds_repl_shard(Node, shard_server, [DB, Shard, Site]). + ?ON( + Node, + begin + Server = emqx_ds_replication_layer_shard:shard_server(DB, Shard, Site), + {Server, emqx_ds_replication_layer_shard:server_info(Info, Server)} + end + ). ds_repl_meta(Node, Fun) -> ds_repl_meta(Node, Fun, []). @@ -499,9 +501,6 @@ ds_repl_meta(Node, Fun, Args) -> error(meta_op_failed) end. -ds_repl_shard(Node, Fun, Args) -> - erpc:call(Node, emqx_ds_replication_layer_shard, Fun, Args). - shards(Node, DB) -> erpc:call(Node, emqx_ds_replication_layer_meta, shards, [DB]).