from __future__ import annotations
"""Database and filesystem operations for indexing workflows.
``DBOperations`` wraps SQLAlchemy session/engine handling and higher-level
update routines. ``AsyncFileOperations`` handles file discovery plus Tika/XMP
extraction for ``DBMeta`` and child ORM rows.
"""
import io
import os
from typing import List, Optional, Tuple, Generator, Sequence, Any
from datetime import datetime
import json
from pathlib import Path
from tika_client import AsyncTikaClient, TikaKey
from tika_client.data_models import TikaResponse
#import asyncio
#from itertools import batched
#from tqdm.auto import tqdm
from hashlib import file_digest
from PIL import Image
#import tika.parser as tp
import numpy as np
from libxmp import utils as xmpu
import sqlalchemy as sa
from sqlalchemy.orm import sessionmaker, Session
from .model import Base as ModelBase, DBMeta, DBPic, DBDoc, DBDinoV2Vector, DBVid, IdList,DBPool
from .config import Config
from . import appconf
def _dali_resizer(*args, **kwargs):
    """Construct a ``DALIImageResizer`` forwarding all arguments unchanged.

    ``.picture`` is imported on first call rather than at module import time,
    presumably so the DALI dependency is only needed by code that resizes.
    """
    from .picture import DALIImageResizer as _resizer_cls
    return _resizer_cls(*args, **kwargs)
