...

суббота, 6 марта 2021 г.

«Умная камера» на базе Raspberry Pi с управлением через Telegram-бота

Привет, меня зовут Иван. Сразу отвечу на главный вопрос: почему стал собирать сам, а не взял готовое решение? Во-первых, стоимость готовых решений - Raspberry Pi со всеми датчиками и камерой вышла не больше $30, большая часть еще по курсу 60 рублей за доллар. Во-вторых, почти все части уже были - Raspberry Pi отдал брат, камера осталась еще с лохматых времен, диод тоже был - покупал для Arduino, а датчик движения на Aliexpress стоил не больше 100 рублей.

Повествование в статье будет построено следующим образом:

  1. Определим что нам потребуется;

  2. Поработаем с диодом;

  3. С датчиком движения;

  4. С камерой (фото);

  5. С камерой (видео);

  6. Разберем работу с Telegram-ботом (рассмотрим скелет);

  7. Создадим "Умную камеру";

  8. Посмотрим на работу "Умной камеры";

  9. Определим узкие места и возможные решения.

Итак, поехали

Что нам потребуется

На Raspberry должен быть настроен интернет, установлены:

  • ffmpeg - для записи видео с камеры;

  • Python 3.7.

У Python должны быть следующие библиотеки:

  • RPi.GPIO;

  • pygame;

  • telebot.

И понадобятся провода - F2F, что бы все это соединить.

Работа с диодом

import RPi.GPIO as GPIO
import time

LED_PIN = 3

def setup():
    GPIO.setmode(GPIO.BOARD)
    GPIO.setwarnings(False)
    GPIO.setup(LED_PIN, GPIO.OUT)

def destroy():
    GPIO.output(LED_PIN, GPIO.LOW)
    GPIO.cleanup()

def blink():
    GPIO.output(LED_PIN, GPIO.HIGH)
    time.sleep(1)
    GPIO.output(LED_PIN, GPIO.LOW)

def main():
    setup()
    blink()
    destroy()

if __name__ == '__main__':
    main()

Импортируем библиотеку для работы с нодами Raspberry, определяем нод для диода. В методе настройки библиотеки указываем, что нумерация нодов будет в соответствии с номерами на самой Raspberry, отключаем все предупреждения библиотеки и сообщаем, что на нод диода ток будет выходить, а не считываться. В методе очистки ресурсов отключаем диод и очищаем библиотеку. В главном методе - blink подаем сигнал на нод диода, ждем 1 секунду и выключаем подачу сигнала на нод диода. В методе main вызываем все методы подряд и больше ничего не делаем. Светом диода будем отображать обнаруженение движения датчиком.

Работа с датчиком движения

Перед подключением датчика движения к Raspberry необходимо для начала определить какой провод за что отвечает, потому что на некоторых датчиках земля находится слева, на некоторых - справа и это будет кардинально влиять на работу датчика:

После этого можно подключать датчик и настраивать его физически - скорректировать значение delay и sensitivity для своих нужд, вот пример моей настройки:

Помимо этого, есть еще один параметр настройки у датчика движения, хоть он и не бросается в глаза. На обратной стороне датчика есть "ключ", которым можно настроить дальность работы датчика, для своих нужд я переключил его:

В принципе, работа с датчиком движения сводится к одному методу - методу чтения сигнала с датчика (16 строка).

import RPi.GPIO as GPIO
import time

PIR_PIN = 11

def setup():
    GPIO.setmode(GPIO.BOARD)
    GPIO.setwarnings(False)
    GPIO.setup(PIR_PIN, GPIO.IN)

def destroy():
    GPIO.cleanup()

def sensorJob():
    while True:
        i = GPIO.input(PIR_PIN)
        
        print("PIR value: " + str(i))
        
        time.sleep(0.1)

def main():
    setup()
    sensorJob()
    destroy()

if __name__ == '__main__':
    main()

Определяем нод для датчика движения. В методе настройки библиотеки указываем, что по указанном ноду будет читаться сигнал. В главном методе - sensorJob в бесконечном цикле считываем сигнал, выводим значение в консоль и ожидаем небольшой промежуток времени.

