Skip to content

Commit

Permalink
refactor: use loguru
Browse files Browse the repository at this point in the history
  • Loading branch information
acgnhik committed Nov 5, 2023
1 parent c03a122 commit 78c561c
Show file tree
Hide file tree
Showing 97 changed files with 531 additions and 826 deletions.
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ install_requires =
requests >= 2.24.0, < 3.0.0
aiofiles >= 22.1.0, < 23.0.0
tenacity >= 8.0.1, < 9.0.0
colorama >= 0.4.4, < 0.5.0
loguru >= 0.7.2, < 0.8.0
humanize >= 3.13.1, < 4.0.0
tqdm >= 4.62.3, < 5.0.0
attrs >= 21.2.0, < 22.0.0
Expand Down
6 changes: 2 additions & 4 deletions src/blrec/application.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import asyncio
import logging
import os
from contextlib import suppress
from typing import Iterator, List, Optional

import attr
import psutil
from loguru import logger

from . import __prog__, __version__
from .bili.helpers import ensure_room_id
Expand Down Expand Up @@ -33,8 +33,6 @@
)
from .webhook import WebHookEmitter

logger = logging.getLogger(__name__)


@attr.s(auto_attribs=True, slots=True, frozen=True)
class AppInfo:
Expand Down Expand Up @@ -100,6 +98,7 @@ async def _run(self) -> None:
await self.exit()

