1
0
Fork 0
mirror of https://github.com/benbusby/farside.git synced 2025-06-08 02:16:36 +00:00

refactor, remove unnecessary dependencies, speed up instance check query

This commit is contained in:
mithereal 2022-07-27 19:56:25 -07:00
parent 8fbe016cdb
commit c1ec9e9a6b
15 changed files with 475 additions and 305 deletions

View file

@ -56,8 +56,6 @@ Farside's routing is very minimal, with only the following routes:
- `/` - `/`
- The app home page, displaying all live instances for every service - The app home page, displaying all live instances for every service
- `/ping`
- A passthrough "ping" to redis to ensure both app and redis are working
- `/:service/*glob` - `/:service/*glob`
- The main endpoint for redirecting a user to a working instance of a - The main endpoint for redirecting a user to a working instance of a
particular service with the specified path particular service with the specified path
@ -108,12 +106,9 @@ that their mission to centralize the entire web behind their service ultimately
goes against what Farside is trying to solve. Use at your own discretion. goes against what Farside is trying to solve. Use at your own discretion.
## Development ## Development
- Install [redis](https://redis.io)
- Install [elixir](https://elixir-lang.org/install.html) - Install [elixir](https://elixir-lang.org/install.html)
- (on Debian systems) Install [erlang-dev](https://https://packages.debian.org/sid/erlang-dev) - (on Debian systems) Install [erlang-dev](https://https://packages.debian.org/sid/erlang-dev)
- Start redis: `redis-server`
- Install dependencies: `mix deps.get` - Install dependencies: `mix deps.get`
- Initialize redis contents: `mix run -e Farside.Instances.sync`
- Run Farside: `mix run --no-halt` - Run Farside: `mix run --no-halt`
- Uses localhost:4001 - Uses localhost:4001
@ -123,5 +118,4 @@ goes against what Farside is trying to solve. Use at your own discretion.
| -- | -- | | -- | -- |
| FARSIDE_TEST | If enabled, bypasses the instance availability check and adds all instances to the pool. | | FARSIDE_TEST | If enabled, bypasses the instance availability check and adds all instances to the pool. |
| FARSIDE_PORT | The port to run Farside on (default: `4001`) | | FARSIDE_PORT | The port to run Farside on (default: `4001`) |
| FARSIDE_REDIS_PORT | The Redis server port to use (default: `6379`, same as the default for Redis) |
| FARSIDE_SERVICES_JSON | The "services" JSON file to use for selecting instances (default: `services.json`) | | FARSIDE_SERVICES_JSON | The "services" JSON file to use for selecting instances (default: `services.json`) |

View file

@ -3,8 +3,6 @@ import Config
config :farside, config :farside,
update_file: ".update-results", update_file: ".update-results",
service_prefix: "service-", service_prefix: "service-",
fallback_suffix: "-fallback",
previous_suffix: "-previous",
index: "index.eex", index: "index.eex",
route: "route.eex", route: "route.eex",
headers: [ headers: [

View file

@ -1,7 +1,5 @@
defmodule Farside do defmodule Farside do
@service_prefix Application.fetch_env!(:farside, :service_prefix) @service_prefix Application.fetch_env!(:farside, :service_prefix)
@fallback_suffix Application.fetch_env!(:farside, :fallback_suffix)
@previous_suffix Application.fetch_env!(:farside, :previous_suffix)
# Define relation between available services and their parent service. # Define relation between available services and their parent service.
# This enables Farside to redirect with links such as: # This enables Farside to redirect with links such as:
@ -34,16 +32,12 @@ defmodule Farside do
@quora_regex => ["querte"] @quora_regex => ["querte"]
} }
def get_services_map do alias Farside.LastUpdated
{:ok, service_list} = Redix.command(:redix, ["KEYS", "#{@service_prefix}*"])
# Match service name to list of available instances def get_services_map do
Enum.reduce(service_list, %{}, fn service, acc -> Farside.Server.Supervisor.list()
{:ok, instance_list} = |> Enum.reduce(%{}, fn service, acc ->
Redix.command( {_, data} = :ets.lookup(String.to_atom(service), :data) |> List.first()
:redix,
["LRANGE", service, "0", "-1"]
)
Map.put( Map.put(
acc, acc,
@ -52,98 +46,32 @@ defmodule Farside do
@service_prefix, @service_prefix,
"" ""
), ),
instance_list data.instances
) )
end) end)
end end
def get_service(service) do def get_service(service \\ "libreddit/r/popular") do
# Check if service has an entry in Redis, otherwise try to service_name =
# match against available parent services Enum.find_value(
service_name = cond do @parent_services,
!check_service(service) -> fn {k, v} ->
Enum.find_value( String.match?(service, k) && Enum.random(v)
@parent_services,
fn {k, v} ->
String.match?(service, k) && Enum.random(v)
end)
true ->
service
end
service_name
end
def check_service(service) do
# Checks to see if a specific service has instances available
# in redis
{:ok, instances} =
Redix.command(
:redix,
[
"LRANGE",
"#{@service_prefix}#{service}",
"0",
"-1"
]
)
Enum.count(instances) > 0
end
def last_instance(service) do
# Fetches the last selected instance for a particular service
{:ok, previous} =
Redix.command(
:redix,
["GET", "#{service}#{@previous_suffix}"]
)
previous
end
def pick_instance(service) do
{:ok, instances} =
Redix.command(
:redix,
[
"LRANGE",
"#{@service_prefix}#{service}",
"0",
"-1"
]
)
# Either pick a random available instance,
# or fall back to the default one
instance =
if Enum.count(instances) > 0 do
if Enum.count(instances) == 1 do
# If there's only one instance, just return that one...
List.first(instances)
else
# ...otherwise pick a random one from the list, ensuring
# that the same instance is never picked twice in a row.
instance =
Enum.filter(instances, &(&1 != last_instance(service)))
|> Enum.random()
Redix.command(
:redix,
["SET", "#{service}#{@previous_suffix}", instance]
)
instance
end end
else )
{:ok, result} =
Redix.command(
:redix,
["GET", "#{service}#{@fallback_suffix}"]
)
result data = :ets.lookup(String.to_atom(service_name), :data)
end
instance {_, service} = List.first(data)
case Enum.count(service.instances) > 0 do
true -> Enum.random(service.instances)
false -> service.fallback
end
end
def get_last_updated do
LastUpdated.value()
end end
def amend_instance(instance, service, path) do def amend_instance(instance, service, path) do
@ -153,24 +81,15 @@ defmodule Farside do
# so a "/u" is appended if the requested path doesn't explicitly include # so a "/u" is appended if the requested path doesn't explicitly include
# "/p" for a post or an empty path for the home page. # "/p" for a post or an empty path for the home page.
if String.length(path) > 0 and if String.length(path) > 0 and
!String.starts_with?(path, "p/") and !String.starts_with?(path, "p/") and
!String.starts_with?(path, "u/") do !String.starts_with?(path, "u/") do
"#{instance}/u" "#{instance}/u"
else else
instance instance
end end
true -> true ->
instance instance
end end
end end
def get_last_updated do
{:ok, last_updated} =
Redix.command(
:redix,
["GET", "last_updated"]
)
last_updated
end
end end