Работа с камерой (фото)

С "железом" разобрались и теперь можно переходить к съемке фото и видео. Подключаем камеру к любому USB-порту, по необходимости, настраиваем. Вероятнее всего, операционная система сама подцепит нужные драйвера и дополнительной настройки не потребуется. Если у вас подключена 1 камера она будет называться /dev/video0. Также важно определить разрешение камеры, потому что можно получить фотографии сплошным черным цветом.

from datetime import datetime
import pygame
import pygame.camera

pygame.init()
pygame.camera.init()
pygame.camera.list_cameras() 

cam = pygame.camera.Camera("/dev/video0", (640,426))

def saveCapture():
    filename = datetime.now().strftime('%d-%m-%Y %H:%M:%S') + '.jpg'
    
    cam.start()
    pygame.image.save(cam.get_image(), filename)
    cam.stop()

def main():
    setup()
    saveCapture()
    destroy()

if __name__ == '__main__':
    main()

Импортируем библиотеку для работы с камерой. Инициализируем библиотеку, инициализируем расширение для работы с камерой, выводим список доступных камер, сохраняем нужную нам камеру в переменную, во втором параметре указываем разрешение камеры. В методе saveCapture определяем название файла, вызываем метод start у камеры, через метод get_image() у камеры получаем объект со снимком, сохраняем его методом pygame.image.save, в завершение останавливаем работу камеры методом stop. Очень важно останавливать работу камеры через библиотеку pygame, потому что библиотека ffmpeg будет пытаться использовать камеру и в случае, если мы ее не освободим будет завершаться ошибкой.

Работа с камерой (видео)

Для съемки видео мы будем отправлять в терминал сообщение следующего вида:

ffmpeg -f v4l2 -framerate 25 -video_size 640x426 -i /dev/video0 -t 5 -c copy <filename>

Где будем указывать формат видео - v4l2, количетсво кадров, разрешение съемки, главный аргумент -c copy, если у вас медленная камера, с этим параметром съемка и сохранение файла будет происходить в десятки раз быстрее, но будет отсутствовать preview у видео и в Telegram не будет отображаться количество времени, которое длится видео. Также, нельзя будет просмотреть видео прямо из Telegram, потребуется отдельный плеер.

Каркас Telegram-бота

Перед рассмотрением работы с Telegram-ботом предполагаем, что вы уже получили ключ от BotFather и выполнили настройку бота на стороне Telegram, там нет ничего сложного, поэтому не будем на этом останавливаться. Также, мы будем работать сразу с InlineKeyboard у Telegram-бота, потому что это удобно и не нужно переживать о том, как правильно писать ту или иную команду. Кроме того, в самом начале будем делать проверку на ID пользователя, чтобы никто случайно (или специально) не подглядывал в нашу камеру.

from telebot.types import InlineKeyboardMarkup, InlineKeyboardButton
import telebot

TOKEN = '99999999999:xxxxxxxxxxxxxxxxxx'
ADMIN_USER_ID = 999999999

bot = telebot.TeleBot(TOKEN)

@bot.message_handler(commands=['start'])
def start(message):    
    telegram_user = message.from_user
    
    if telegram_user.id != ADMIN_USER_ID:
        bot.send_message(message.chat.id, text="Hello. My name is James Brown. What do i do for you?")
        
        return
           
    keyboard = [
        [
            InlineKeyboardButton("Send capture", callback_data='sendCapture'),
            InlineKeyboardButton("Send video", callback_data='sendVideo')
        ],
    ]

    bot.send_message(chat_id=message.chat.id,
                         text="Supported commands:",
                         reply_markup=InlineKeyboardMarkup(keyboard))

def sendCapture(chat_id):
    bot.send_photo(chat_id, photo=open('<filename>', 'rb'))

def sendVideo(chat_id):
    bot.send_video(chat_id, open('<filename>', 'rb'))

@bot.callback_query_handler(func=lambda call: True)
def button(call):
    globals()[call.data](call.message.chat.id)

