-
Notifications
You must be signed in to change notification settings - Fork 426
Expand file tree
/
Copy pathdocument_processor.py
More file actions
58 lines (46 loc) · 2.05 KB
/
document_processor.py
File metadata and controls
58 lines (46 loc) · 2.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# Copyright 2025 © BeeAI a Series of LF Projects, LLC
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Any
from beeai_framework.backend.types import DocumentWithScore
from beeai_framework.backend.utils import load_module, parse_module
from beeai_framework.utils.strings import validate_class_name
# Public API of this module: a star-import exposes only the abstract base class.
__all__ = ["DocumentProcessor"]
class DocumentProcessor(ABC):
    """
    Abstract base class for components that post-process scored documents.

    Concrete subclasses must provide two things:

    - ``_class_from_name``: a class-level factory used by :meth:`from_name`.
    - ``postprocess_documents``: the asynchronous transformation applied to a
      list of ``DocumentWithScore`` items.
    """

    @classmethod
    @abstractmethod
    def _class_from_name(cls, class_name: str, **kwargs: Any) -> DocumentProcessor:
        """
        Return the document processor identified by ``class_name``.

        This is the hook that :meth:`from_name` invokes on the class it loads
        for an integration; every integration has to override it.

        Parameters
        ----------
        class_name : str
            Name of the processor class requested by the caller.
        **kwargs :
            Keyword arguments forwarded unchanged from :meth:`from_name`.

        Returns
        -------
        DocumentProcessor
            The processor produced by the overriding implementation.

        Raises
        ------
        NotImplementedError
            Always, in this base implementation.
        """
        raise NotImplementedError("Implement me")

    @classmethod
    def from_name(cls, name: str, **kwargs: Any) -> DocumentProcessor:
        """
        Resolve ``name`` to a document processor and build it dynamically.

        Parameters
        ----------
        name : str
            A *case sensitive* string in the format "integration:ClassName".
            - `integration` is the name of the Python package namespace (e.g. "beeai").
            - `ClassName` is the name of the document processor class to load (e.g. "LLMDocumentReranker").
        **kwargs :
            Keyword arguments passed through to the target class's
            ``_class_from_name`` factory.

        Returns
        -------
        DocumentProcessor
            Whatever the loaded class's ``_class_from_name`` returns for the
            requested class name.

        Raises
        ------
        ImportError
            Presumably raised by ``load_module`` when the integration cannot
            be found (not verifiable from this file). Errors from
            ``parse_module`` and ``validate_class_name`` are not caught here
            either and propagate to the caller.
        """
        spec = parse_module(name)
        requested_class = spec.entity_id
        # Reject a malformed class name before importing anything.
        validate_class_name(requested_class)

        # The loaded object is expected to expose the `_class_from_name` factory.
        processor_cls = load_module(spec.provider_id, "document_processor")  # type: ignore
        processor: DocumentProcessor = processor_cls._class_from_name(class_name=requested_class, **kwargs)
        return processor

    @abstractmethod
    async def postprocess_documents(
        self, documents: list[DocumentWithScore], *, query: str | None = None
    ) -> list[DocumentWithScore]:
        """
        Post-process a list of scored documents.

        The concrete transformation is defined by the subclass.

        Parameters
        ----------
        documents : list[DocumentWithScore]
            The scored documents to process.
        query : str | None, optional
            Keyword-only query string made available to the implementation;
            defaults to ``None``.

        Returns
        -------
        list[DocumentWithScore]
            The processed documents.

        Raises
        ------
        NotImplementedError
            Always, in this base implementation.
        """
        raise NotImplementedError("Implement me")