View file

@ -1,32 +1,105 @@
defmodule Farside.Application do defmodule Farside.Application do
#@farside_port Application.fetch_env!(:farside, :port) # @farside_port Application.fetch_env!(:farside, :port)
#@redis_conn Application.fetch_env!(:farside, :redis_conn) # @redis_conn Application.fetch_env!(:farside, :redis_conn)
@moduledoc false @moduledoc false
use Application use Application
require Logger
alias Farside.LastUpdated
alias Farside.Sync
@impl true @impl true
def start(_type, _args) do def start(_type, _args) do
redis_conn = Application.fetch_env!(:farside, :redis_conn) port = Application.fetch_env!(:farside, :port)
farside_port = Application.fetch_env!(:farside, :port)
IO.puts "Runing on http://localhost:#{farside_port}"
IO.puts "Redis conn: #{redis_conn}"
children = [ Logger.info("Running on http://localhost:#{port}")
Plug.Cowboy.child_spec(
scheme: :http, maybe_loaded_children =
plug: Farside.Router, case is_nil(System.get_env("FARSIDE_TEST")) do
options: [ true ->
port: String.to_integer(farside_port) [{Sync, []}]
]
), false ->
{PlugAttack.Storage.Ets, name: Farside.Throttle.Storage, clean_period: 60_000}, Logger.info("Skipping sync job setup...")
{Redix, {redis_conn, [name: :redix]}}, []
Farside.Scheduler, end
Farside.Server
] children =
[
Plug.Cowboy.child_spec(
scheme: :http,
plug: Farside.Router,
options: [
port: String.to_integer(port)
]
),
{LastUpdated, DateTime.utc_now()},
{PlugAttack.Storage.Ets, name: Farside.Throttle.Storage, clean_period: 60_000},
{DynamicSupervisor, strategy: :one_for_one, name: :server_supervisor},
{Registry, keys: :unique, name: :servers}
] ++ maybe_loaded_children
opts = [strategy: :one_for_one, name: Farside.Supervisor] opts = [strategy: :one_for_one, name: Farside.Supervisor]
Supervisor.start_link(children, opts) Supervisor.start_link(children, opts)
|> load()
end
def load(response) do
services_json = Application.fetch_env!(:farside, :services_json)
queries = Application.fetch_env!(:farside, :queries)
{:ok, file} = File.read(services_json)
{:ok, json} = Jason.decode(file)
for service_json <- json do
service_atom =
for {key, val} <- service_json, into: %{} do
{String.to_existing_atom(key), val}
end
service = struct(%Service{}, service_atom)
request_urls =
Enum.map(service.instances, fn x ->
x <>
EEx.eval_string(
service.test_url,
query: Enum.random(queries)
)
end)
tasks =
for request_url <- request_urls do
Task.async(fn ->
reply = Farside.Http.request(request_url, service.type)
{request_url, reply}
end)
end
tasks_with_results = Task.yield_many(tasks, 5000)
instances =
Enum.map(tasks_with_results, fn {task, res} ->
# Shut down the tasks that did not reply nor exit
res || Task.shutdown(task, :brutal_kill)
end)
|> Enum.reject(fn x -> x == nil end)
|> Enum.map(fn {_, value} -> value end)
|> Enum.filter(fn {instance_url, value} ->
value == :good
end)
|> Enum.map(fn {url, _} -> url end)
service = %{service | instances: instances}
Farside.Instance.Supervisor.start(service)
end
LastUpdated.value(DateTime.utc_now())
response
end end
end end