def main():
    setup()
    bot.polling(none_stop=False, interval=5, timeout=20)
    destroy()

if __name__ == '__main__':
    main()

Импортируем классы для InlineKeyboard, импортируем библиотеку Telegram-бота. В переменной TOKEN указываем ключ от BotFather. Сохраняем ADMIN ID. С помощью нотаций указываем команду для метода start. В методе start делаем проверку на USER ID и если это не мы, то выводим сообщение и выходим из метода, если же это мы, то отправляем InlineKeyboard с доступными командами. При отправке клавиатуры можно также сразу отправлять и текст. В методах sendCapture и sendVideo показаны примеры команд для отправки фото и видео через Telegram-бота. Далее нам нужно указать метод, который будет обрабатывать нажатия на InlineKeyboard, тоже через нотацию, но теперь нотация немного другая, вызов метода по нажатию на кнопку делаем по имени этого метода. В методе main вызываем bot.pooling, который в бесконечном цикле проверяет наличие входящих сообщений боту. Если у вас быстрая Raspberry и интернет, то можете не указывать параметры у этого метода и оставить их по-умолчанию. У меня стабильность отправки сообщений была низкой из-за чего пришлось добавить interval = 5. В конце статьи расскажу на что это влияет.

"Умная камера"

Подключаем диод и датчик движения к нужным GPIO нодам, камеру к USB.

Нумерация нодов

У меня это выглядит вот так:

Теперь соберем все куски логики, описанные ранее и получим "Умную камеру" с управлением через Telegram-бота.

from datetime import datetime
from telebot.types import InlineKeyboardMarkup, InlineKeyboardButton
import telebot
import logging
import pygame
import pygame.camera
import RPi.GPIO as GPIO
import threading
import time
import os

TOKEN = '99999999999:xxxxxxxxxxxxxxxxxxxxxx'
ADMIN_USER_ID = 999999999999
LED_PIN = 3
PIR_PIN = 11
VIDEO_FILE_FORMAT = '.mkv'

# Enable Logging
logging.basicConfig(
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        level=logging.INFO)
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

isSensorEnabled = False
isMuteNotifications = False

last_chat_id = -1
keyboard = []

pygame.init()
pygame.camera.init()
pygame.camera.list_cameras() 

cam = pygame.camera.Camera("/dev/video0", (640,426))
bot = telebot.TeleBot(TOKEN)

def setup():
    GPIO.setmode(GPIO.BOARD) 
    GPIO.setwarnings(False)
    GPIO.setup(LED_PIN, GPIO.OUT)
    GPIO.setup(PIR_PIN, GPIO.IN)

def destroy():
    GPIO.output(LED_PIN, GPIO.LOW)
    GPIO.cleanup() 

def log_params(method_name, message):
    logger.debug("Method: %s\nFrom: %s\nchat_id: %d\nText: %s" %
                (method_name,
                 message.from_user,
                 message.chat.id,
                 message.text))

@bot.message_handler(commands=['start'])
def start(message):
    global keyboard
    
    log_params('start', message)
    
    telegram_user = message.from_user
    
    if telegram_user.id != ADMIN_USER_ID:
        bot.send_message(message.chat.id, text="Hello. My name is James Brown. What do i do for you?")
        
        return
           
    keyboard = [
        [InlineKeyboardButton("Start sensor", callback_data='start_sensor')],
        [
            InlineKeyboardButton("Get capture", callback_data='get_capture'),
            InlineKeyboardButton("Get video", callback_data='get_video')
        ],
    ]

    bot.send_message(chat_id=message.chat.id,
                         text="Supported commands:",
                         reply_markup=InlineKeyboardMarkup(keyboard))

def sendCapture(chat_id):
    filename = datetime.now().strftime('%d-%m-%Y %H:%M:%S') + '.jpg'
    
    if (os.path.exists(filename)):
        bot.send_photo(chat_id, photo=open(filename, 'rb'))
    else:
        cam.start()
        pygame.image.save(cam.get_image(), filename)
        cam.stop()

        bot.send_photo(chat_id, photo=open(filename, 'rb'))