class DBOperations:
    """High-level PostgreSQL operations for one configured data pool.

    Holds a SQLAlchemy engine plus session factory (``self.Session``) whose
    connections use the schema of ``ModelBase.metadata`` as ``search_path``,
    and tracks the active pool (``self.pool``) together with its root
    directory (``self.docroot``).
    """

    def __init__(
        self,
        pool_id: int | None = None,
        pool: DBPool | None = None,
        *,
        allow_no_pool: bool = False,
        enginekwargs: dict | None = None,
        sessionkwargs: dict | None = None,
    ):
        """Create the engine and session factory, then select the active pool.

        Args:
            pool_id: Load this existing pool from the database (checked first).
            pool: Use this pool object as-is; it is not inserted (see
                ``create_pool`` for that).
            allow_no_pool: When neither ``pool_id`` nor ``pool`` is given, use an
                unsaved placeholder pool (id 0, empty rootpath) instead of raising.
            enginekwargs: Extra keyword arguments for ``sa.create_engine``. A
                ``connect_args`` entry is merged into the default search_path
                options rather than replacing the keyword.
            sessionkwargs: Extra keyword arguments for ``sessionmaker``.

        Raises:
            ValueError: if no pool is given and ``allow_no_pool`` is false, or if
                ``pool_id`` does not exist.
        """
        # Configuration always comes from the module-level ``appconf``.
        self.config = appconf
        self.logger = self.config.logger
        self.metadata = ModelBase.metadata
        # Copy so the pop below never mutates the caller's dict.
        engine_opts = dict(enginekwargs or {})
        connect_args = {"options": f"-c search_path={self.metadata.schema}"}
        # Merge caller connect_args; passing the keyword twice would be a TypeError.
        connect_args.update(engine_opts.pop("connect_args", None) or {})
        try:
            self.engine = sa.create_engine(
                self.config.db_conn_URL,
                connect_args=connect_args,
                logging_name="DBOperations",
                pool_logging_name=self.config.logger_name,
                **engine_opts,
            )
            self.Session = sessionmaker(
                bind=self.engine, expire_on_commit=False, **(sessionkwargs or {})
            )
        except Exception as e:
            self.logger.error("Error creating PostgreSQL engine: %s", e)
            raise
        else:
            self.logger.info("PostgreSQL engine created successfully.")

        if pool_id is not None:
            self.get_pool(pool_id)
        elif pool is not None:
            self.pool = pool
        elif allow_no_pool:
            self.logger.warning(
                "No pool provided; filesystem operations require pool_id or pool"
            )
            # Transient placeholder object; it is not added to any session here.
            self.pool = DBPool(
                id=0, pool="default", rootpath="", description="Default pool"
            )
        else:
            raise ValueError("Either pool_id or pool must be provided")
        self.docroot = self.pool.rootpath

    def schema_info(self) -> dict[str, Any]:
        """Return the schema name and the row count of every table in it.

        Table names come from the SQLAlchemy inspector, and counts are run
        against lightweight ``sa.table`` constructs. Nothing is reflected into
        the shared ``ModelBase.metadata``, which would otherwise make
        ``create_all``/``drop_all`` without ``entities`` touch unrelated tables.

        Returns:
            ``{"schema": <schema name>, "tables": {<table name>: <row count>}}``
        """
        schema = self.metadata.schema
        table_names = sa.inspect(self.engine).get_table_names(schema=schema)
        tables: dict[str, int] = {}
        with self.Session() as session:
            for tname in table_names:
                stmt = sa.select(sa.func.count()).select_from(
                    sa.table(tname, schema=schema)
                )
                tables[tname] = session.execute(stmt).scalar_one()
        return {"schema": schema, "tables": tables}

    def get_pool(self, pool_id: int) -> DBPool:
        """Load pool *pool_id*, make it the active pool and return it.

        Also updates ``self.docroot`` (as ``create_pool`` does), so filesystem
        paths follow the newly selected pool.

        Raises:
            ValueError: if no pool with that id exists. The previously active
                pool is left unchanged in that case.
        """
        with self.Session() as session:
            found = session.get(DBPool, pool_id)
        if found is None:
            raise ValueError(f"Pool with id {pool_id} not found")
        self.pool = found
        self.docroot = found.rootpath
        return found

    def create_pool(self, pool: DBPool):
        """Insert a new data pool, make it active for this instance and return it."""
        with self.Session() as session:
            session.add(pool)
            session.commit()
            # Reload server-generated values such as the primary key.
            session.refresh(pool)
        self.pool = pool
        self.docroot = pool.rootpath
        self.logger.info("Pool created with id %s", pool.id)
        return pool

    @staticmethod
    def _tables_for(
        entities: Optional[Sequence[type[ModelBase]]],
    ) -> list[sa.Table] | None:
        """Map ORM classes to their ``Table`` objects; ``None`` means "all tables"."""
        if not entities:
            return None
        # ``create_all``/``drop_all`` expect Table objects, not table-name strings.
        return [entity.__table__ for entity in entities]

    def create_tables(self, entities: Optional[Sequence[type[ModelBase]]] = None):
        """Create the tables for *entities* (default: all mapped tables) if missing."""
        self.metadata.create_all(self.engine, tables=self._tables_for(entities))

    def recreate_tables(self, entities: Optional[Sequence[type[ModelBase]]] = None):
        """Drop and re-create the tables for *entities* (default: all mapped tables)."""
        tables = self._tables_for(entities)
        self.metadata.drop_all(self.engine, tables=tables)
        self.metadata.create_all(self.engine, tables=tables)

    def clear_pool(self) -> int:
        """Delete all ``DBMeta`` rows of the active pool and return the row count.

        This is a bulk ORM DELETE, so rows in child tables are only removed if
        the database foreign keys cascade. NOTE(review): confirm the
        ``ondelete`` settings on the child tables.
        """
        with self.Session() as session:
            result = session.execute(
                sa.delete(DBMeta).where(DBMeta.pool_id == self.pool.id)
            )
            session.commit()
        deleted = result.rowcount
        self.logger.info("Deleted %s DBMeta rows from pool %s", deleted, self.pool.id)
        return deleted

    def _pics_without_thumb(self, *, exclude_heic: bool) -> list[tuple[str, int]]:
        """Return ``(absolute path, DBPic.id)`` for active-pool pictures lacking a thumbnail."""
        stmt = (
            sa.select(DBPic.id, DBMeta.path)
            .join(DBPic.meta)
            .where(DBMeta.pool_id == self.pool.id)
            .where(DBPic.thumbarray.is_(None))
        )
        if exclude_heic:
            # Case-insensitive: excludes ".Heic" etc. as well as ".HEIC"/".heic".
            stmt = stmt.where(sa.func.lower(DBMeta.suffix) != ".heic")
        with self.Session() as session:
            return [
                (os.path.join(self.docroot, row.path), row.id)
                for row in session.execute(stmt)
            ]

    def _resize_and_store(
        self,
        piclist: list[tuple[str, int]],
        *,
        pipe_batch_size: int,
        batch_size: int,
        use_PIL: bool,
    ) -> List[int]:
        """Resize *piclist*, store the thumbnails and return the resizer's error labels.

        The error labels are presumably the ids of pictures that failed to resize.
        """
        if not piclist:
            # Nothing to resize; skip building a resizer pipeline.
            return []
        resizer = _dali_resizer(pipe_batch_size=pipe_batch_size, num_threads=4)
        pics, labels, _err_files, err_labels = resizer.resize_pics(
            piclist, batch_size=batch_size, use_PIL=use_PIL
        )
        self.update_thumbs(list(zip(pics, labels)))
        return err_labels

    def update_thumbs_no_heic(self) -> List[int]:
        """Create thumbnails and perceptual hashes for non-HEIC pictures without one.

        Uses the DALI pipeline (``use_PIL=False``), since the HEIC suffix is
        excluded in any letter case. The database session is closed before
        resizing starts.

        Returns:
            The error labels reported by the resizer.
        """
        piclist = self._pics_without_thumb(exclude_heic=True)
        return self._resize_and_store(
            piclist, pipe_batch_size=10, batch_size=100, use_PIL=False
        )

    def update_thumbs_no_thumb(self) -> List[int]:
        """Create thumbnails and perceptual hashes for all pictures without one.

        Processes one picture per batch with ``use_PIL=True``.

        Returns:
            The error labels reported by the resizer.
        """
        piclist = self._pics_without_thumb(exclude_heic=False)
        return self._resize_and_store(
            piclist, pipe_batch_size=1, batch_size=1, use_PIL=True
        )

    def update_thumbs(self, thumblist: List[Tuple[np.ndarray, int]]):
        """Store thumbnails and recompute perceptual hashes for the given pictures.

        Args:
            thumblist: ``(thumbnail array, DBPic.id)`` pairs. Ids without a
                ``DBPic`` row are logged and skipped.
        """
        with self.Session() as session:
            for thumb, pic_id in thumblist:
                pic = session.get(DBPic, pic_id)
                if pic is None:
                    self.logger.warning("No DBPic row for id %s", pic_id)
                    continue
                pic.thumbarray = thumb
                pic.set_phash()
            # commit() flushes pending changes itself.
            session.commit()