47
lib/farside/http.ex Normal file
View file

@ -0,0 +1,47 @@
defmodule Farside.Http do
require Logger
@headers Application.fetch_env!(:farside, :headers)
def request(url) do
cond do
System.get_env("FARSIDE_TEST") ->
:good
true ->
HTTPoison.get(url, @headers)
|> then(&elem(&1, 1))
|> Map.get(:status_code)
|> case do
n when n < 400 ->
Logger.info("Response: [#{n}]")
:good
n ->
Logger.error("Response: [#{n}]")
:bad
end
end
end
def request(url, type) do
cond do
System.get_env("FARSIDE_TEST") ->
:good
true ->
HTTPoison.get(url, @headers)
|> then(&elem(&1, 1))
|> Map.get(:status_code)
|> case do
n when n < 400 ->
Logger.info("Type: #{type}, Response: [#{n}], Url: #{url}")
:good
n ->
Logger.error("Type: #{type}, Response: [#{n}], Url: #{url}")
:bad
end
end
end
end

124
lib/farside/instance.ex Normal file
View file

@ -0,0 +1,124 @@
defmodule Farside.Instance do
use GenServer
require Logger
@registry_name :servers
def child_spec(args) do
%{
id: __MODULE__,
start: {__MODULE__, :start_link, [args]},
type: :worker
}
end
def init(init_arg) do
ref =
:ets.new(String.to_atom(init_arg.type), [
:set,
:named_table,
:public,
read_concurrency: true,
write_concurrency: true
])
:ets.insert(ref, {:data, init_arg})
{:ok, %{type: init_arg.type, ref: ref}}
end
def start_link(arg) do
name = via_tuple(arg.type)
GenServer.start_link(__MODULE__, arg, name: name)
end
def shutdown() do
GenServer.call(__MODULE__, :shutdown)
end
def handle_call(
:shutdown,
_from,
state
) do
{:stop, {:ok, "Normal Shutdown"}, state}
end
def handle_cast(
:shutdown,
state
) do
{:stop, :normal, state}
end
def handle_cast(
:update,
state
) do
service = :ets.lookup(String.to_atom(state.type), :data)
{_, service} = List.first(service)
queries = Application.fetch_env!(:farside, :queries)
request_urls =
Enum.map(service.instances, fn x ->
x <>
EEx.eval_string(
service.test_url,
query: Enum.random(queries)
)
end)
tasks =
for request_url <- request_urls do
Task.async(fn ->
reply = Farside.Http.request(request_url, service.type)
{request_url, reply}
end)
end
tasks_with_results = Task.yield_many(tasks, 5000)
instances =
Enum.map(tasks_with_results, fn {task, res} ->
# Shut down the tasks that did not reply nor exit
res || Task.shutdown(task, :brutal_kill)
end)
|> Enum.reject(fn x -> x == nil end)
|> Enum.map(fn {_, value} -> value end)
|> Enum.filter(fn {instance_url, value} ->
value == :good
end)
|> Enum.map(fn {url, _} -> url end)
values = %{service | instances: instances}
:ets.delete_all_objects(String.to_atom(state.type))
:ets.insert(state.ref, {:data, values})
update_file = Application.fetch_env!(:farside, :update_file)
File.write(update_file, values.fallback)
{:noreply, state}
end
@doc false
def via_tuple(data, registry \\ @registry_name) do
{:via, Registry, {registry, data}}
end
@impl true
def handle_info({:DOWN, ref, :process, _pid, _reason}, {names, refs}) do
:ets.delete(names)
{:noreply, {names, refs}}
end
@impl true
def handle_info(_msg, state) do
{:noreply, state}
end
end