def get_capture(chat_id):
    sendCapture(chat_id)
    bot.send_message(chat_id=chat_id,
                     text="Supported commands:",
                     reply_markup=InlineKeyboardMarkup(keyboard))

def sendVideo(chat_id):
    filename = sorted(list(filter(lambda x: x.endswith(VIDEO_FILE_FORMAT), os.listdir())))[-1]

    bot.send_video(chat_id, open(filename, 'rb'))

def captureVideo():
    filename = datetime.now().strftime('%d-%m-%Y %H:%M:%S') + VIDEO_FILE_FORMAT
    
    os.system("ffmpeg -f v4l2 -framerate 25 -video_size 640x426 -i /dev/video0 -t 5 -c copy \"" + filename + "\"")
    
    return filename

def get_video(chat_id):
    bot.send_message(chat_id=chat_id,
                     text="Capturing video..")
        
    filename = captureVideo()

    bot.send_message(chat_id=chat_id,
                     text="Sending video..")
    bot.send_video(chat_id, open(filename, 'rb'))
    bot.send_message(chat_id=chat_id,
                     text="Supported commands:",
                     reply_markup=InlineKeyboardMarkup(keyboard))

def sensorJob():
    global isSensorEnabled
    global keyboard
    
    isRecording = False
    
    while isSensorEnabled:
        i = GPIO.input(PIR_PIN)
        
        GPIO.output(LED_PIN, i)
        
        if (i == 1 and not isRecording):
            isRecording = True
            
            if (not isMuteNotifications):
                sendCapture(last_chat_id)
        
        if (isRecording):
            captureVideo()
        
        if (i == 0 and isRecording):
            if (not isMuteNotifications):
                sendVideo(last_chat_id)
            
            isRecording = False
        
        time.sleep(0.1)
        
    if (isRecording):
        sendVideo(last_chat_id)
    
    keyboard = [
        [InlineKeyboardButton("Start sensor", callback_data='start_sensor')],
        [
            InlineKeyboardButton("Get capture", callback_data='get_capture'),
            InlineKeyboardButton("Get video", callback_data='get_video')
        ],
    ]

    bot.send_message(chat_id=last_chat_id,
                     text="Sensor stopped")
    bot.send_message(chat_id=last_chat_id,
                         text="Supported commands:",
                         reply_markup=InlineKeyboardMarkup(keyboard))

def start_sensor(chat_id):
    global keyboard
    global isSensorEnabled
    global last_chat_id
    
    last_chat_id = chat_id
    isSensorEnabled = True
    
    threading.Thread(target=sensorJob).start()
    
    keyboard = [
        [
            InlineKeyboardButton("Stop sensor", callback_data='stop_sensor'),
            InlineKeyboardButton("Mute notifications", callback_data='mute_notifications')
        ]
    ]

    bot.send_message(chat_id=chat_id,
                         text="Sensor started")
    bot.send_message(chat_id=chat_id,
                         text="Supported commands:",
                         reply_markup=InlineKeyboardMarkup(keyboard))
    
def stop_sensor(chat_id):
    global keyboard
    global last_chat_id
    
    last_chat_id = -1
    isSensorEnabled = False
    
    GPIO.output(LED_PIN, GPIO.LOW)
    
    keyboard = [
        [InlineKeyboardButton("Start sensor", callback_data='start_sensor')],
        [
            InlineKeyboardButton("Get capture", callback_data='get_capture'),
            InlineKeyboardButton("Get video", callback_data='get_video')
        ],
    ]

    bot.send_message(chat_id=chat_id,
                         text="Sensor stop requested")

def mute_notifications(chat_id):
    global keyboard
    
    isMuteNotifications = True
    
    keyboard = [
        [
            InlineKeyboardButton("Stop sensor", callback_data='stop_sensor'),
            InlineKeyboardButton("Unmute notifications", callback_data='unmute_notifications')
        ]
    ]

    bot.send_message(chat_id=chat_id,
                         text="Notifications muted")
    bot.send_message(chat_id=chat_id,
                         text="Supported commands:",
                         reply_markup=InlineKeyboardMarkup(keyboard))

