refactor(mqtt-bridge): unwrap single statem actions

So that the code would be easier to follow and harder to break.
Also drop a couple of unused macrodefs.
This commit is contained in:
Andrew Mayorov 2023-01-26 21:13:18 +03:00
parent c95f979413
commit 71f996b9d5
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
1 changed files with 10 additions and 12 deletions

View File

@ -116,8 +116,6 @@
%% same as default in-flight limit for emqtt %% same as default in-flight limit for emqtt
-define(DEFAULT_INFLIGHT_SIZE, 32). -define(DEFAULT_INFLIGHT_SIZE, 32).
-define(DEFAULT_RECONNECT_DELAY_MS, timer:seconds(5)). -define(DEFAULT_RECONNECT_DELAY_MS, timer:seconds(5)).
-define(DEFAULT_SEG_BYTES, (1 bsl 20)).
-define(DEFAULT_MAX_TOTAL_SIZE, (1 bsl 31)).
%% @doc Start a bridge worker. Supported configs: %% @doc Start a bridge worker. Supported configs:
%% start_type: 'manual' (default) or 'auto', when manual, bridge will stay %% start_type: 'manual' (default) or 'auto', when manual, bridge will stay
@ -274,10 +272,10 @@ idle({call, From}, ensure_started, State) ->
{ok, State1} -> {ok, State1} ->
{next_state, connected, State1, [{reply, From, ok}, {state_timeout, 0, connected}]}; {next_state, connected, State1, [{reply, From, ok}, {state_timeout, 0, connected}]};
{error, Reason, _State} -> {error, Reason, _State} ->
{keep_state_and_data, [{reply, From, {error, Reason}}]} {keep_state_and_data, {reply, From, {error, Reason}}}
end; end;
idle({call, From}, {send_to_remote, _}, _State) -> idle({call, From}, {send_to_remote, _}, _State) ->
{keep_state_and_data, [{reply, From, {error, {recoverable_error, not_connected}}}]}; {keep_state_and_data, {reply, From, {error, {recoverable_error, not_connected}}}};
%% @doc Standing by for manual start. %% @doc Standing by for manual start.
idle(info, idle, #{start_type := manual}) -> idle(info, idle, #{start_type := manual}) ->
keep_state_and_data; keep_state_and_data;
@ -303,7 +301,7 @@ connected(state_timeout, connected, State) ->
connected({call, From}, {send_to_remote, Msg}, State) -> connected({call, From}, {send_to_remote, Msg}, State) ->
case do_send(State, Msg) of case do_send(State, Msg) of
{ok, NState} -> {ok, NState} ->
{keep_state, NState, [{reply, From, ok}]}; {keep_state, NState, {reply, From, ok}};
{error, Reason} -> {error, Reason} ->
{keep_state_and_data, {reply, From, {error, Reason}}} {keep_state_and_data, {reply, From, {error, Reason}}}
end; end;
@ -328,21 +326,21 @@ connected(Type, Content, State) ->
%% Common handlers %% Common handlers
common(StateName, {call, From}, status, _State) -> common(StateName, {call, From}, status, _State) ->
{keep_state_and_data, [{reply, From, StateName}]}; {keep_state_and_data, {reply, From, StateName}};
common(_StateName, {call, From}, ping, #{connection := Conn} = _State) -> common(_StateName, {call, From}, ping, #{connection := Conn} = _State) ->
Reply = emqx_connector_mqtt_mod:ping(Conn), Reply = emqx_connector_mqtt_mod:ping(Conn),
{keep_state_and_data, [{reply, From, Reply}]}; {keep_state_and_data, {reply, From, Reply}};
common(_StateName, {call, From}, ensure_stopped, #{connection := undefined} = _State) -> common(_StateName, {call, From}, ensure_stopped, #{connection := undefined} = _State) ->
{keep_state_and_data, [{reply, From, ok}]}; {keep_state_and_data, {reply, From, ok}};
common(_StateName, {call, From}, ensure_stopped, #{connection := Conn} = State) -> common(_StateName, {call, From}, ensure_stopped, #{connection := Conn} = State) ->
Reply = emqx_connector_mqtt_mod:stop(Conn), Reply = emqx_connector_mqtt_mod:stop(Conn),
{next_state, idle, State#{connection => undefined}, [{reply, From, Reply}]}; {next_state, idle, State#{connection => undefined}, {reply, From, Reply}};
common(_StateName, {call, From}, get_forwards, #{connect_opts := #{forwards := Forwards}}) -> common(_StateName, {call, From}, get_forwards, #{connect_opts := #{forwards := Forwards}}) ->
{keep_state_and_data, [{reply, From, Forwards}]}; {keep_state_and_data, {reply, From, Forwards}};
common(_StateName, {call, From}, get_subscriptions, #{connection := Connection}) -> common(_StateName, {call, From}, get_subscriptions, #{connection := Connection}) ->
{keep_state_and_data, [{reply, From, maps:get(subscriptions, Connection, #{})}]}; {keep_state_and_data, {reply, From, maps:get(subscriptions, Connection, #{})}};
common(_StateName, {call, From}, Req, _State) -> common(_StateName, {call, From}, Req, _State) ->
{keep_state_and_data, [{reply, From, {error, {unsupported_request, Req}}}]}; {keep_state_and_data, {reply, From, {error, {unsupported_request, Req}}}};
common(_StateName, info, {'EXIT', _, _}, State) -> common(_StateName, info, {'EXIT', _, _}, State) ->
{keep_state, State}; {keep_state, State};
common(StateName, Type, Content, #{name := Name} = State) -> common(StateName, Type, Content, #{name := Name} = State) ->