async def launch(self) -> None:
self._setup_logger()
logger.info('Launching Application...')
self._setup()
logger.debug(f'Default umask {os.umask(0o000)}')
Expand Down Expand Up @@ -296,7 +295,6 @@ async def change_task_options(
return await self._settings_manager.change_task_options(room_id, options)

def _setup(self) -> None:
self._setup_logger()
self._setup_exception_handler()
self._setup_space_monitor()
self._setup_space_event_submitter()
Expand Down
34 changes: 17 additions & 17 deletions src/blrec/bili/api.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import asyncio
import hashlib
import logging
import os
from abc import ABC
from datetime import datetime
from typing import Any, Dict, List, Mapping, Optional, Final
from typing import Any, Dict, Final, List, Mapping, Optional
from urllib.parse import urlencode

import aiohttp
from loguru import logger
from tenacity import retry, stop_after_delay, wait_exponential

from .exceptions import ApiRequestError
Expand All @@ -16,10 +15,6 @@
__all__ = 'AppApi', 'WebApi'


logger = logging.getLogger(__name__)

TRACE_API_REQ = bool(os.environ.get('BLREC_TRACE_API_REQ'))

BASE_HEADERS: Final = {
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en;q=0.3,en-US;q=0.2', # noqa
Expand All @@ -34,8 +29,14 @@

class BaseApi(ABC):
def __init__(
self, session: aiohttp.ClientSession, headers: Optional[Dict[str, str]] = None
self,
session: aiohttp.ClientSession,
headers: Optional[Dict[str, str]] = None,
*,
room_id: Optional[int] = None,
):
self._logger = logger.bind(room_id=room_id or '')

self.base_api_urls: List[str] = ['https://api.bilibili.com']
self.base_live_api_urls: List[str] = ['https://api.live.bilibili.com']
self.base_play_info_api_urls: List[str] = ['https://api.live.bilibili.com']
Expand Down Expand Up @@ -64,13 +65,13 @@ async def _get_json_res(self, *args: Any, **kwds: Any) -> JsonResponse:
should_check_response = kwds.pop('check_response', True)
kwds = {'timeout': self.timeout, 'headers': self.headers, **kwds}
async with self._session.get(*args, **kwds) as res:
if TRACE_API_REQ:
logger.debug(f'Request info: {res.request_info}')
self._logger.trace('Request: {}', res.request_info)
self._logger.trace('Response: {}', await res.text())
try:
json_res = await res.json()
except aiohttp.ContentTypeError:
text_res = await res.text()
logger.debug(f'Response text: {text_res[:200]}')
self._logger.debug(f'Response text: {text_res[:200]}')
raise
if should_check_response:
self._check_response(json_res)
Expand All @@ -88,8 +89,7 @@ async def _get_json(
return await self._get_json_res(url, *args, **kwds)
except Exception as exc:
exception = exc
if TRACE_API_REQ:
logger.debug(f'Failed to get json from {url}', exc_info=exc)
self._logger.trace('Failed to get json from {}: {}', url, repr(exc))
else:
assert exception is not None
raise exception
Expand All @@ -106,14 +106,14 @@ async def _get_jsons_concurrently(
json_responses = []
for idx, item in enumerate(results):
if isinstance(item, Exception):
if TRACE_API_REQ:
logger.debug(f'Failed to get json from {urls[idx]}', exc_info=item)
self._logger.trace(
'Failed to get json from {}: {}', urls[idx], repr(item)
)
exceptions.append(item)
elif isinstance(item, dict):
json_responses.append(item)
else:
if TRACE_API_REQ:
logger.debug(repr(item))
self._logger.trace('{}', repr(item))
if not json_responses:
raise exceptions[0]
return json_responses
Expand Down
77 changes: 42 additions & 35 deletions src/blrec/bili/danmaku_client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import asyncio
import json
import logging
import struct
from contextlib import suppress
from enum import Enum, IntEnum
Expand All @@ -11,19 +10,20 @@
from aiohttp import ClientSession
from tenacity import retry, retry_if_exception_type, wait_exponential

from blrec.logging.context import async_task_with_logger_context

from ..event.event_emitter import EventEmitter, EventListener
from ..exception import exception_callback
from ..logging.room_id import aio_task_with_room_id
from ..utils.mixins import AsyncStoppableMixin
from ..utils.string import extract_uid_from_cookie, extract_buvid_from_cookie
from ..utils.string import extract_buvid_from_cookie, extract_uid_from_cookie
from .api import AppApi, WebApi
from .exceptions import DanmakuClientAuthError
from .typing import ApiPlatform, Danmaku

__all__ = 'DanmakuClient', 'DanmakuListener', 'Danmaku', 'DanmakuCommand'


logger = logging.getLogger(__name__)
from loguru import logger


class DanmakuListener(EventListener):
Expand Down Expand Up @@ -57,6 +57,9 @@ def __init__(
headers: Optional[Dict[str, str]] = None,
) -> None:
super().__init__()
self._logger_context = {'room_id': room_id}
self._logger = logger.bind(**self._logger_context)

self.session = session
self.appapi = appapi
self.webapi = webapi
Expand Down Expand Up @@ -86,24 +89,25 @@ async def _do_start(self) -> None:
await self._update_danmu_info()
await self._connect()
await self._create_message_loop()
logger.debug('Started danmaku client')
self._logger.debug('Started danmaku client')

async def _do_stop(self) -> None:
await self._terminate_message_loop()
await self._disconnect()
logger.debug('Stopped danmaku client')
self._logger.debug('Stopped danmaku client')

@async_task_with_logger_context
async def restart(self) -> None:
logger.debug('Restarting danmaku client...')
self._logger.debug('Restarting danmaku client...')
await self.stop()
await self.start()
logger.debug('Restarted danmaku client')
self._logger.debug('Restarted danmaku client')

async def reconnect(self) -> None:
if self.stopped:
return

logger.debug('Reconnecting...')
self._logger.debug('Reconnecting...')
await self._disconnect()
await self._connect()
await self._emit('client_reconnected')
Expand All @@ -115,7 +119,7 @@ async def reconnect(self) -> None:
),
)
async def _connect(self) -> None:
logger.debug('Connecting to server...')
self._logger.debug('Connecting to server...')
try:
await self._connect_websocket()
await self._send_auth()
Expand All @@ -129,24 +133,24 @@ async def _connect(self) -> None:
await self._update_danmu_info()
raise
else:
logger.debug('Connected to server')
self._logger.debug('Connected to server')
await self._emit('client_connected')

async def _connect_websocket(self) -> None:
url = 'wss://{}:{}/sub'.format(
self._danmu_info['host_list'][self._host_index]['host'],
self._danmu_info['host_list'][self._host_index]['wss_port'],
)
logger.debug(f'Connecting WebSocket... {url}')
self._logger.debug(f'Connecting WebSocket... {url}')
try:
self._ws = await self.session.ws_connect(
url, timeout=5, headers=self.headers
)
except Exception as exc:
logger.debug(f'Failed to connect WebSocket: {repr(exc)}')
self._logger.debug(f'Failed to connect WebSocket: {repr(exc)}')
raise
else:
logger.debug('Connected WebSocket')
self._logger.debug('Connected WebSocket')

async def _send_auth(self) -> None:
auth_msg = json.dumps(
Expand All @@ -161,26 +165,28 @@ async def _send_auth(self) -> None:
}
)
data = Frame.encode(WS.OP_USER_AUTHENTICATION, auth_msg)
logger.debug('Sending user authentication...')
self._logger.debug('Sending user authentication...')
try:
await self._ws.send_bytes(data)
except Exception as exc:
logger.debug(f'Failed to sent user authentication: {repr(exc)}')
self._logger.debug(f'Failed to sent user authentication: {repr(exc)}')
raise
else:
logger.debug('Sent user authentication')
self._logger.debug('Sent user authentication')

async def _recieve_auth_reply(self) -> aiohttp.WSMessage:
logger.debug('Receiving user authentication reply...')
self._logger.debug('Receiving user authentication reply...')
try:
msg = await self._ws.receive(timeout=5)
if msg.type != aiohttp.WSMsgType.BINARY:
raise aiohttp.ClientError(msg)
except Exception as exc:
logger.debug(f'Failed to receive user authentication reply: {repr(exc)}')
self._logger.debug(
f'Failed to receive user authentication reply: {repr(exc)}'
)
raise
else:
logger.debug('Recieved user authentication reply')
self._logger.debug('Recieved user authentication reply')
return msg

async def _handle_auth_reply(self, reply: aiohttp.WSMessage) -> None:
Expand All @@ -190,7 +196,7 @@ async def _handle_auth_reply(self, reply: aiohttp.WSMessage) -> None:
code = cast(int, json.loads(msg)['code'])

if code == WS.AUTH_OK:
logger.debug('Auth OK')
self._logger.debug('Auth OK')
self._create_heartbeat_task()
elif code == WS.AUTH_TOKEN_ERROR:
raise DanmakuClientAuthError(f'Token expired: {code}')
Expand All @@ -204,7 +210,7 @@ def _rotate_api_platform(self) -> None:
self._api_platform = 'android'

async def _update_danmu_info(self) -> None:
logger.debug(f'Updating danmu info via {self._api_platform} api...')
self._logger.debug(f'Updating danmu info via {self._api_platform} api...')
api: Union[WebApi, AppApi]
if self._api_platform == 'web':
api = self.webapi
Expand All @@ -213,15 +219,15 @@ async def _update_danmu_info(self) -> None:
try:
self._danmu_info = await api.get_danmu_info(self._room_id)
except Exception as exc:
logger.warning(f'Failed to update danmu info: {repr(exc)}')
self._logger.warning(f'Failed to update danmu info: {repr(exc)}')
self._danmu_info = COMMON_DANMU_INFO
else:
logger.debug('Danmu info updated')
self._logger.debug('Danmu info updated')

async def _disconnect(self) -> None:
await self._cancel_heartbeat_task()
await self._close_websocket()
logger.debug('Disconnected from server')
self._logger.debug('Disconnected from server')
await self._emit('client_disconnected')

async def _close_websocket(self) -> None:
Expand All @@ -237,14 +243,14 @@ async def _cancel_heartbeat_task(self) -> None:
with suppress(asyncio.CancelledError):
await self._heartbeat_task

@aio_task_with_room_id
@async_task_with_logger_context
async def _send_heartbeat(self) -> None:
data = Frame.encode(WS.OP_HEARTBEAT, '')
while True:
try:
await self._ws.send_bytes(data)
except Exception as exc:
logger.warning(f'Failed to send heartbeat: {repr(exc)}')
self._logger.warning(f'Failed to send heartbeat: {repr(exc)}')
await self._emit('error_occurred', exc)
task = asyncio.create_task(self.restart())
task.add_done_callback(exception_callback)
Expand All @@ -254,15 +260,15 @@ async def _send_heartbeat(self) -> None:
async def _create_message_loop(self) -> None:
self._message_loop_task = asyncio.create_task(self._message_loop())
self._message_loop_task.add_done_callback(exception_callback)
logger.debug('Created message loop')
self._logger.debug('Created message loop')

async def _terminate_message_loop(self) -> None:
self._message_loop_task.cancel()
with suppress(asyncio.CancelledError):
await self._message_loop_task
logger.debug('Terminated message loop')
self._logger.debug('Terminated message loop')

@aio_task_with_room_id
@async_task_with_logger_context
async def _message_loop(self) -> None:
while True:
for msg in await self._receive():
Expand Down Expand Up @@ -292,8 +298,7 @@ async def _receive(self) -> List[Dict[str, Any]]:
else:
await self._handle_receive_error(ValueError(wsmsg))

@staticmethod
async def _handle_data(data: bytes) -> Optional[List[Dict[str, Any]]]:
async def _handle_data(self, data: bytes) -> Optional[List[Dict[str, Any]]]:
loop = asyncio.get_running_loop()

try:
Expand All @@ -304,12 +309,14 @@ async def _handle_data(data: bytes) -> Optional[List[Dict[str, Any]]]:
elif op == WS.OP_HEARTBEAT_REPLY:
pass
except Exception as e:
logger.warning(f'Failed to handle data: {repr(e)}, data: {repr(data)}')
self._logger.warning(
f'Failed to handle data: {repr(e)}, data: {repr(data)}'
)

return None

async def _handle_receive_error(self, exc: Exception) -> None:
logger.warning(f'Failed to receive message: {repr(exc)}')
self._logger.warning(f'Failed to receive message: {repr(exc)}')
await self._emit('error_occurred', exc)
if isinstance(exc, asyncio.TimeoutError):
return
Expand All @@ -322,7 +329,7 @@ def _reset_retry(self) -> None:
async def _retry(self) -> None:
if self._retry_count < self._MAX_RETRIES:
if self._retry_delay > 0:
logger.debug(
self._logger.debug(
'Retry after {} second{}'.format(
self._retry_delay, 's' if self._retry_delay > 1 else ''
)
Expand Down
2 changes: 1 addition & 1 deletion src/blrec/bili/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

async def room_init(room_id: int) -> ResponseData:
async with aiohttp.ClientSession(raise_for_status=True) as session:
api = WebApi(session)
api = WebApi(session, room_id=room_id)
return await api.room_init(room_id)


Expand Down
Loading

0 comments on commit 78c561c

Please sign in to comment.