def unmute_notifications(chat_id):
    global keyboard
    
    isMuteNotifications = False
    
    keyboard = [
        [
            InlineKeyboardButton("Stop sensor", callback_data='stop_sensor'),
            InlineKeyboardButton("Mute notifications", callback_data='mute_notifications')
        ]
    ]

    bot.send_message(chat_id=chat_id,
                         text="Notifications unmuted")
    bot.send_message(chat_id=chat_id,
                         text="Supported commands:",
                         reply_markup=InlineKeyboardMarkup(keyboard))

@bot.callback_query_handler(func=lambda call: True)
def button(call):
    globals()[call.data](call.message.chat.id)

def main():
    setup()
    bot.polling(none_stop=False, interval=5, timeout=20)
    destroy()

if __name__ == '__main__':
    main()

Работу сенсора будем запускать в отдельном потоке, поэтому импортируем необходимую библиотеку. Создаем переменную для хранения расширения видео. Логирование из библиотеки logging не влияет на работу "Умной камеры", поэтому не будем заострять на этом внимание. Создаем переменные для флагов о работе сенсора и необходимости отправки уведомлений. Создаем переменную для хранения последнего chat_id, зачем это - расскажу позже. Переменная для хранения клавиатуры нужна, чтобы можно было повторно отправлять нужную клавиатуру. Дальше идет знакомый нам код по инициализации библиотеки pygame и Telegram-бота. Методы setup и destroy нам уже знакомы, метод логирования можно проигнорировать. В методе start проверяем user_id, если пользователь - мы, то отправляем актуальную клавиатуру. В методе sendCapture определяем filename, создаем изображение с камеры и отправляем по chat_id. Метод get_capture нужен для отправки изображения и клавиатуры с актуальными коммандами. Метод sendVideo отвечает за отправку последнего записанного видео. В методе captureVideo определяем filename и в систему отправляем запрос на запись видео с помощью утилиты ffmpeg. Метод get_video служит для отправки видео с отправкой промежуточных статусов.

Дальше рассмотрим ключевой метод sensorJob: здесь в бесконечном цикле считываем сигнал с датчика движения, выводим его на диод и при наличии сигнала сразу отправляем фото и включаем запись видео. Когда работа цикла завершается - отправляем сообщение об отключении датчика движения и актуальную клавиатуру. Так как метод sensorJob работает из потока - здесь нам и понадобится переменная last_chat_id. В методе start_sensor запоминаем сhat_id, устанавливаем флаг работы датчика движения, запускаем поток с методом sensorJob и отправляем статус и актуальную клавиатуру. В методе stop_sensor чистим last_chat_id, устанавливаем флаг работы датчика движения в ложное состояние, отключаем диод и отправляем актуальную клавиатуру. В методах mute/unmute_notifications переключаем в соответствующее положение флаг и отправляем статус и актуальную клавиатуру. На этом логика "Умной камеры" на текущий момент заканчивается. Теперь посмотрим на "Умную камеру" в действии.

Работа "Умной камеры"

Обратите внимание на время начала записи и получения фото и видео - оно очень большое для реальных кейсов, больше 5 минут. Это связано с параметрами в bot.pooling в методе main, а именно с параметром interval. Если оставить его по-умолчанию, то задержка будет минимальна и, как минимум, фото будет приходить практически моментально, но тогда возможны сбои из-за "Connection timeout" в работе Telegram-клиента и надо как-то обрабатывать это дополнительно.

Узкие места и возможные решения

  1. Необходима быстрая флешка, чтобы успевать сохранять видео;

  2. Желательно запись видео инициировать из Python напрямую, например, через библиотеку pyffmpeg, чтобы уменьшить время на инициализацию библиотеки при каждой записи видео;

  3. Хорошая камера, чтобы можно было рязглядеть нарушителя;

  4. Наличие быстрого интернета для более быстрого получения актуального видео.

Let's block ads! (Why?)

Комментариев нет:

Отправить комментарий