Add e621 export handling & e621 pool tag search

Donovan Daniels 2024-06-04 14:48:30 -05:00
parent eca44717ba
commit 77733772b3
Signed by: Donovan_DMC
GPG Key ID: 907D29CBFD6157BA
30 changed files with 669 additions and 20 deletions

View File

@@ -98,3 +98,5 @@ gem "retriable", "~> 3.1"
gem "addressable", "~> 2.8"
gem "aws-sdk-s3", "~> 1.149"
gem "opensearch-ruby", "~> 3.3"

View File

@@ -132,6 +132,10 @@ GEM
ed25519 (1.3.0)
erubi (1.12.0)
exifr (1.4.0)
faraday (2.9.0)
faraday-net_http (>= 2.0, < 3.2)
faraday-net_http (3.1.0)
net-http
ffi (1.16.3)
filesize (0.2.0)
fspath (3.1.2)
@@ -179,8 +183,11 @@ GEM
mini_mime (1.1.5)
minitest (5.20.0)
msgpack (1.7.2)
multi_json (1.15.0)
multi_xml (0.6.0)
mutex_m (0.2.0)
net-http (0.4.1)
uri
net-imap (0.4.4)
date
net-protocol
@@ -193,6 +200,9 @@ GEM
nio4r (2.5.9)
nokogiri (1.15.4-x86_64-linux)
racc (~> 1.4)
opensearch-ruby (3.3.0)
faraday (>= 1.0, < 3)
multi_json (>= 1.0)
pagy (8.3.0)
parallel (1.23.0)
parser (3.2.2.4)
@@ -309,6 +319,7 @@ GEM
tzinfo (2.0.6)
concurrent-ruby (~> 1.0)
unicode-display_width (2.5.0)
uri (0.13.0)
web-console (4.2.1)
actionview (>= 6.0.0)
activemodel (>= 6.0.0)
@@ -342,6 +353,7 @@ DEPENDENCIES
image_optim (~> 0.31.3)
jbuilder
jsbundling-rails
opensearch-ruby (~> 3.3)
pagy (~> 8.3)
pg (~> 1.1)
puma (>= 5.0)
@@ -367,4 +379,4 @@ RUBY VERSION
ruby 3.2.2p53
BUNDLED WITH
2.4.21
2.5.10

View File

@@ -6,7 +6,7 @@ module E621Ws
E621WsRoutes::DOMAIN
end
def asset_domain
def assets_path
E621WsRoutes::DOMAIN
end

View File

@@ -0,0 +1,27 @@
# frozen_string_literal: true
module E621Ws
class PoolsController < ApplicationController
include ::ApplicationController::CommonAssetRoutes
def index
limit = (params[:limit]&.to_i || 100).clamp(0, 320)
@results = E621::Pool.search(search_params, params[:page]&.to_i, limit)
respond_to do |fmt|
fmt.json do
render(json: @results)
end
end
end
def search_params
permit_search_params(%i[name_matches description_matches creator_id creator_name is_active category order tags])
end
protected
def site_domain
E621WsRoutes::POOLS_DOMAIN
end
end
end
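
For reference, an illustrative request this controller serves once the pools subdomain routing below is in place (the query values here are made up):

# GET https://pools.e621.ws/?tags=fox+-cat&limit=50&page=2
# -> E621::Pool.search(search_params, 2, 50), rendered as JSON;
#    limit is clamped to 0..320 and defaults to 100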

View File

@@ -22,5 +22,11 @@ module E621Ws
def history
render(json: E621Status.history)
end
protected
def site_domain
E621WsRoutes::STATUS_DOMAIN
end
end
end

View File

@@ -0,0 +1,39 @@
# frozen_string_literal: true
class E621ExportDownloadJob < ApplicationJob
queue_as :default
def perform(type)
date = Time.zone.now.strftime("%Y-%m-%d")
url = "https://e621.net/db_export/#{type}-#{date}.csv.gz"
file = Tempfile.new(%W[e621-export-#{type}-#{date} .csv.gz])
file.binmode
res = HTTParty.get(url, httparty_options) do |chunk|
next if [301, 302].include?(chunk.code)
file.write(chunk)
end
unless res.success?
file.close
file.delete
raise("HTTP error code: #{res.code} #{res.message}")
end
file.rewind
gz = Zlib::GzipReader.new(file)
csv = Tempfile.new(%W[e621-export-#{date} .csv])
csv.binmode
csv.write(gz.read)
gz.close
file.close
file.delete
TableImportJob.perform_now("e621.#{type}", csv.path)
end
def httparty_options
{
timeout: 300,
stream_body: true,
}.deep_merge(Websites.config.httparty_options)
end
end
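
A minimal invocation sketch; the type string selects which daily export file is fetched:

# Downloads https://e621.net/db_export/pools-<today>.csv.gz, gunzips it to a
# temporary CSV, and hands that file to TableImportJob for the e621.pools table.
E621ExportDownloadJob.perform_now("pools")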

View File

@@ -0,0 +1,10 @@
# frozen_string_literal: true
class E621PoolImportJob < ApplicationJob
queue_as :default
def perform
PoolIndex.create_index!(delete_existing: true)
PoolIndex.import!
end
end

View File

@@ -0,0 +1,22 @@
# frozen_string_literal: true
class TableImportJob < ApplicationJob
queue_as :default
def perform(table, file)
ApplicationRecord.connection.execute("TRUNCATE #{table}")
ActiveRecord::Base.connection_db_config.configuration_hash => { host:, user:, database: }
password = ENV.fetch("WEBSITES_DATABASE_PASSWORD", nil)
system("psql -h #{host} -U #{user} -d #{database} #{password.blank? ? '' : "-p #{password} "}-c \"\\copy #{table} (#{COLUMN_MAP[table.to_sym]}) FROM '#{file}' DELIMITER ',' CSV HEADER;\"", exception: true)
FileUtils.rm(file)
end
COLUMN_MAP = {
"e621.pools": "id,name,created_at,updated_at,creator_id,description,is_active,category,post_ids",
"e621.posts": "id,uploader_id,created_at,md5,source,rating,image_width,image_height,tag_string,locked_tags,fav_count,file_ext,parent_id,change_seq,approver_id,file_size,comment_count,description,duration,updated_at,is_deleted,is_pending,is_flagged,score,up_score,down_score,is_rating_locked,is_status_locked,is_note_locked",
"e621.tag_aliases": "id,antecedent_name,consequent_name,created_at,status",
"e621.tag_implications": "id,antecedent_name,consequent_name,created_at,status",
"e621.tags": "id,name,category,post_count",
"e621.wiki_pages": "id,created_at,updated_at,title,body,creator_id,updater_id,is_locked",
}.freeze
end
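
As an illustration (the host, user, and path here are made up), the job shells out to roughly:

TableImportJob.perform_now("e621.pools", "/tmp/e621-export-pools.csv")
# runs approximately:
#   PGPASSWORD=... psql -h db -U websites -d websites \
#     -c "\copy e621.pools (id,name,created_at,updated_at,creator_id,description,is_active,category,post_ids) FROM '/tmp/e621-export-pools.csv' DELIMITER ',' CSV HEADER;"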

View File

@@ -0,0 +1,10 @@
# frozen_string_literal: true
class UpdateE621PoolTagsJob < ApplicationJob
queue_as :default
def perform(pool_id)
pool = E621::Pool.find(pool_id)
pool.update_column(:tag_string, pool.post_tags.join(" "))
end
end

View File

@@ -1,4 +0,0 @@
# frozen_string_literal: true
module E621ExportsParser
end

View File

@@ -0,0 +1,76 @@
# frozen_string_literal: true
class E621PoolQueryBuilder
attr_accessor :params, :must, :must_not, :should, :order
def initialize(params)
@params = params
@must = []
@must_not = []
@should = []
@order = []
end
def add_tag_string_search(query)
tag_list = query.split
aliases = E621::TagAlias.where(antecedent_name: tag_list.map { |t| t.sub(/\A([-~])/, "") })
tag_list.map do |tag|
type = :must
if tag =~ /\A([-~])/
tag = tag[1..]
type = $1 == "~" ? :should : :must_not
end
if (aliased = aliases.find { |a| a.antecedent_name == tag })
tag = aliased.consequent_name
end
case type
when :must
must << { term: { tags: tag } }
when :must_not
must_not << { term: { tags: tag } }
when :should
should << { term: { tags: tag } }
end
end
end
def add_other_search
must << { match_phrase_prefix: { name: params[:name_matches] } } if params[:name_matches].present?
must << { match_phrase_prefix: { description: params[:description_matches] } } if params[:description_matches].present?
must << { term: { creator_id: params[:creator_id] } } if params[:creator_id].present?
must << { term: { creator_id: Requests::E621.name_to_id(params[:creator_name]) } } if params[:creator_name].present?
must << { term: { is_active: params[:is_active] } } if params[:is_active].present?
must << { term: { category: params[:category] } } if params[:category].present?
must << { term: { id: params[:id] } } if params[:id].present?
end
def add_order
case params[:order]
when "name"
order << { name_kw: :asc }
when "created_at"
order << { created_at: :desc }
when "post_count"
order << { post_count: :desc }
when "id_asc"
order << { id: :asc }
else
order << { id: :desc }
end
end
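# NB: search_sql below is not called anywhere in this commit, and it expects
# must/must_not/should to hold raw tag names rather than the OpenSearch
# clause hashes built in add_tag_string_search.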
def search_sql
q = E621::Pool.all
q = q.where("string_to_array(tag_string, ' ') @> ARRAY[?]", must) if must.any?
q = q.where("NOT(string_to_array(tag_string, ' ') && ARRAY[?])", must_not) if must_not.any?
q = q.where("string_to_array(tag_string, ' ') && ARRAY[?]", should) if should.any?
q
end
def search(page = 1, limit = 100)
add_tag_string_search(params[:tags]) if params[:tags].present?
add_other_search
add_order
PoolIndex.search(page, limit, must: must, must_not: must_not, should: should, order: order)
end
end
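
A sketch of the tag-prefix semantics, assuming no tag aliases match: plain tags become must clauses, "-" prefixes must_not, and "~" prefixes should.

builder = E621PoolQueryBuilder.new({ tags: "fox -cat ~dog" })
builder.add_tag_string_search(builder.params[:tags])
builder.must     # => [{ term: { tags: "fox" } }]
builder.must_not # => [{ term: { tags: "cat" } }]
builder.should   # => [{ term: { tags: "dog" } }]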

app/logical/pool_index.rb Normal file
View File

@@ -0,0 +1,127 @@
module PoolIndex
module_function
def index_name
"pools_#{Rails.env}"
end
def index_config
{
settings: {
index: {
number_of_shards: 1,
number_of_replicas: 0,
max_result_window: 100_000,
},
},
mappings: {
dynamic: false,
properties: {
id: { type: "integer" },
tags: { type: "keyword" },
name: { type: "text" },
name_kw: { type: "keyword" },
created_at: { type: "date" },
updated_at: { type: "date" },
creator_id: { type: "integer" },
description: { type: "text" },
is_active: { type: "boolean" },
category: { type: "keyword" },
post_ids: { type: "integer" },
post_count: { type: "integer" },
},
},
}
end
def as_indexed_json(pool)
tags = pool.tag_string.split
{
id: pool.id,
tags: tags,
name: pool.name,
name_kw: pool.name,
created_at: pool.created_at,
updated_at: pool.updated_at,
creator_id: pool.creator_id,
description: pool.description,
is_active: pool.is_active,
category: pool.category,
post_ids: pool.post_ids,
post_count: pool.post_ids.size,
}
end
def import!
create_index!(delete_existing: true)
E621::Pool.find_in_batches(batch_size: 1000) do |batch|
batch.map! do |pool|
{
index: {
_id: pool.id,
data: as_indexed_json(pool),
},
}
end
client.bulk(body: batch, index: index_name)
end
end
def create_index!(delete_existing: false)
exists = index_exist?
delete_index! if exists && delete_existing
client.indices.create(index: index_name, body: index_config)
end
def delete_index!
client.indices.delete(index: index_name, ignore: 404)
end
def index_exist?
client.indices.exists(index: index_name)
end
def refresh_index!
client.indices.refresh(index: index_name)
end
def update_document(pool)
client.index(index: index_name, id: pool.id, body: as_indexed_json(pool))
end
def delete_document(id)
client.delete(index: index_name, id: id)
end
def client
@client ||= OpenSearch::Client.new(host: Websites.config.opensearch_host, request_timeout: 120)
end
def search(page, limit, must: [], must_not: [], should: [], order: nil)
page ||= 1
limit ||= 100
query = {
bool: {
must: must,
must_not: must_not,
should: should,
},
}
query[:bool][:minimum_should_match] = 1 if should.any?
query[:bool][:must].push({ match_all: {} }) if query[:bool][:must].empty?
from = (page - 1) * limit
body = {
from: from,
size: limit,
query: query,
sort: order,
_source: false,
timeout: "3000ms",
}
results = client.search(index: index_name, body: body)["hits"]["hits"].map { |hit| hit["_id"].to_i }
count = client.count(index: index_name, body: { query: query })["count"]
{ results: results, count: count }
end
end
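
For example, PoolIndex.search(2, 50, must: [{ term: { tags: "fox" } }], order: [{ id: :desc }]) builds this body before querying OpenSearch:

{
  from: 50,                  # (page - 1) * limit
  size: 50,
  query: { bool: { must: [{ term: { tags: "fox" } }], must_not: [], should: [] } },
  sort: [{ id: :desc }],
  _source: false,            # only the _id values are read back
  timeout: "3000ms",
}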

View File

@@ -95,32 +95,43 @@ module Requests
JSON.parse(r.body)&.first
end
def name_to_id(name)
Cache.fetch("name_to_id:#{name.downcase}", expires_in: 1.day) do
r = self.class.get("/users.json?search[name_matches]=#{name}", options)
JSON.parse(r.body)&.first&.dig("id")
end
end
def self.status
new.status
end
def self.get_post(**)
new.get_post(**)
def self.get_post(...)
new.get_post(...)
end
def self.get_posts(**)
new.get_posts(**)
def self.get_posts(...)
new.get_posts(...)
end
def self.get_all_posts(**)
new.get_all_posts(**)
def self.get_all_posts(...)
new.get_all_posts(...)
end
def self.get_posts_by_tags(**)
new.get_posts_by_tags(**)
def self.get_posts_by_tags(...)
new.get_posts_by_tags(...)
end
def self.get_all_posts_by_tags(**)
new.get_all_posts_by_tags(**)
def self.get_all_posts_by_tags(...)
new.get_all_posts_by_tags(...)
end
def self.find_replacement(**)
new.find_replacement(**)
def self.find_replacement(...)
new.find_replacement(...)
end
def self.name_to_id(...)
new.name_to_id(...)
end
end
end
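
Usage sketch for the new name lookup; results are cached per lowercased name for a day, and nil is returned when no user matches:

Requests::E621.name_to_id("Donovan_DMC") # cached under "name_to_id:donovan_dmc"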

app/models/e621/pool.rb Normal file
View File

@@ -0,0 +1,57 @@
module E621
class Pool < ApplicationRecord
self.table_name = "e621.pools"
after_commit(on: %i[create update]) do
PoolIndex.update_document(self)
end
after_commit(on: :destroy) do
PoolIndex.delete_document(id)
end
def posts
E621::Post.where(id: post_ids)
end
def post_tags
posts.map(&:tags).flatten.uniq
end
def thumbnail_url
post = posts.first
return nil if post.nil? || post.is_deleted?
"https://static1.e621.net/data/#{post.md5[0..1]}/#{post.md5[2..3]}/#{post.md5}.#{post.file_ext}"
end
def self.search(params, page, limit)
E621PoolQueryBuilder.new(params).search(page, limit) => { results:, count: }
pools = where(id: results).sort_by { |pool| results.index(pool.id) }
{ pools: pools, total: count }
end
module SyncMethods
def sync
E621ExportDownloadJob.perform_later("pools")
end
def sync!
E621ExportDownloadJob.perform_now("pools")
end
def sync_tags
find_each do |pool|
UpdateE621PoolTagsJob.perform_later(pool.id)
end
end
def sync_tags!
find_each do |pool|
UpdateE621PoolTagsJob.perform_now(pool.id)
end
end
end
extend SyncMethods
end
end
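
A usage sketch, assuming the pools index has been built (the total shown is illustrative):

E621::Pool.search({ tags: "fox -cat" }, 1, 50)
# => { pools: [...], total: 123 } -- records reordered to match the index hit order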

app/models/e621/post.rb Normal file
View File

@@ -0,0 +1,21 @@
module E621
class Post < ApplicationRecord
self.table_name = "e621.posts"
module SyncMethods
def sync
E621ExportDownloadJob.perform_later("posts")
end
def sync!
E621ExportDownloadJob.perform_now("posts")
end
end
extend SyncMethods
def tags
tag_string.split
end
end
end

app/models/e621/tag.rb Normal file
View File

@@ -0,0 +1,17 @@
module E621
class Tag < ApplicationRecord
self.table_name = "e621.tags"
module SyncMethods
def sync
E621ExportDownloadJob.perform_later("tags")
end
def sync!
E621ExportDownloadJob.perform_now("tags")
end
end
extend SyncMethods
end
end

View File

@@ -0,0 +1,17 @@
module E621
class TagAlias < ApplicationRecord
self.table_name = "e621.tag_aliases"
module SyncMethods
def sync
E621ExportDownloadJob.perform_later("tag_aliases")
end
def sync!
E621ExportDownloadJob.perform_now("tag_aliases")
end
end
extend SyncMethods
end
end

View File

@@ -0,0 +1,17 @@
module E621
class TagImplication < ApplicationRecord
self.table_name = "e621.tag_implications"
module SyncMethods
def sync
E621ExportDownloadJob.perform_later("tag_implications")
end
def sync!
E621ExportDownloadJob.perform_now("tag_implications")
end
end
extend SyncMethods
end
end

View File

@@ -0,0 +1,17 @@
module E621
class WikiPage < ApplicationRecord
self.table_name = "e621.wiki_pages"
module SyncMethods
def sync
E621ExportDownloadJob.perform_later("wiki_pages")
end
def sync!
E621ExportDownloadJob.perform_now("wiki_pages")
end
end
extend SyncMethods
end
end

View File

@@ -48,6 +48,10 @@ module Websites
"redis://redis.websites4.containers.local/0"
end
def opensearch_host
"opensearch.websites4.containers.local"
end
def github_webhook_secret
end

View File

@@ -83,5 +83,5 @@ Rails.application.configure do
config.hosts << /(.*\.)?#{host}/
end
config.web_console.whitelisted_ips = "172.0.0.0/8"
config.web_console.allowed_ips = "172.0.0.0/8"
end

View File

@@ -0,0 +1,9 @@
def cache_store
[:memory_store, { size: 64.megabytes }]
end
Rails.application.configure do
config.cache_store = cache_store
config.action_controller.cache_store = cache_store
Rails.cache = ActiveSupport::Cache.lookup_store(Rails.application.config.cache_store)
end

View File

@@ -4,6 +4,8 @@ module E621WsRoutes
DOMAIN = "e621.ws"
STATUS = "status"
STATUS_DOMAIN = "#{STATUS}.#{DOMAIN}".freeze
POOLS = "pools"
POOLS_DOMAIN = "#{POOLS}.#{DOMAIN}".freeze
def self.extended(router)
router.instance_exec do
@@ -38,6 +40,15 @@ module E621WsRoutes
root(action: :index)
end
end
constraints(DomainConstraint.new(DOMAIN, POOLS)) do
namespace(:pools, path: "") do
get(:manifest, constraints: { format: "json" })
get(:browserconfig, constraints: { format: "xml" })
get(:json, action: :index, defaults: { format: :json })
root(action: :index)
end
end
end
end
end
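
With the new constraint, requests resolve roughly as follows (paths inferred from the routes above):

# GET https://pools.e621.ws/                  -> E621Ws::PoolsController#index
# GET https://pools.e621.ws/json              -> #index, forced JSON
# GET https://pools.e621.ws/manifest.json     -> #manifest
# GET https://pools.e621.ws/browserconfig.xml -> #browserconfig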

View File

@@ -28,3 +28,9 @@ set :output, "log/cron.log"
every 1.minute do
rake "e621:status_update"
end
# Update e621's daily database exports
# e621 generates them somewhere around 7AM UTC (usually finishing around 7:45AM), but we're being safe
every :day, at: "10:50am" do
rake "e621:exports"
end
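
For reference, whenever compiles that block into a crontab entry along these lines (the exact shell wrapper depends on the configured job template and app path):

# 50 10 * * * /bin/bash -l -c 'cd /app && bundle exec rake e621:exports --silent >> log/cron.log 2>&1'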

View File

@@ -0,0 +1,74 @@
class CreateE621ExportTables < ActiveRecord::Migration[7.1]
def change
reversible do |r|
r.up { execute("CREATE SCHEMA e621") }
r.down { execute("DROP SCHEMA e621 CASCADE") }
end
create_table(:"e621.pools", id: :integer) do |t|
t.string(:name, null: false)
t.datetime(:created_at, null: false)
t.datetime(:updated_at, null: false)
t.integer(:creator_id, null: false)
t.string(:description)
t.boolean(:is_active, null: false, default: true)
t.string(:category, null: false, default: "series")
t.integer(:post_ids, array: true, null: false)
end
create_table(:"e621.posts", id: :integer) do |t|
t.integer(:uploader_id, null: false)
t.datetime(:created_at, null: false)
t.string(:md5, null: false)
t.string(:source, null: false)
t.string(:rating, null: false)
t.integer(:image_width, null: false)
t.integer(:image_height, null: false)
t.string(:tag_string, null: false)
t.string(:locked_tags)
t.integer(:fav_count, null: false)
t.string(:file_ext, null: false)
t.integer(:parent_id)
t.integer(:change_seq, null: false)
t.integer(:approver_id)
t.integer(:file_size, null: false)
t.integer(:comment_count, null: false)
t.string(:description, null: false)
t.numeric(:duration)
t.datetime(:updated_at)
t.boolean(:is_deleted, null: false, default: false)
t.boolean(:is_pending, null: false, default: false)
t.boolean(:is_flagged, null: false, default: false)
t.integer(:score, null: false)
t.integer(:up_score, null: false)
t.integer(:down_score, null: false)
t.boolean(:is_rating_locked, null: false, default: false)
t.boolean(:is_status_locked, null: false, default: false)
t.boolean(:is_note_locked, null: false, default: false)
end
create_table(:"e621.tag_aliases", id: :integer) do |t|
t.string(:antecedent_name, null: false)
t.string(:consequent_name, null: false)
t.datetime(:created_at)
t.string(:status, null: false)
end
create_table(:"e621.tag_implications", id: :integer) do |t|
t.string(:antecedent_name, null: false)
t.string(:consequent_name, null: false)
t.datetime(:created_at)
t.string(:status, null: false)
end
create_table(:"e621.tags", id: :integer) do |t| # rubocop:disable Rails/CreateTableWithTimestamps
t.string(:name, null: false)
t.string(:category, null: false)
t.integer(:post_count, null: false)
end
create_table(:"e621.wiki_pages", id: :integer) do |t|
t.datetime(:created_at, null: false)
t.datetime(:updated_at, null: false)
t.string(:title, null: false)
t.string(:body, null: false)
t.integer(:creator_id, null: false)
t.integer(:updater_id)
t.boolean(:is_locked, null: false, default: false)
end
end
end

View File

@@ -0,0 +1,5 @@
class AddE621PoolsTagString < ActiveRecord::Migration[7.1]
def change
add_column(:"e621.pools", :tag_string, :string, null: false, default: "")
end
end

db/schema.rb generated
View File

@@ -10,7 +10,9 @@
#
# It's strongly recommended that you check this file into your version control system.
ActiveRecord::Schema[7.1].define(version: 2024_05_04_225048) do
ActiveRecord::Schema[7.1].define(version: 2024_06_04_105520) do
create_schema "e621"
# These are extensions that must be enabled in order to support this database
enable_extension "plpgsql"

View File

@@ -62,6 +62,28 @@ services:
networks:
- default
opensearch:
image: opensearchproject/opensearch:2.13.0
environment:
- discovery.type=single-node
- logger.level=WARN
- DISABLE_SECURITY_PLUGIN=true
- DISABLE_INSTALL_DEMO_CONFIG=true
- OPENSEARCH_JAVA_OPTS=-Xms1g -Xmx1g
volumes:
- ./data/opensearch:/usr/share/opensearch/data
init: true
healthcheck:
interval: 5s
timeout: 2s
retries: 12
test: curl "opensearch:9200/_cluster/health?wait_for_status=yellow&timeout=2s"
hostname: opensearch.websites4.containers.local
labels:
- "hostname=opensearch.websites4.containers.local"
networks:
- default
imgen:
image: ghcr.io/donovandmc/imgen
init: true

View File

@@ -60,6 +60,28 @@ services:
networks:
- default
opensearch:
image: opensearchproject/opensearch:2.13.0
environment:
- discovery.type=single-node
- logger.level=WARN
- DISABLE_SECURITY_PLUGIN=true
- DISABLE_INSTALL_DEMO_CONFIG=true
- OPENSEARCH_JAVA_OPTS=-Xms1g -Xmx1g
volumes:
- opensearch_data:/usr/share/opensearch/data
init: true
healthcheck:
interval: 5s
timeout: 2s
retries: 12
test: curl "opensearch:9200/_cluster/health?wait_for_status=yellow&timeout=2s"
hostname: opensearch.websites4.containers.local
labels:
- "hostname=opensearch.websites4.containers.local"
networks:
- default
imgen:
image: ghcr.io/donovandmc/imgen
init: true
@@ -116,6 +138,7 @@ networks:
volumes:
db_data:
redis_data:
opensearch_data:
e621_thumbnail_data:
oceanic_docs_data:
rethinkdb_data:

View File

@@ -12,4 +12,13 @@ namespace :e621 do
entry.destroy
end
end
task exports: :environment do
E621.constants.map { |c| E621.const_get(c) }.filter { |c| c < ApplicationRecord }.each do |type|
puts "Updating #{type}"
type.sync!
end
E621::Pool.sync_tags!
E621PoolImportJob.perform_now
end
end
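
The reflection in e621:exports resolves to the export-backed models, so the task is roughly equivalent to:

[E621::Pool, E621::Post, E621::Tag, E621::TagAlias, E621::TagImplication, E621::WikiPage].each(&:sync!)
E621::Pool.sync_tags!
E621PoolImportJob.perform_now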