Skip to content

Commit

Permalink
Refactor deployment agents.
Browse files Browse the repository at this point in the history
Signed-off-by: EdmondFrank <[email protected]>
  • Loading branch information
EdmondFrank committed Jul 2, 2024
1 parent be5b667 commit 8ae6eff
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 226 deletions.
131 changes: 18 additions & 113 deletions lib/compass_admin/agents/backend_agent.ex
Original file line number Diff line number Diff line change
@@ -1,33 +1,17 @@
defmodule CompassAdmin.Agents.BackendAgent do
use GenServer

import Ecto.Query
alias CompassAdmin.Repo
alias CompassAdmin.User
alias CompassAdmin.Agents.BaseAgent

@max_lines 5000
@agent_svn "backend_v1"

def start_link(_) do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
end

def init(_) do
{:ok,
restore() ||
%{
state: :ok,
logs: [
"Welcome to use OSS Compass Admin Deployments.\n",
"This page provides you with an overview of the last trigger and deploy information for your application. >_< \n"
],
last_trigger_id: nil,
last_triggered_at: nil,
last_triggered_result: nil,
last_deploy_id: nil,
last_deploy_at: nil,
last_deploy_result: nil
}}
{:ok, BaseAgent.init(@agent_svn)}
end

def execute(trigger_id) do
Expand All @@ -38,6 +22,13 @@ defmodule CompassAdmin.Agents.BackendAgent do
GenServer.call(__MODULE__, :get_state)
end

def update_deploy_state(deploy_state, last_deploy_id, last_deploy_result) do
GenServer.cast(
__MODULE__,
{:update_deploy_state, deploy_state, last_deploy_id, last_deploy_result}
)
end

def append_log(log) do
GenServer.cast(__MODULE__, {:append, log})
end
Expand All @@ -47,110 +38,24 @@ defmodule CompassAdmin.Agents.BackendAgent do
end

def handle_cast({:update_deploy_state, deploy_state, last_deploy_id, last_deploy_result}, state) do
last_triggered_result =
if state.last_triggered_result == {:ok, "processing"} do
last_deploy_result
else
state.last_triggered_result
end

new_state = %{
state
| state: deploy_state,
last_deploy_id: last_deploy_id,
last_deploy_at: Timex.now(),
last_deploy_result: last_deploy_result,
last_triggered_result: last_triggered_result
}

save(new_state)

{:noreply, new_state}
{:noreply,
BaseAgent.update_deploy_state(
state,
deploy_state,
last_deploy_id,
last_deploy_result
)}
end

def handle_cast({:append, log}, state) do
{:noreply, %{state | logs: [log | Enum.take(state.logs, @max_lines)]}}
{:noreply, BaseAgent.append_log(state, log)}
end

def handle_cast({:deploy, trigger_id}, state) do
user = Repo.one(from(u in User, where: u.id == ^trigger_id))

new_state = %{state | last_trigger_id: trigger_id, last_triggered_at: Timex.now()}

if user && user.role_level >= User.backend_dev_role() do
case state do
%{state: :ok} ->
Task.async(fn ->
GenServer.cast(
__MODULE__,
{:update_deploy_state, :processing, trigger_id, {:ok, "processing"}}
)

do_deployment(trigger_id)
end)

new_state = %{
new_state
| last_triggered_result: {:ok, "processing"},
state: :processing
}

save(new_state)
{:noreply, new_state}

_ ->
new_state = %{
new_state
| last_triggered_result: {:error, "no ready for new deployment"}
}

save(new_state)
{:noreply, new_state}
end
else
new_state = %{new_state | last_triggered_result: {:error, "no permission"}}
save(new_state)
{:noreply, new_state}
end
{:noreply, BaseAgent.deploy(__MODULE__, state, trigger_id, User.backend_dev_role())}
end

def handle_info(_msg, state) do
{:noreply, state}
end

defp do_deployment(trigger_id) do
[input: input, execute: execute] = Application.fetch_env!(:compass_admin, __MODULE__)

Exile.stream(["bash", "-l", "-c", execute], input: input, stderr: :consume)
|> Stream.each(fn stream ->
case stream do
{:stdout, data} -> log(data)
{:stderr, msg} -> log(msg)
{:exit, {:status, code}} ->
msg = "exit with status: #{code}"
log(msg)
if code != 0 do
GenServer.cast(__MODULE__, {:update_deploy_state, :ok, trigger_id, {:error, msg}})
else
GenServer.cast(__MODULE__, {:update_deploy_state, :ok, trigger_id, {:ok, :success}})
end
end
end)
|> Stream.run()
end

