531 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import asyncio
import logging
from datetime import datetime
from aiogram import Bot, Dispatcher, types
from aiogram.client.default import DefaultBotProperties
from aiogram.enums import ParseMode
from aiogram.enums.dice_emoji import DiceEmoji
from aiogram import F, html
from aiogram.types import Message
from aiogram.filters import Command, CommandObject, CommandStart
from aiogram.utils.formatting import Text, Bold, as_list, as_marked_section, as_key_value, HashTag
from aiogram.types import FSInputFile, URLInputFile, BufferedInputFile
# новый импорт!
from aiogram.utils.markdown import hide_link #для скрытой ссылки
# новый импорт!
from aiogram.utils.keyboard import ReplyKeyboardBuilder # для создания кнопок
# Новые импорты!
from contextlib import suppress
from aiogram.exceptions import TelegramBadRequest
from config_reader import config
# Включаем логирование, чтобы не пропустить важные сообщения
logging.basicConfig(level=logging.INFO)
# Для записей с типом Secret* необходимо
# вызывать метод get_secret_value(),
# чтобы получить настоящее содержимое вместо '*******'
bot = Bot(
token=config.bot_token.get_secret_value(),
default=DefaultBotProperties(
parse_mode=ParseMode.HTML
# тут ещё много других интересных настроек
))
# Объект бота напрямую из .env
# bot = Bot(token=os.getenv("BOT_TOKEN"))
# Диспетчер
dp = Dispatcher()
dp["started_at"] = datetime.now().strftime("%Y-%m-%d %H:%M")
# Хэндлер на команду /start
@dp.message(Command("start"))
async def cmd_start(message: types.Message):
# await message.answer("Привет! ")
content = as_list(
Text(
"Привет! ",
Bold(message.from_user.first_name)
),
as_marked_section(
Bold("Я умею:"),
"/test1 - Отвечу Test1",
"/answer - Просто отвечу",
"/reply - Отвечу ответом",
"/name - Поприветствую тебя по Имени и Фамилии",
"/aboute - Дам тебе характеристику",
"/dice - Подкину для тебя кубик, загадай число ;)",
"Если ты мне отправишь гифку, я тебе ей же и отвечу",
"----------",
"/more - Еще больше возможностей!",
"/vfy - Получить подтверждение Вашего номера телефона",
marker="",
),
as_marked_section(
Bold("Failed:"),
"Не смогу полететь на луну:( ",
marker="",
),
HashTag("#ищу"),
# Text(
# "Номер телефона, ",
# Bold(message.contact.phone_number)
# )
)
await message.answer(
**content.as_kwargs()
)
@dp.message(Command("more"))
async def cmd_more(message: types.Message):
# await message.answer("Привет! ")
content = as_list(
Text(
Bold(message.from_user.first_name),
" Еще я могу вот что: ",
),
as_marked_section(
Bold(""),
"Если ты напишешь в тексте ",
"адрес сайта,",
"e-mail,",
"Номер телефона,",
"Я распознаю их и напишу что нашел",
"/special_buttons - выведу спецкнопки с командами",
"/dice - Подкину для тебя кубик, загадай число ;)",
"/settimer <time> <message> - через установленное время сообще Message ;)",
"/hidden_link - Подкину для тебя угарную фотку ;)",
marker="",
),
HashTag("#еще"),
# Text(
# "Номер телефона, ",
# Bold(message.contact.phone_number)
# )
)
await message.answer(
**content.as_kwargs()
)
# Хэндлер на команду /test1
@dp.message(Command("test1"))
async def cmd_test1(message: types.Message):
await message.reply("Test 1")
# Хэндлер на команду /test2
# Без декоратора, т.к. регистрируется ниже в функции main()
async def cmd_test2(message: types.Message):
await message.reply("Test 2")
@dp.message(Command("answer"))
async def cmd_answer(message: types.Message):
await message.answer("Это простой ответ")
# Хэндлер на команду /name
@dp.message(Command("name"))
async def cmd_name(message: Message):
await message.answer(
f"Доброго дня тебе, <b>{html.bold(html.quote(message.from_user.full_name))}</b>",
parse_mode=ParseMode.HTML
# f"Мой ник, <b>{html.bold(html.quote(message.from_user.username))}</b>",
# parse_mode=ParseMode.HTML
)
@dp.message(Command("aboute"))
async def cmd_aboute(message: Message):
content = as_list(
Text(
"User_name, ",
Bold(message.from_user.username)
),
as_marked_section(
Bold("Это Ты:"),
"Молодец",
"Красава",
"Умка",
marker="",
),
as_marked_section(
Bold("Это точно не ты:"),
"Душнила",
marker="",
),
HashTag("#характеристика"),
# Text(
# "Номер телефона, ",
# Bold(message.contact.phone_number)
# )
)
await message.answer(
**content.as_kwargs()
)
@dp.message(Command("reply"))
async def cmd_reply(message: types.Message):
await message.reply('Это ответ с "ответом"')
@dp.message(Command("dice"))
async def cmd_dice(message: types.Message):
await message.answer_dice(emoji=DiceEmoji.DICE)
# передача сообщения в другой чат с номером чата -1001826767638
@dp.message(Command("dice2"))
async def cmd_dice2(message: types.Message, bot: Bot):
await bot.send_dice(-1001826767638, emoji=DiceEmoji.DICE)
# запрос даты запуска бота
@dp.message(Command("info"))
async def cmd_info(message: types.Message, started_at: str):
await message.answer(f"Бот запущен {started_at}")
# сохраним формат введенного пользователем текста - после теста отключим хендлер
#@dp.message(F.text)
async def echo_with_time(message: Message):
# Получаем текущее время в часовом поясе ПК
time_now = datetime.now().strftime('%H:%M')
# Создаём подчёркнутый текст
added_text = html.underline(f"Написано в {time_now}")
not_anderstand = (message.from_user.first_name)
# Отправляем новое сообщение с добавленным текстом
await message.answer(f"{message.html_text}\n\n{not_anderstand}!!! Я не понимаю эту команду :(\nДля получения списка известных мне команд напиши /start \n{added_text}", parse_mode="HTML")
@dp.message(F.text.lower() == "круто")
async def without_puree(message: types.Message):
await message.reply("Соглашусь, это круто!")
@dp.message(Command("settimer", prefix="/!")) # добавим дополнительные префиксы для оперделения команды
async def cmd_settimer(
message: Message,
command: CommandObject
):
# Если не переданы никакие аргументы, то
# command.args будет None
if command.args is None:
await message.answer(
"Ошибка: не переданы аргументы"
)
return
# Пробуем разделить аргументы на две части по первому встречному пробелу
try:
delay_time, text_to_send = command.args.split(" ", maxsplit=1)
# Если получилось меньше двух частей, вылетит ValueError
except ValueError:
await message.answer(
"Ошибка: неправильный формат команды. Пример:\n"
"/settimer <time> <message>"
)
return
await message.answer(
"Таймер добавлен!\n"
f"Время: {delay_time}\n"
f"Текст: {text_to_send}"
)
# Мгновенный ответ пользователю гифкой, которую он прислал
@dp.message(F.animation)
async def echo_gif(message: Message):
await message.reply_animation(message.animation.file_id)
# await message.answer_animation(message.file_id)
await message.answer_animation(
animation=message.animation.file_id,
caption="Я сегодня:",
show_caption_above_media=True
)
@dp.message(Command('images'))
async def upload_photo(message: Message):
# Сюда будем помещать file_id отправленных файлов, чтобы потом ими воспользоваться
file_ids = []
# Чтобы продемонстрировать BufferedInputFile, воспользуемся "классическим"
# открытием файла через `open()`. Но, вообще говоря, этот способ
# лучше всего подходит для отправки байтов из оперативной памяти
# после проведения каких-либо манипуляций, например, редактированием через Pillow
with open("buffer_emulation.jpg", "rb") as image_from_buffer:
result = await message.answer_photo(
BufferedInputFile(
image_from_buffer.read(),
filename="image from buffer.jpg"
),
caption="Изображение из буфера"
)
file_ids.append(result.photo[-1].file_id)
# Отправка файла из файловой системы
image_from_pc = FSInputFile("image_from_pc.jpg")
result = await message.answer_photo(
image_from_pc,
caption="Изображение из файла на компьютере"
)
file_ids.append(result.photo[-1].file_id)
# Отправка файла по ссылке
image_from_url = URLInputFile("https://picsum.photos/seed/groosha/400/300")
result = await message.answer_photo(
image_from_url,
caption="Изображение по ссылке"
)
file_ids.append(result.photo[-1].file_id)
await message.answer("Отправленные файлы:\n"+"\n".join(file_ids))
# отправка приветственного сообщения вошедшему
@dp.message(F.new_chat_members)
async def somebody_added(message: Message):
for user in message.new_chat_members:
# проперти full_name берёт сразу имя И фамилию
# (на скриншоте выше у юзеров нет фамилии)
await message.reply(f"Привет, {user.full_name}")
# Прячем ссылку в HTML
@dp.message(Command("hidden_link"))
async def cmd_hidden_link(message: Message):
await message.answer(
f"{hide_link('https://i.pinimg.com/736x/26/3b/80/263b80dab1464429dd3f082a6601ad76.jpg')}"
f"Документация Telegram: *существует*\n"
f"Пользователи: *не читают документацию*\n"
f"Груша:"
)
# Специальные обычные кнопки
@dp.message(Command("special_buttons"))
async def cmd_special_buttons(message: types.Message):
builder = ReplyKeyboardBuilder()
# метод row позволяет явным образом сформировать ряд
# из одной или нескольких кнопок. Например, первый ряд
# будет состоять из двух кнопок...
builder.row(
types.KeyboardButton(text="Запросить геолокацию", request_location=True),
types.KeyboardButton(
text="Подтвердить контакт",
request_contact=True
)
)
# ... второй из одной ...
builder.row(types.KeyboardButton(
text="Создать викторину",
request_poll=types.KeyboardButtonPollType(type="quiz"))
)
# ... а третий снова из двух
builder.row(
types.KeyboardButton(
text="Выбрать премиум пользователя",
request_user=types.KeyboardButtonRequestUser(
request_id=1,
user_is_premium=False
)
),
types.KeyboardButton(
text="Выбрать супергруппу с форумами",
request_chat=types.KeyboardButtonRequestChat(
request_id=2,
chat_is_channel=True,
chat_is_forum=False
)
)
)
# WebApp-ов пока нет, сорри :(
await message.answer(
"Выберите действие:",
reply_markup=builder.as_markup(resize_keyboard=True),
)
# Прием нажатий нижних двух кнопок
@dp.message(lambda message: message.contact is not None)
async def handle_contact(message: types.Message):
# Проверяем, что контакт принадлежит отправителю
if message.from_user.id == message.contact.user_id:
await message.answer(
f"Спасибо за контакт, {message.contact.first_name}!\n"
f"Номер телефона: {message.contact.phone_number}",
reply_markup=types.ReplyKeyboardRemove() # Убираем клавиатуру
)
else:
await message.answer("Это не ваш контакт!")
@dp.message(F.user_shared)
async def on_user_shared(message: types.Message):
await message.answer(
f"Request {message.user_shared.request_id}. "
f"User ID: {message.user_shared.user_id}"
)
abc_id = message.user_shared.request_id
@dp.message(F.chat_shared)
async def on_user_shared(message: types.Message):
await message.answer(
f"Request {message.chat_shared.request_id}. "
f"Chat ID: {message.chat_shared.chat_id}"
)
# новый импорт
from aiogram.utils.keyboard import InlineKeyboardBuilder
@dp.message(Command("inline_url"))
async def cmd_inline_url(message: types.Message, bot: Bot):
builder = InlineKeyboardBuilder()
builder.row(types.InlineKeyboardButton(
text="GitHub", url="https://github.com")
)
builder.row(types.InlineKeyboardButton(
text="Оф. канал Telegram",
url="tg://resolve?domain=telegram")
)
# Чтобы иметь возможность показать ID-кнопку,
# У юзера должен быть False флаг has_private_forwards
user_id = message.from_user.id
chat_info = await bot.get_chat(user_id)
if not chat_info.has_private_forwards:
builder.row(types.InlineKeyboardButton(
text="Какой-то пользователь",
url=f"tg://user?id={user_id}")
)
await message.answer(
'Выберите ссылку',
reply_markup=builder.as_markup(),
)
@dp.message(Command("vfy"))
@dp.message(CommandStart(
deep_link=True, magic=F.args == "vfy"
))
async def cmd_start_vfy(message: types.Message):
builder = ReplyKeyboardBuilder()
builder.row(
types.KeyboardButton(
text="Подтвердить контакт",
request_contact=True
)
)
# Отправляем сообщение с клавиатурой
await message.answer(
"Для подтверждения номера телефона нажмите кнопку ниже:",
reply_markup=builder.as_markup(
resize_keyboard=True, # Опционально: автоматический размер
one_time_keyboard=True, # Опционально: скрыть после нажатия
input_field_placeholder="Подтвердите номер телефона" # Подсказка в поле ввода
)
)
# Хэндлер для обработки полученного контакта
@dp.message(lambda message: message.contact is not None)
async def handle_contact(message: types.Message):
# Проверяем, что контакт принадлежит отправителю
if message.from_user.id == message.contact.user_id:
await message.answer(
f"Спасибо за контакт, {message.contact.first_name}!\n"
f"Номер телефона: {message.contact.phone_number}",
reply_markup=types.ReplyKeyboardRemove() # Убираем клавиатуру
)
else:
await message.answer("Это не ваш контакт!")
# Продолжим с колбэками
# Здесь хранятся пользовательские данные.
# Т.к. это словарь в памяти, то при перезапуске он очистится
user_data = {}
#сформируем инлайн-клавиатуру
def get_keyboard():
buttons = [
[
types.InlineKeyboardButton(text="-1", callback_data="num_decr"),
types.InlineKeyboardButton(text="+1", callback_data="num_incr")
],
[types.InlineKeyboardButton(text="Подтвердить", callback_data="num_finish")]
]
keyboard = types.InlineKeyboardMarkup(inline_keyboard=buttons)
return keyboard
# формируем сообщение с переменным аргументом
async def update_num_text(message: types.Message, new_value: int):
with suppress(TelegramBadRequest):
await message.edit_text(
f"Укажите число: {new_value}",
reply_markup=get_keyboard()
)
# запуск клавиатуры по команде /numbers
@dp.message(Command("numbers"))
async def cmd_numbers(message: types.Message):
user_data[message.from_user.id] = 0
await message.answer("Укажите число: 0", reply_markup=get_keyboard())
@dp.callback_query(F.data.startswith("num_"))
async def callbacks_num(callback: types.CallbackQuery):
user_value = user_data.get(callback.from_user.id, 0)
action = callback.data.split("_")[1]
if action == "incr":
user_data[callback.from_user.id] = user_value+1
await update_num_text(callback.message, user_value+1)
elif action == "decr":
user_data[callback.from_user.id] = user_value-1
await update_num_text(callback.message, user_value-1)
elif action == "finish":
await callback.message.edit_text(f"Итого: {user_value}")
await callback.answer()
# пока не работает, надо понять как получить file_id
@dp.message(Command("gif"))
async def send_gif(message: Message):
await message.answer_animation(
animation="<file_id>",
caption="Я сегодня:",
show_caption_above_media=True
)
# Извлекаем из сообщений пользователя данные url, email, телефон и код
@dp.message(F.text)
async def extract_data(message: Message):
data = {
"url": "<N/A>",
"email": "<N/A>",
"phone_number": "<N/A>",
# "code": "<N/A>"
}
entities = message.entities or []
for item in entities:
if item.type in data.keys():
# Неправильно
# data[item.type] = message.text[item.offset : item.offset+item.length]
# Правильно
data[item.type] = item.extract_from(message.text)
await message.reply(
"Вот что я нашёл:\n"
f"URL: {html.quote(data['url'])}\n"
f"E-mail: {html.quote(data['email'])}\n"
f"Телефон: {html.quote(data['phone_number'])}\n"
# f"Пароль: {html.quote(data['code'])}"
)
# Запуск процесса поллинга новых апдейтов
async def main():
# Регистрируем хэндлер cmd_test2 по команде /start
dp.message.register(cmd_test2, Command("test2"))
# Запускаем бота
await dp.start_polling(bot)
if __name__ == "__main__":
import asyncio
asyncio.run(main())