View file

@ -0,0 +1,93 @@
defmodule Farside.Instance.Supervisor do
use DynamicSupervisor
alias __MODULE__, as: SUPERVISOR
alias Farside.Instance, as: SERVER
@name :server_supervisor
@registry_name :servers
def child_spec() do
%{
id: __MODULE__,
start: {__MODULE__, :start_link, []},
type: :supervisor
}
end
def start_link(init_arg) do
DynamicSupervisor.start_link(__MODULE__, init_arg, name: @name)
end
def start_link() do
DynamicSupervisor.start_link(__MODULE__, [], name: @name)
end
@impl true
def init(_) do
DynamicSupervisor.init(strategy: :one_for_one)
end
def start(opts \\ %{}) do
child_spec = {SERVER, opts}
DynamicSupervisor.start_child(@name, child_spec)
end
def stop(id) do
case Registry.lookup(@registry_name, id) do
[] ->
:ok
[{pid, _}] ->
Process.exit(pid, :shutdown)
:ok
end
end
def update_children() do
list()
|> Enum.each(fn x ->
SERVER.via_tuple(x)
|> GenServer.cast(:update)
end)
end
def whereis(id) do
case Registry.lookup(@registry_name, id) do
[{pid, _}] -> pid
[] -> nil
end
end
def find_or_create(id) do
if process_exists?(id) do
{:ok, id}
else
id |> start
end
end
def exists?(id) do
case Registry.lookup(@registry_name, id) do
[] -> false
_ -> true
end
end
def list do
DynamicSupervisor.which_children(@name)
|> Enum.map(fn {_, account_proc_pid, _, _} ->
Registry.keys(@registry_name, account_proc_pid)
|> List.first()
end)
|> Enum.sort()
end
def process_exists?(hash) do
case Registry.lookup(@registry_name, hash) do
[] -> false
_ -> true
end
end
end