class AsyncFileOperations(AsyncTikaClient):
    """Async file discovery and Tika-based metadata extraction for one data pool.

    Paths passed to and returned by the methods are relative to the pool's
    ``rootpath`` (``self.docroot``).
    """

    def __init__(self, pool: DBPool, config: Config = appconf,
                 skip_ocr: bool = True, timeout: float = 30, compress=True):
        """Bind to *pool* and initialise the underlying Tika client.

        Args:
            pool: Data pool whose ``rootpath`` becomes ``self.docroot``.
            config: Supplies the logger, the Tika URLs and ``get_ftype``.
            skip_ocr: Use ``config.tika_noocr_url`` when true, otherwise
                ``config.tika_ocrurl``.
            timeout: Forwarded to ``AsyncTikaClient``.
            compress: Forwarded to ``AsyncTikaClient``.
        """
        self.config = config
        self.logger = config.logger
        self.docroot = pool.rootpath
        self.tika_url = config.tika_noocr_url if skip_ocr else config.tika_ocrurl
        self.pool = pool
        super().__init__(self.tika_url, timeout=timeout, compress=compress)

    async def __aenter__(self) -> AsyncFileOperations:
        """Return this instance as the ``async with`` target.

        NOTE(review): the base class ``__aenter__`` is not invoked. Confirm that
        ``AsyncTikaClient`` performs no setup there.
        """
        return self

    def dir_tree(self, rel_path: str) -> Generator[str]:
        """Yield every file below *rel_path* as a path relative to ``self.docroot``.

        Walks ``docroot/rel_path`` recursively with :func:`os.walk`. Only paths
        are produced; this method does no metadata extraction itself.

        Raises:
            FileNotFoundError: (on first iteration) if ``docroot/rel_path`` does
                not exist.
        """
        abspath = os.path.join(self.docroot, rel_path)
        # Explicit check rather than ``assert``, which is stripped under ``python -O``.
        if not os.path.exists(abspath):
            raise FileNotFoundError(f"Directory {abspath} does not exist")
        for dirpath, _dirnames, filenames in os.walk(abspath):
            for filename in filenames:
                yield os.path.relpath(os.path.join(dirpath, filename), self.docroot)

    @staticmethod
    def _replace_nul(value: Any) -> Any:
        """Return *value* with NUL (U+0000) characters in all strings replaced by ``"null"``.

        Recurses into dict keys and values and into lists. This gives the same
        result as the former JSON dump-and-replace round-trip, without corrupting
        strings that contain a literal backslash-u0000 text. Presumably NULs are
        removed because PostgreSQL text/JSONB cannot store them; confirm.
        """
        if isinstance(value, str):
            return value.replace("\x00", "null")
        if isinstance(value, dict):
            return {
                AsyncFileOperations._replace_nul(key): AsyncFileOperations._replace_nul(item)
                for key, item in value.items()
            }
        if isinstance(value, (list, tuple)):
            return [AsyncFileOperations._replace_nul(item) for item in value]
        return value

    @staticmethod
    def _first(value: Any) -> Any:
        """Return the first element of a list (``None`` if empty); other values unchanged."""
        if isinstance(value, list):
            return value[0] if value else None
        return value

    async def tika_parse(self, rel_path: str) -> Tuple[DBMeta | None, str]:
        """Parse one file with Tika and build an unsaved ``DBMeta`` row for it.

        Args:
            rel_path: File path relative to ``self.docroot``.

        Returns:
            ``(dbmeta, content)``. ``content`` is the text extracted by Tika
            (``""`` if parsing failed). ``dbmeta`` is ``None`` only when the
            ``DBMeta`` constructor raised. A Tika failure is logged, and the row
            is then built from filesystem information alone.

        Raises:
            OSError: if the file cannot be stat'ed or opened for hashing.
        """
        filepath = os.path.join(self.docroot, rel_path)
        try:
            parsed: list[TikaResponse] = await self.rmeta.as_text.from_file(
                Path(filepath)
            )
            meta: dict = self._replace_nul(parsed[0].data)
            content = meta.pop(TikaKey.Content, "")  # type: ignore
        except Exception as e:
            # Best effort: a file Tika cannot parse is still indexed.
            self.logger.error("Error %s parsing file %s", e, filepath)
            meta = {}
            content = ""
        meta["file"] = filepath

        path = rel_path
        fsize = os.path.getsize(filepath)
        fdate: datetime = datetime.fromtimestamp(os.path.getmtime(filepath))

        # Tika may report multi-valued keys as lists; only the first value is used.
        doc_cdate: datetime = fdate
        doc_cdate_str = self._first(meta.get("dcterms:created"))
        if doc_cdate_str is not None:
            try:
                # NOTE(review): a string with an offset/"Z" yields an aware datetime,
                # while the fdate fallback is naive. Confirm the column accepts both.
                doc_cdate = datetime.fromisoformat(doc_cdate_str)
            except (TypeError, ValueError) as e:
                self.logger.error(
                    "Error %s parsing document creation date %s", e, doc_cdate_str
                )

        raw_clength = self._first(meta.get("Content-Length", 0))
        try:
            content_length = int(raw_clength)
        except (TypeError, ValueError):
            self.logger.warning(
                "Invalid Content-Length %r for %s; using 0", raw_clength, rel_path
            )
            content_length = 0

        suffix = os.path.splitext(path)[1]
        ftype = self.config.get_ftype(suffix)
        with open(filepath, "rb") as f:
            sha256 = file_digest(f, "sha256")
        try:
            dbmeta = DBMeta(
                pool_id=self.pool.id,
                path=path,
                fname=os.path.basename(path),
                fdate=fdate,
                sort_date=doc_cdate,
                fsize=fsize,
                clength=content_length,
                ctype=meta.get("Content-Type", "unk/unk"),
                md_keys=list(meta.keys()),
                suffix=suffix,
                ftype=ftype,
                sha256=sha256.digest(),
                meta_data=meta,
            )
        except Exception as e:
            self.logger.error("Error %s creating DBMeta fpath %s", e, rel_path)
            return None, content
        return dbmeta, content

    async def file_to_db(self, rel_path: str) -> DBMeta | None:
        """Build a ``DBMeta`` for *rel_path* plus the child row matching its file type.

        The extracted text is stored in ``dbmeta.inhalt``. The ``ftype`` values
        "doc", "pic" and "vid" attach a ``DBDoc``, a ``DBPic`` (built by
        ``self.DBPic_from_DBMeta``) or a ``DBVid`` respectively. Nothing is added
        to a database session here.

        Returns:
            The populated ``DBMeta``, or ``None`` if ``tika_parse`` could not
            build one.
        """
        dbmeta, content = await self.tika_parse(rel_path)
        if dbmeta is None:
            return None
        dbmeta.inhalt = content
        # The file types are mutually exclusive.
        if dbmeta.ftype == "doc":
            dbmeta.doc = DBDoc()
        elif dbmeta.ftype == "pic":
            dbmeta.pic = self.DBPic_from_DBMeta(dbmeta)
        elif dbmeta.ftype == "vid":
            dbmeta.vid = DBVid()
        return dbmeta