import abc
import time
import warnings
from datetime import datetime
from json import JSONDecodeError
from typing import AsyncIterator, ForwardRef, Iterator, List, Literal, TypeVar, Union
from playwright.async_api import BrowserContext as AsyncBrowserContext
from playwright.sync_api import BrowserContext as SyncBrowserContext
from pydantic import ValidationError
from tiktokapipy import TikTokAPIError, TikTokAPIWarning
from tiktokapipy.util.queries import (
get_challenge_detail_async,
get_challenge_detail_sync,
make_request_async,
make_request_sync,
)
T = TypeVar("T")
Challenge = ForwardRef("Challenge")
Comment = ForwardRef("Comment")
User = ForwardRef("User")
Video = ForwardRef("Video")
[docs]class DeferredIterator(abc.ABC, Iterator[T], AsyncIterator[T]):
def __init__(self, api):
self._api = api
self._collected_values = []
self._head = 0
self._cursor = 0
self._has_more = True
self._limit = -1
@abc.abstractmethod
def _fetch_sync(self):
pass
@abc.abstractmethod
async def _fetch_async(self):
pass
def __iter__(self):
if isinstance(self._api.context, AsyncBrowserContext):
raise TikTokAPIError(
"Attempting to use AsyncTikTokAPI in a synchronous context. Use `async for` instead."
)
self._head = 0
return self
def __next__(self):
if isinstance(self._api.context, AsyncBrowserContext):
raise TikTokAPIError(
"Attempting to use AsyncTikTokAPI in a synchronous context. Use `async for` instead."
)
if 0 <= self._limit <= self._head:
raise StopIteration
if self._head >= len(self._collected_values):
if not self._has_more:
raise StopIteration
self._fetch_sync()
if 0 <= self._limit < len(self._collected_values):
self._collected_values = self._collected_values[: self._limit]
self._has_more = False
out = self._collected_values[self._head]
self._head += 1
return out
def __aiter__(self):
if isinstance(self._api.context, SyncBrowserContext):
raise TikTokAPIError(
"Attempting to use TikTokAPI in an asynchronous context. Use `for` instead."
)
self._head = 0
return self
async def __anext__(self):
if isinstance(self._api.context, SyncBrowserContext):
raise TikTokAPIError(
"Attempting to use TikTokAPI in an asynchronous context. Use `for` instead."
)
if 0 <= self._limit <= self._head:
raise StopAsyncIteration
if self._head >= len(self._collected_values):
if not self._has_more:
raise StopAsyncIteration
await self._fetch_async()
if 0 <= self._limit < len(self._collected_values):
self._collected_values = self._collected_values[: self._limit]
self._has_more = False
out = self._collected_values[self._head]
self._head += 1
return out
def __getitem__(self, item):
return self._collected_values[item]
[docs] def limit(self, limit: int) -> "DeferredIterator":
"""
Set a limit to the number of items to iterate over. Can be useful to not iterate over an absurdly large amount
of data.
Example usage:
.. code-block:: python
for something in iterator.limit(30):
# do something
"""
if limit < 0:
self._limit = -1
return self
self._limit = limit
if len(self._collected_values) > self._limit:
self._collected_values = self._collected_values[: self._limit]
self._has_more = False
return self
[docs]class DeferredItemListIterator(DeferredIterator[Video]):
def __init__(
self,
api,
from_type: Literal["post", "challenge"],
target_id: Union[int, str],
**extra_params,
):
super().__init__(api)
self.from_type = from_type
self._target_id = target_id
self._extra_params = extra_params
if self.from_type == "post":
self._cursor = int(time.time()) * 1000
def _fetch_sync(self):
from tiktokapipy.models.raw_data import APIResponse
# noinspection PyTypeChecker
try:
raw = make_request_sync(
f"{self.from_type}/item_list/",
self._cursor,
self._target_id,
self._api.context,
**self._extra_params,
)
except JSONDecodeError:
readable_cursor = (
f"video #{self._cursor}"
if self.from_type == "challenge"
else datetime.fromtimestamp(self._cursor // 1000).strftime("%c")
)
warnings.warn(
f"Unable to grab videos beyond {readable_cursor} (JSONDecodeError), stopping iteration early."
f"Try again if you think this is a mistake.",
category=TikTokAPIWarning,
stacklevel=2,
)
self._has_more = False
raise StopIteration
converted = APIResponse.model_validate(raw)
if not converted.item_list:
self._has_more = False
raise StopIteration
self._has_more = converted.has_more
self._cursor = converted.cursor
for video in converted.item_list:
try:
self._collected_values.append(self._api.video(video.id))
except (TikTokAPIError, ValidationError):
warnings.warn(
f"Unable to grab video with id {video.id}",
category=TikTokAPIWarning,
stacklevel=2,
)
async def _fetch_async(self):
from tiktokapipy.models.raw_data import APIResponse
# noinspection PyTypeChecker
try:
raw = await make_request_async(
f"{self.from_type}/item_list/",
self._cursor,
self._target_id,
self._api.context,
**self._extra_params,
)
except JSONDecodeError:
readable_cursor = (
f"video #{self._cursor}"
if self.from_type == "challenge"
else datetime.fromtimestamp(self._cursor).strftime("%c")
)
warnings.warn(
f"Unable to grab videos beyond {readable_cursor}, stopping iteration early."
f"Try again if you think this is a mistake.",
category=TikTokAPIWarning,
stacklevel=2,
)
self._has_more = False
raise StopAsyncIteration
converted = APIResponse.model_validate(raw)
if not converted.item_list:
self._has_more = False
raise StopAsyncIteration
self._has_more = converted.has_more
self._cursor = converted.cursor
for video in converted.item_list:
try:
self._collected_values.append(await self._api.video(video.id))
except (TikTokAPIError, ValidationError):
warnings.warn(
f"Unable to grab video with id {video.id}",
category=TikTokAPIWarning,
stacklevel=2,
)
[docs]class DeferredChallengeIterator(Iterator[Challenge], AsyncIterator[Challenge]):
def __init__(self, api, challenge_names: List[str]):
self._api = api
self._collected_values: List[Challenge] = []
self._challenge_names = challenge_names
self.head = 0
def _fetch_sync(self):
from tiktokapipy.models.raw_data import ChallengePage
converted = ChallengePage.model_validate(
get_challenge_detail_sync(
self._challenge_names[self.head], self._api.context
)
)
challenge = converted.challenge_info.challenge
challenge._api = self._api
self._collected_values.append(challenge)
async def _fetch_async(self):
from tiktokapipy.models.raw_data import ChallengePage
converted = ChallengePage.model_validate(
await get_challenge_detail_async(
self._challenge_names[self.head], self._api.context
)
)
challenge = converted.challenge_info.challenge
challenge._api = self._api
self._collected_values.append(challenge)
def __iter__(self):
if isinstance(self._api.context, AsyncBrowserContext):
raise TikTokAPIError(
"Attempting to use AsyncTikTokAPI in a synchronous context. Use `async for` instead."
)
self.head = 0
return self
def __next__(self):
if isinstance(self._api.context, AsyncBrowserContext):
raise TikTokAPIError(
"Attempting to use AsyncTikTokAPI in a synchronous context. Use `async for` instead."
)
if self.head == len(self._collected_values):
if self.head == len(self._challenge_names):
raise StopIteration
self._fetch_sync()
out = self._collected_values[self.head]
self.head += 1
return out
def __aiter__(self):
if isinstance(self._api.context, SyncBrowserContext):
raise TikTokAPIError(
"Attempting to use TikTokAPI in an asynchronous context. Use `for` instead."
)
self.head = 0
return self
async def __anext__(self):
if isinstance(self._api.context, SyncBrowserContext):
raise TikTokAPIError(
"Attempting to use TikTokAPI in an asynchronous context. Use `for` instead."
)
if self.head == len(self._collected_values):
if self.head == len(self._challenge_names):
raise StopAsyncIteration
await self._fetch_async()
out = self._collected_values[self.head]
self.head += 1
return out
[docs]class DeferredUserGetterSync:
def __init__(self, api, unique_id: str):
self._api = api
self._unique_id = unique_id
self._user = None
[docs] def __call__(self) -> User:
if isinstance(self._api.context, AsyncBrowserContext):
raise TikTokAPIError(
"Attempting to use AsyncTikTokAPI in an synchronous context."
)
if self._user is None:
self._user = self._api.user(self._unique_id)
self._user._api = self._api
return self._user
[docs]class DeferredUserGetterAsync:
def __init__(self, api, unique_id: str):
self._api = api
self._unique_id = unique_id
self._user = None
[docs] async def __call__(self) -> User:
if isinstance(self._api.context, SyncBrowserContext):
raise TikTokAPIError(
"Attempting to use TikTokAPI in a asynchronous context."
)
if self._user is None:
self._user = await self._api.user(self._unique_id)
self._user._api = self._api
return self._user