olgram/server/custom.py
2022-11-06 04:32:32 +04:00

360 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from aiogram import Bot as AioBot, Dispatcher
from aiogram.dispatcher.webhook import WebhookRequestHandler
from aiogram.dispatcher.webhook import SendMessage
from aiogram import exceptions
from aiogram import types
from contextvars import ContextVar
from aiohttp.web_exceptions import HTTPNotFound
from aioredis.commands import create_redis_pool
from aioredis import Redis
from tortoise.expressions import F
import logging
import typing as ty
from olgram.settings import ServerSettings
from olgram.models.models import Bot, GroupChat, BannedUser, MailingUser
from locales.locale import _, translators
from server.inlines import inline_handler
_logger = logging.getLogger(__name__)
_logger.setLevel(logging.INFO)
db_bot_instance: ContextVar[Bot] = ContextVar('db_bot_instance')
_redis: ty.Optional[Redis] = None
def _get_translator(message: types.Message) -> ty.Callable:
if not message.from_user.locale:
return _
return translators.get(message.from_user.locale.language, _)
async def init_redis():
global _redis
_redis = await create_redis_pool(ServerSettings.redis_path())
def _message_unique_id(bot_id: int, message_id: int) -> str:
return f"{bot_id}_{message_id}"
def _thread_uniqie_id(bot_id: int, chat_id: int) -> str:
return f"thread_{bot_id}_{chat_id}"
def _last_message_uid(bot_id: int, chat_id: int) -> str:
return f"lm_{bot_id}_{chat_id}"
def _antiflood_marker_uid(bot_id: int, chat_id: int) -> str:
return f"af_{bot_id}_{chat_id}"
def _on_security_policy(message: types.Message, bot):
_ = _get_translator(message)
text = _("<b>Политика конфиденциальности</b>\n\n"
"Этот бот не хранит ваши сообщения, имя пользователя и @username. При отправке сообщения (кроме команд "
"/start и /security_policy) ваш идентификатор пользователя записывается в кеш на некоторое время и потом "
"удаляется из кеша. Этот идентификатор используется для общения с оператором.\n\n")
if bot.enable_additional_info:
text += _("При отправке сообщения (кроме команд /start и /security_policy) оператор <b>видит</b> ваши имя "
"пользователя, @username и идентификатор пользователя в силу настроек, которые оператор указал при "
"создании бота.\n\n")
else:
text += _("В зависимости от ваших настроек конфиденциальности Telegram, оператор может видеть ваш username, "
"имя пользователя и другую информацию.\n\n")
if bot.enable_mailing:
text += _("В этом боте включена массовая рассылка в силу настроек, которые оператор указал при создании бота. "
"Ваш идентификатор пользователя может быть записан в базу данных на долгое время")
else:
text += _("В этом боте нет массовой рассылки сообщений")
return SendMessage(chat_id=message.chat.id,
text=text,
parse_mode="HTML")
async def send_user_message(message: types.Message, super_chat_id: int, bot):
"""Переслать сообщение от пользователя, добавлять к нему user info при необходимости"""
if bot.enable_additional_info:
user_info = _("Сообщение от пользователя ")
user_info += message.from_user.full_name
if message.from_user.username:
user_info += " | @" + message.from_user.username
user_info += f" | #ID{message.from_user.id}"
# Добавлять информацию в конец текста
if message.content_type == types.ContentType.TEXT and len(message.text) + len(user_info) < 4093: # noqa:E721
new_message = await message.bot.send_message(super_chat_id, message.text + "\n\n" + user_info)
else: # Не добавлять информацию в конец текста, информация отдельным сообщением
new_message = await message.bot.send_message(super_chat_id, text=user_info)
new_message_2 = await message.copy_to(super_chat_id, reply_to_message_id=new_message.message_id)
await _redis.set(_message_unique_id(bot.pk, new_message_2.message_id), message.chat.id,
pexpire=ServerSettings.redis_timeout_ms())
await _redis.set(_message_unique_id(bot.pk, new_message.message_id), message.chat.id,
pexpire=ServerSettings.redis_timeout_ms())
return new_message
else:
try:
new_message = await message.forward(super_chat_id)
except exceptions.MessageCantBeForwarded:
new_message = await message.copy_to(super_chat_id)
await _redis.set(_message_unique_id(bot.pk, new_message.message_id), message.chat.id,
pexpire=ServerSettings.redis_timeout_ms())
return new_message
async def send_to_superchat(is_super_group: bool, message: types.Message, super_chat_id: int, bot):
"""Пересылка сообщения от пользователя оператору (логика потоков сообщений)"""
if is_super_group and bot.enable_threads:
thread_first_message = await _redis.get(_thread_uniqie_id(bot.pk, message.chat.id))
if thread_first_message:
# переслать в супер-чат, отвечая на предыдущее сообщение
try:
new_message = await message.copy_to(super_chat_id, reply_to_message_id=int(thread_first_message))
await _redis.set(_message_unique_id(bot.pk, new_message.message_id), message.chat.id,
pexpire=ServerSettings.redis_timeout_ms())
except exceptions.BadRequest:
new_message = await send_user_message(message, super_chat_id, bot)
await _redis.set(_thread_uniqie_id(bot.pk, message.chat.id), new_message.message_id,
pexpire=ServerSettings.thread_timeout_ms())
else:
# переслать супер-чат
new_message = await send_user_message(message, super_chat_id, bot)
await _redis.set(_thread_uniqie_id(bot.pk, message.chat.id), new_message.message_id,
pexpire=ServerSettings.thread_timeout_ms())
else: # личные сообщения не поддерживают потоки сообщений: просто отправляем сообщение
await send_user_message(message, super_chat_id, bot)
async def handle_user_message(message: types.Message, super_chat_id: int, bot):
"""Обычный пользователь прислал сообщение в бот, нужно переслать его операторам"""
_ = _get_translator(message)
is_super_group = super_chat_id < 0
# Записать пользователя для рассылки, если она включена
if bot.enable_mailing:
_, __ = await MailingUser.get_or_create(telegram_id=message.chat.id, bot=bot)
# Проверить, не забанен ли пользователь
banned = await bot.banned_users.filter(telegram_id=message.chat.id)
if banned:
return SendMessage(chat_id=message.chat.id,
text=_("Вы заблокированы в этом боте"))
# Проверить анти-флуд
if bot.enable_antiflood:
if await _redis.get(_antiflood_marker_uid(bot.pk, message.chat.id)):
return SendMessage(chat_id=message.chat.id,
text=_("Слишком много сообщений, подождите одну минуту"))
await _redis.setex(_antiflood_marker_uid(bot.pk, message.chat.id), 60, 1)
# Пересылаем сообщение в супер-чат
try:
await send_to_superchat(is_super_group, message, super_chat_id, bot)
except (exceptions.Unauthorized, exceptions.ChatNotFound):
return SendMessage(chat_id=message.chat.id, text=_("Не удаётся связаться с владельцем бота"))
except exceptions.TelegramAPIError as err:
_logger.error(f"(exception on forwarding) {err}")
return
bot.incoming_messages_count = F("incoming_messages_count") + 1
await bot.save(update_fields=["incoming_messages_count"])
# И отправить пользователю специальный текст, если он указан и если давно не отправляли
if bot.second_text:
send_auto = not await _redis.get(_last_message_uid(bot.pk, message.chat.id))
await _redis.setex(_last_message_uid(bot.pk, message.chat.id), 60 * 60 * 3, 1)
if send_auto:
return SendMessage(chat_id=message.chat.id, text=bot.second_text, parse_mode="HTML")
async def handle_operator_message(message: types.Message, super_chat_id: int, bot):
"""Оператор написал что-то, нужно переслать сообщение обратно пользователю, или забанить его и т.д."""
_ = _get_translator(message)
if message.reply_to_message:
if message.reply_to_message.from_user.id != message.bot.id:
return # нас интересуют только ответы на сообщения бота
# В супер-чате кто-то ответил на сообщение пользователя, нужно переслать тому пользователю
chat_id = await _redis.get(_message_unique_id(bot.pk, message.reply_to_message.message_id))
if not chat_id:
chat_id = message.reply_to_message.forward_from_chat
if not chat_id:
return SendMessage(chat_id=message.chat.id,
text=_("<i>Невозможно переслать сообщение: автор не найден (сообщение слишком "
"старое?)</i>"),
parse_mode="HTML")
chat_id = int(chat_id)
if message.text == "/ban":
user, create = await BannedUser.get_or_create(telegram_id=chat_id, bot=bot)
await user.save()
return SendMessage(chat_id=message.chat.id, text=_("Пользователь заблокирован"))
if message.text == "/unban":
banned_user = await bot.banned_users.filter(telegram_id=chat_id).first()
if not banned_user:
return SendMessage(chat_id=message.chat.id, text=_("Пользователь не был забанен"))
else:
await banned_user.delete()
return SendMessage(chat_id=message.chat.id, text=_("Пользователь разбанен"))
try:
await message.copy_to(chat_id)
except (exceptions.MessageError, exceptions.Unauthorized):
await message.reply(_("<i>Невозможно переслать сообщение (автор заблокировал бота?)</i>"),
parse_mode="HTML")
return
bot.outgoing_messages_count = F("outgoing_messages_count") + 1
await bot.save(update_fields=["outgoing_messages_count"])
elif super_chat_id > 0:
# в супер-чате кто-то пишет сообщение сам себе, только для личных сообщений
await message.forward(super_chat_id)
# И отправить пользователю специальный текст, если он указан
if bot.second_text:
return SendMessage(chat_id=message.chat.id, text=bot.second_text, parse_mode="HTML")
async def message_handler(message: types.Message, *args, **kwargs):
_ = _get_translator(message)
bot = db_bot_instance.get()
if message.text and message.text == "/start":
# На команду start нужно ответить, не пересылая сообщение никуда
text = bot.start_text
if bot.enable_olgram_text:
text += _(ServerSettings.append_text())
return SendMessage(chat_id=message.chat.id, text=text, parse_mode="HTML")
if message.text and message.text == "/security_policy":
# На команду security_policy нужно ответить, не пересылая сообщение никуда
return _on_security_policy(message, bot)
super_chat_id = await bot.super_chat_id()
if message.chat.id != super_chat_id:
# Это обычный чат
return await handle_user_message(message, super_chat_id, bot)
else:
# Это супер-чат
return await handle_operator_message(message, super_chat_id, bot)
async def edited_message_handler(message: types.Message, *args, **kwargs):
return await message_handler(message, *args, **kwargs, is_edited=True)
async def receive_invite(message: types.Message):
bot = db_bot_instance.get()
for member in message.new_chat_members:
if member.id == message.bot.id:
chat, _ = await GroupChat.get_or_create(chat_id=message.chat.id,
defaults={"name": message.chat.full_name})
chat.name = message.chat.full_name
await chat.save()
if chat not in await bot.group_chats.all():
await bot.group_chats.add(chat)
await bot.save()
break
async def receive_group_create(message: types.Message):
bot = db_bot_instance.get()
chat, _ = await GroupChat.get_or_create(chat_id=message.chat.id,
defaults={"name": message.chat.full_name})
chat.name = message.chat.full_name
await chat.save()
if chat not in await bot.group_chats.all():
await bot.group_chats.add(chat)
await bot.save()
async def receive_left(message: types.Message):
bot = db_bot_instance.get()
if message.left_chat_member.id == message.bot.id:
chat = await bot.group_chats.filter(chat_id=message.chat.id).first()
if chat:
await bot.group_chats.remove(chat)
bot_group_chat = await bot.group_chat
if bot_group_chat == chat:
bot.group_chat = None
await bot.save()
async def receive_inline(inline_query):
_logger.info("inline handler")
bot = db_bot_instance.get()
return await inline_handler(inline_query, bot)
async def receive_migrate(message: types.Message):
bot = db_bot_instance.get()
from_id = message.chat.id
to_id = message.migrate_to_chat_id
chats = await bot.group_chats.filter(chat_id=from_id)
for chat in chats:
chat.chat_id = to_id
await chat.save(update_fields=["chat_id"])
class CustomRequestHandler(WebhookRequestHandler):
def __init__(self, *args, **kwargs):
self._dispatcher = None
super(CustomRequestHandler, self).__init__(*args, **kwargs)
async def _create_dispatcher(self):
key = self.request.url.path[1:]
bot = await Bot.filter(code=key).first()
if not bot:
return None
db_bot_instance.set(bot)
dp = Dispatcher(AioBot(bot.decrypted_token()))
supported_messages = [types.ContentType.TEXT,
types.ContentType.CONTACT,
types.ContentType.ANIMATION,
types.ContentType.AUDIO,
types.ContentType.DOCUMENT,
types.ContentType.PHOTO,
types.ContentType.STICKER,
types.ContentType.VIDEO,
types.ContentType.VOICE,
types.ContentType.LOCATION]
dp.register_message_handler(message_handler, content_types=supported_messages)
dp.register_edited_message_handler(edited_message_handler, content_types=supported_messages)
dp.register_message_handler(receive_invite, content_types=[types.ContentType.NEW_CHAT_MEMBERS])
dp.register_message_handler(receive_left, content_types=[types.ContentType.LEFT_CHAT_MEMBER])
dp.register_message_handler(receive_migrate, content_types=[types.ContentType.MIGRATE_TO_CHAT_ID])
dp.register_message_handler(receive_group_create, content_types=[types.ContentType.GROUP_CHAT_CREATED])
dp.register_inline_handler(receive_inline)
return dp
async def post(self):
dispatcher = await self._create_dispatcher()
if not dispatcher:
raise HTTPNotFound()
Dispatcher.set_current(dispatcher)
AioBot.set_current(dispatcher.bot)
return await super(CustomRequestHandler, self).post()
def get_dispatcher(self):
"""
Get Dispatcher instance from environment
:return: :class:`aiogram.Dispatcher`
"""
return Dispatcher.get_current()