View file

@ -1,115 +1,17 @@
defmodule Farside.Instances do defmodule Farside.Instances do
@fallback_suffix Application.fetch_env!(:farside, :fallback_suffix) alias Farside.LastUpdated
@update_file Application.fetch_env!(:farside, :update_file)
@service_prefix Application.fetch_env!(:farside, :service_prefix)
@headers Application.fetch_env!(:farside, :headers)
@queries Application.fetch_env!(:farside, :queries)
@debug_header "======== "
@debug_spacer " "
def sync() do def sync() do
File.rename(@update_file, "#{@update_file}-prev") update_file = Application.fetch_env!(:farside, :update_file)
update()
# Add UTC time of last update File.rm("#{update_file}-prev")
Redix.command(:redix, [
"SET",
"last_updated",
Calendar.strftime(DateTime.utc_now(), "%c")
])
end
def request(url) do File.rename(update_file, "#{update_file}-prev")
cond do
System.get_env("FARSIDE_TEST") ->
:good
true -> File.write(update_file, "")
HTTPoison.get(url, @headers)
|> then(&elem(&1, 1))
|> Map.get(:status_code)
|> case do
n when n < 400 ->
IO.puts("#{@debug_spacer}✓ [#{n}]")
:good
n -> LastUpdated.value(DateTime.utc_now())
IO.puts("#{@debug_spacer}x [#{(n && n) || "error"}]")
:bad
end
end
end
def update() do Farside.Instance.Supervisor.update_children()
services_json = Application.fetch_env!(:farside, :services_json)
{:ok, file} = File.read(services_json)
{:ok, json} = Jason.decode(file)
# Loop through all instances and check each for availability
for service_json <- json do
service_atom = for {key, val} <- service_json, into: %{} do
{String.to_existing_atom(key), val}
end
service = struct(%Service{}, service_atom)
IO.puts("#{@debug_header}#{service.type}")
result =
Enum.filter(service.instances, fn instance_url ->
request_url =
instance_url <>
EEx.eval_string(
service.test_url,
query: Enum.random(@queries)
)
IO.puts("#{@debug_spacer}#{request_url}")
request(request_url) == :good
end)
add_to_redis(service, result)
log_results(service.type, result)
end
end
def add_to_redis(service, instances) do
# Remove previous list of instances
Redix.command(:redix, [
"DEL",
"#{@service_prefix}#{service.type}"
])
# Update with new list of available instances
Redix.command(
:redix,
[
"LPUSH",
"#{@service_prefix}#{service.type}"
] ++ instances
)
# Set fallback to one of the available instances,
# or the default instance if all are "down"
if Enum.count(instances) > 0 do
Redix.command(:redix, [
"SET",
"#{service.type}#{@fallback_suffix}",
Enum.random(instances)
])
else
Redix.command(:redix, [
"SET",
"#{service.type}#{@fallback_suffix}",
service.fallback
])
end
end
def log_results(service_name, results) do
{:ok, file} = File.open(@update_file, [:append, {:delayed_write, 100, 20}])
IO.write(file, "#{service_name}: #{inspect(results)}\n")
File.close(file)
end end
end end

View file

@ -0,0 +1,15 @@
defmodule Farside.LastUpdated do
use Agent
def start_link(initial_value) do
Agent.start_link(fn -> initial_value end, name: __MODULE__)
end
def value do
Agent.get(__MODULE__, & &1)
end
def value(new_value) do
Agent.update(__MODULE__, fn _ -> new_value end)
end
end

View file