defp log(message) do
now = Timex.now() |> Timex.format!("%Y-%m-%d %H:%M:%S", :strftime)
append_log("[#{now}] #{message}")
end

defp save(state) do
Redix.command(:redix, ["SET", "compass:admin:#{@agent_svn}", :erlang.term_to_binary(state)])
end

defp restore() do
with {:ok, cached} <- Redix.command(:redix, ["GET", "compass:admin:#{@agent_svn}"]) do
if cached != nil, do: :erlang.binary_to_term(cached), else: nil
end || nil
end
end
125 changes: 125 additions & 0 deletions lib/compass_admin/agents/base_agent.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
defmodule CompassAdmin.Agents.BaseAgent do
@max_lines 5000

import Ecto.Query
alias CompassAdmin.Repo
alias CompassAdmin.User

def init(cache_key) do
restore(cache_key)
|> Map.put(:cache_key, cache_key)
end

def deploy(module, state, trigger_id, role_required) do
user = Repo.one(from(u in User, where: u.id == ^trigger_id))
new_state = %{state | last_trigger_id: trigger_id, last_triggered_at: Timex.now()}

if user && user.role_level >= role_required do
case state do
%{state: :ok} ->
Task.async(fn ->
module.update_deploy_state(:processing, trigger_id, {:ok, "processing"})
do_deploy(trigger_id, module)
end)

new_state = %{
new_state
| last_triggered_result: {:ok, "processing"},
state: :processing
}

save(state[:cache_key], new_state)
new_state

_ ->
new_state = %{
new_state
| last_triggered_result: {:error, "no ready for new deployment"}
}

save(state[:cache_key], new_state)
new_state
end
else
new_state = %{new_state | last_triggered_result: {:error, "no permission"}}
save(state[:cache_key], new_state)
new_state
end
end

def do_deploy(trigger_id, module) do
[input: input, execute: execute] = Application.fetch_env!(:compass_admin, module)

Exile.stream(["bash", "-l", "-c", execute], input: input, stderr: :consume)
|> Stream.each(fn stream ->
case stream do
{:stdout, data} ->
module.append_log(data)

{:stderr, msg} ->
module.append_log(msg)

{:exit, {:status, code}} ->
msg = "exit with status: #{code}"
module.append_log(msg)

if code != 0 do
module.update_deploy_state(:ok, trigger_id, {:error, msg})
else
module.update_deploy_state(:ok, trigger_id, {:ok, :success})
end
end
end)
|> Stream.run()
end

def append_log(state, log) do
now = Timex.now() |> Timex.format!("%Y-%m-%d %H:%M:%S", :strftime)
%{state | logs: ["[#{now}] #{log}" | Enum.take(state.logs, @max_lines)]}
end

def update_deploy_state(state, deploy_state, last_deploy_id, last_deploy_result) do
last_triggered_result =
if state.last_triggered_result == {:ok, "processing"} do
last_deploy_result
else
state.last_triggered_result
end

new_state = %{
state
| state: deploy_state,
last_deploy_id: last_deploy_id,
last_deploy_at: Timex.now(),
last_deploy_result: last_deploy_result,
last_triggered_result: last_triggered_result
}

save(state[:cache_key], new_state)

new_state
end

defp restore(cache_key) do
with {:ok, cached} <- Redix.command(:redix, ["GET", "compass:admin:#{cache_key}"]) do
if cached != nil, do: :erlang.binary_to_term(cached), else: nil
end ||
%{
state: :ok,
logs: [
"Welcome to use OSS Compass Admin Deployments.\n",
"This page provides you with an overview of the last trigger and deploy information for your application. >_< \n"
],
last_trigger_id: nil,
last_triggered_at: nil,
last_triggered_result: nil,
last_deploy_id: nil,
last_deploy_at: nil,
last_deploy_result: nil
}
end

defp save(cache_key, state) do
Redix.command(:redix, ["SET", "compass:admin:#{cache_key}", :erlang.term_to_binary(state)])
end
end
Loading

0 comments on commit 8ae6eff

Please sign in to comment.