"""Define subscription entities and configuration metadata value objects."""
import hashlib
import time
from dataclasses import dataclass, field
from urllib.parse import urlparse
@dataclass(frozen=True, slots=True)
class SubscriptionStatistics:
"""Summarize configuration health and update history for one subscription.
Attributes:
total_configs (int): Number of currently stored configurations.
tested_configs (int): Number with a nonnegative latency.
average_latency (float): Mean successful latency in milliseconds.
protocols (dict[str, int]): Configuration count by protocol scheme.
total_updates (int): Total attempted source updates.
successful_updates (int): Updates that fetched and parsed successfully.
failed_updates (int): Updates that failed during fetch or parsing.
"""
total_configs: int
tested_configs: int
average_latency: float
protocols: dict[str, int]
total_updates: int
successful_updates: int
failed_updates: int
[docs]
@dataclass(slots=True)
class Subscription:
"""Represent one remote subscription and its persistent application state.
The model validates URL and scheduling invariants at construction time. It
does not perform network I/O itself; fetching, parsing, and persistence are
coordinated by :class:`SubscriptionManager`.
Attributes:
url (str): Absolute HTTP or HTTPS subscription endpoint.
name (str | None): Display name; defaults to the endpoint hostname.
auto_update (bool): Whether a manager should schedule periodic updates.
update_interval (int): Scheduler interval in seconds.
enabled (bool): Whether aggregate queries should include this source.
priority (int): Application-defined source priority.
tags (list[str]): Labels used to select groups of subscriptions.
configs (list[ConfigMetadata]): Parsed configurations from the source.
id (str): Stable SHA-256-derived identifier generated from ``url``.
"""
url: str
name: str | None = None
auto_update: bool = False
update_interval: int = 86400
enabled: bool = True
priority: int = 0
tags: list[str] = field(default_factory=list)
configs: list[ConfigMetadata] = field(default_factory=list)
last_update_time: float = 0
last_fetch_success: bool = False
last_error_message: str = ""
total_updates: int = 0
successful_updates: int = 0
failed_updates: int = 0
id: str = field(init=False)
def __post_init__(self):
"""Validate constructor values and derive stable identity fields.
Returns:
None: Validation and normalization mutate the instance in place.
Raises:
ValueError: If ``url`` is not an absolute HTTP(S) URL or
``update_interval`` is less than one second.
"""
parsed = urlparse(self.url)
if parsed.scheme not in {"http", "https"} or not parsed.netloc:
raise ValueError("Subscription URL must be an absolute HTTP(S) URL")
if self.update_interval < 1:
raise ValueError("update_interval must be positive")
self.id = hashlib.sha256(self.url.encode("utf-8")).hexdigest()[:32]
self.name = self.name or parsed.netloc
[docs]
def get_configs(self) -> list[str]:
"""Return original configuration strings in subscription order.
Returns:
list[str]: A new list containing each stored share URI.
"""
return [item.config_string for item in self.configs]
[docs]
def filter_configs(
self,
protocols=None,
min_success_rate=None,
max_latency=None,
tags=None,
name_contains=None,
) -> list[ConfigMetadata]:
"""Select configuration metadata matching all supplied constraints.
Args:
protocols (Iterable[str] | None): Allowed protocol schemes.
min_success_rate (float | None): Inclusive success-rate threshold
between ``0.0`` and ``1.0``.
max_latency (int | float | None): Inclusive maximum successful
latency in milliseconds. Untested configs are excluded.
tags (Iterable[str] | None): At least one tag must match.
name_contains (str | None): Case-insensitive regular expression
applied to the human-readable config name.
Returns:
list[ConfigMetadata]: Matching objects in their original order.
Raises:
ValueError: If ``min_success_rate`` is outside ``0..1`` or the
regular expression is invalid.
"""
import re
pattern = re.compile(name_contains, re.IGNORECASE) if name_contains else None
result = list(self.configs)
if protocols:
allowed = {item.lower() for item in protocols}
result = [item for item in result if item.protocol in allowed]
if min_success_rate is not None:
if not 0 <= min_success_rate <= 1:
raise ValueError("min_success_rate must be between 0 and 1")
result = [item for item in result if item.get_success_rate() >= min_success_rate]
if max_latency is not None:
result = [item for item in result if 0 <= item.last_latency <= max_latency]
if tags:
expected = set(tags)
result = [item for item in result if expected.intersection(item.tags)]
if pattern:
result = [item for item in result if pattern.search(item.name)]
return result
[docs]
def sort_configs(self, by="latency", reverse=False) -> list[ConfigMetadata]:
"""Return configurations ordered by a supported metadata field.
Args:
by (str): One of ``latency``, ``success_rate``, ``name``, or
``protocol``.
reverse (bool): Reverse the selected ordering when true.
Returns:
list[ConfigMetadata]: A new sorted list; the model is not mutated.
Raises:
ValueError: If ``by`` is not a supported field.
"""
keys = {
"latency": lambda item: item.last_latency if item.last_latency >= 0 else float("inf"),
"success_rate": lambda item: item.get_success_rate(),
"name": lambda item: item.name.casefold(),
"protocol": lambda item: item.protocol,
}
if by not in keys:
raise ValueError(f"Unsupported sort field: {by}")
return sorted(self.configs, key=keys[by], reverse=reverse)
[docs]
def get_statistics(self) -> SubscriptionStatistics:
"""Calculate an immutable statistical snapshot of current state.
Returns:
SubscriptionStatistics: Counts, protocol distribution, update
totals, and average latency for successfully tested configs.
"""
tested = [item for item in self.configs if item.last_latency >= 0]
protocols: dict[str, int] = {}
for item in self.configs:
protocols[item.protocol] = protocols.get(item.protocol, 0) + 1
return SubscriptionStatistics(
len(self.configs),
len(tested),
sum(item.last_latency for item in tested) / len(tested) if tested else 0.0,
protocols,
self.total_updates,
self.successful_updates,
self.failed_updates,
)
[docs]
def to_dict(self) -> dict:
"""Serialize the complete subscription into JSON-compatible values.
Returns:
dict[str, object]: Persistent source settings, update state, and
serialized configuration metadata.
"""
return {
key: value
for key, value in {
"id": self.id,
"url": self.url,
"name": self.name,
"auto_update": self.auto_update,
"update_interval": self.update_interval,
"enabled": self.enabled,
"priority": self.priority,
"tags": list(self.tags),
"configs": [item.to_dict() for item in self.configs],
"last_update_time": self.last_update_time,
"last_fetch_success": self.last_fetch_success,
"last_error_message": self.last_error_message,
"total_updates": self.total_updates,
"successful_updates": self.successful_updates,
"failed_updates": self.failed_updates,
}.items()
}
[docs]
@classmethod
def from_dict(cls, value: dict) -> "Subscription":
"""Restore a subscription from repository data.
Args:
value (dict[str, object]): JSON-decoded subscription document.
Returns:
Subscription: A validated source with restored runtime statistics.
Raises:
KeyError: If the required ``url`` field is absent.
ValueError: If persisted URL or interval values are invalid.
TypeError: If persisted fields have incompatible types.
"""
subscription = cls(
url=value["url"],
name=value.get("name"),
auto_update=value.get("auto_update", False),
update_interval=value.get("update_interval", 86400),
enabled=value.get("enabled", True),
priority=value.get("priority", 0),
tags=list(value.get("tags", [])),
)
subscription.configs = [ConfigMetadata.from_dict(item) for item in value.get("configs", [])]
for field_name in (
"last_update_time",
"last_fetch_success",
"last_error_message",
"total_updates",
"successful_updates",
"failed_updates",
):
setattr(subscription, field_name, value.get(field_name, getattr(subscription, field_name)))
return subscription