@ -29,12 +29,6 @@ defmodule Farside.Router do
send_resp(conn, 200, resp) send_resp(conn, 200, resp)
end end
get "/ping" do
# Useful for app healthcheck
{:ok, resp} = Redix.command(:redix, ["PING"])
send_resp(conn, 200, resp)
end
get "/_/:service/*glob" do get "/_/:service/*glob" do
r_path = String.slice(conn.request_path, 2..-1) r_path = String.slice(conn.request_path, 2..-1)
@ -48,37 +42,48 @@ defmodule Farside.Router do
end end
get "/:service/*glob" do get "/:service/*glob" do
service_name = cond do service_name =
service =~ "http" -> cond do
List.first(glob) service =~ "http" ->
true -> List.first(glob)
service
end
path = cond do true ->
service_name != service -> service
Enum.join(Enum.slice(glob, 1..-1), "/") end
true ->
Enum.join(glob, "/")
end
instance = cond do IO.inspect(service_name, label: "service_name")
conn.assigns[:throttle] != nil ->
Farside.get_service(service_name)
|> Farside.last_instance
|> Farside.amend_instance(service_name, path)
true ->
Farside.get_service(service_name)
|> Farside.pick_instance
|> Farside.amend_instance(service_name, path)
end
# Redirect to the available instance path =
conn cond do
|> Plug.Conn.resp(:found, "") service_name != service ->
|> Plug.Conn.put_resp_header( Enum.join(Enum.slice(glob, 1..-1), "/")
"location",
"#{instance}/#{path}#{get_query_params(conn)}" true ->
) Enum.join(glob, "/")
end
case service_name do
"favicon.ico" ->
conn |> Plug.Conn.resp(:not_found, "")
_ ->
instance =
cond do
conn.assigns[:throttle] != nil ->
Farside.get_service(service_name)
true ->
Farside.get_service(service_name)
end
|> Farside.amend_instance(service_name, path)
# Redirect to the available instance
conn
|> Plug.Conn.resp(:found, "")
|> Plug.Conn.put_resp_header(
"location",
"#{instance}/#{path}#{get_query_params(conn)}"
)
end
end end
end end

View file

@ -1,3 +0,0 @@
defmodule Farside.Scheduler do
use Quantum, otp_app: :farside
end

View file

@ -1,22 +0,0 @@
defmodule Farside.Server do
use GenServer
import Crontab.CronExpression
def init(init_arg) do
{:ok, init_arg}
end
def start_link(arg) do
if System.get_env("FARSIDE_TEST") do
IO.puts("Skipping sync job setup...")
else
Farside.Scheduler.new_job()
|> Quantum.Job.set_name(:sync)
|> Quantum.Job.set_schedule(~e[*/5 * * * *])
|> Quantum.Job.set_task(fn -> Farside.Instances.sync() end)
|> Farside.Scheduler.add_job()
end
GenServer.start_link(__MODULE__, arg)
end
end

28
lib/farside/sync.ex Normal file
View file

@ -0,0 +1,28 @@
defmodule Farside.Sync do
use Task
def child_spec(args) do
%{
id: __MODULE__,
start: {__MODULE__, :start_link, [args]},
type: :worker
}
end
def start_link(_arg) do
Task.start_link(&poll/0)
end
def poll() do
receive do
after
300_000 ->
sync()
poll()
end
end
defp sync() do
Farside.Instances.sync()
end
end

View file

@ -25,9 +25,7 @@ defmodule Farside.MixProject do
{:httpoison, "~> 1.8"}, {:httpoison, "~> 1.8"},
{:jason, "~> 1.1"}, {:jason, "~> 1.1"},
{:plug_attack, "~> 0.4.2"}, {:plug_attack, "~> 0.4.2"},
{:plug_cowboy, "~> 2.0"}, {:plug_cowboy, "~> 2.0"}
{:quantum, "~> 3.0"},
{:redix, "~> 1.1"}
] ]
end end
end end

View file

@ -1,5 +1,4 @@
defmodule FarsideTest do defmodule FarsideTest do
use ExUnit.Case use ExUnit.Case
use Plug.Test use Plug.Test