Привет, мир! Меня зовут Павел, я IT инженер и руководитель службы технической поддержки.
Работая в формате крупного IT-аутсорсинга, мы в компании столкнулись с проблемой: использование общего WhatsApp/Telegram Web, подключённого на компьютерах сотрудников поддержки, оказалось неэффективным. Такой подход не позволял контролировать качество диалогов, а также затруднял перевод обращений клиентов в структурированные тикеты, вследствие чего была начата разработка коннектора к нашему корпоративному порталу Bitrix24.
В данной статье я бы хотел поделиться основами технической реализации и ключевым функционалом коннектора. Детали реализации - например, верстку html страниц, тесты, кастомизацию панели администратора и так далее - оставляю на ваше усмотрение.
Технологический стек
Основу проекта составляют Python и Django, что обусловлено их простотой, гибкостью и широким набором готовых решений. Такой выбор дал возможность быстро реализовать минимально жизнеспособный продукт (MVP) и заложить фундамент для дальнейшего масштабирования.
Также вам понадобится установить redis, celery и настроить их.
Основной функционал Bitrix
Опустим установку Django и подготовку виртуального окружения и приступим к реализации интеграции с Битриксом. Для этого создадим Django приложение bitrix. Начнем с создания моделей в models.py:
import uuid
from django.conf import settings
from django.contrib.sites.models import Site
from django.db import models
Модель для хранения данных о портале:
class Bitrix(models.Model):
PROTOCOL_CHOICES = [
('http', 'HTTP'),
('https', 'HTTPS'),
]
protocol = models.CharField(max_length=5, choices=PROTOCOL_CHOICES, default='https')
domain = models.CharField(max_length=255)
owner = models.ForeignKey(
User, on_delete=models.SET_NULL, blank=True, null=True
)
user_id = models.CharField(max_length=255, blank=True, null=True)
member_id = models.CharField(max_length=255, unique=True, blank=True, null=True)
license_expired = models.BooleanField(default=False)
def __str__(self):
return self.domain
Модель коннектора:
class Connector(models.Model):
TYPE_CHOICES = [
('telegram', 'Telegram Bot'),
('waweb', 'WhatsApp Web'),
]
code = models.CharField(max_length=255, default=uuid.uuid4(), unique=True)
service = models.CharField(max_length=255, choices=TYPE_CHOICES, blank=True, null=True)
name = models.CharField(max_length=255, default="itsource.kg", unique=False)
icon = models.FileField(upload_to='connector_icons/', blank=True, null=True,
default='connector_icons/cloud-rain-alt.svg') # Заменить на вашу svg иконку
def __str__(self):
return self.name
Конфигурация приложения Битрикс:
class App(models.Model):
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
owner = models.ForeignKey(
settings.AUTH_USER_MODEL, on_delete=models.SET_NULL, blank=True, null=True
)
site = models.ForeignKey(
Site, on_delete=models.SET_NULL, related_name="apps", blank=True, null=True
)
name = models.CharField(max_length=255, blank=True, unique=False)
page_url = models.CharField(max_length=255, blank=True, default="/")
connectors = models.ManyToManyField(Connector, blank=True, related_name='apps')
asterisk = models.BooleanField(default=False, help_text="Chek for Asterisk connector")
client_id = models.CharField(max_length=255, blank=True, unique=False)
client_secret = models.CharField(max_length=255, blank=True)
def __str__(self):
return self.name
Модель для хранения сущностей локальных приложений и открытых линий:
class AppInstance(models.Model):
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
owner = models.ForeignKey(
settings.AUTH_USER_MODEL, on_delete=models.SET_NULL, blank=True, null=True
)
app = models.ForeignKey(App, on_delete=models.SET_NULL, related_name="installations", blank=True, null=True)
portal = models.ForeignKey(
Bitrix, on_delete=models.CASCADE, related_name="installations", blank=True, null=True
)
auth_status = models.CharField(max_length=1)
application_token = models.CharField(max_length=255, blank=True)
storage_id = models.CharField(max_length=255, blank=True)
status = models.IntegerField(default=0, blank=True)
attempts = models.IntegerField(default=0, blank=True)
access_token = models.CharField(max_length=255, blank=True, null=True, editable=False)
refresh_token = models.CharField(max_length=255, blank=True, null=True, editable=False)
def __str__(self):
return f"{self.app.name} on {self.portal.domain}"
class Line(models.Model):
line_id = models.CharField(max_length=50)
owner = models.ForeignKey(
settings.AUTH_USER_MODEL, on_delete=models.SET_NULL, blank=True, null=True
)
app_instance = models.ForeignKey(AppInstance, on_delete=models.CASCADE, related_name="lines", null=True)
connector = models.ForeignKey(Connector, on_delete=models.SET_NULL, related_name="lines", null=True)
portal = models.ForeignKey(Bitrix, on_delete=models.CASCADE, related_name="lines", blank=True, null=True)
def __str__(self):
return f"Line {self.line_id}"
Я приверженец модульной архитектуры, поэтому создадим отдельный модуль api внутри приложения bitrix для реализации api вебхуков
api/serializers.py
from rest_framework import serializers
from bitrix.models import Bitrix
class PortalSerializer(serializers.ModelSerializer):
class Meta:
model = Bitrix
fields = [
"owner",
"user_id",
"domain",
]
def create(self, validated_data):
return Bitrix.objects.create(**validated_data)
api/views.py
from rest_framework.mixins import CreateModelMixin
from rest_framework.renderers import JSONRenderer
from rest_framework.viewsets import GenericViewSet
from rest_framework.response import Response
from bitrix.models import Bitrix
from bitrix.utils import event_processor
from .serializers import PortalSerializer
class PortalViewSet(CreateModelMixin, GenericViewSet):
queryset = Bitrix.objects.all()
serializer_class = PortalSerializer
def create(self, request, *args, **kwargs):
print("create func")
return event_processor(request)
def head(self, request, *args, **kwargs):
print("head func")
return Response(headers={'Allow': 'POST, HEAD'})
В settings.py в приложении с ядром проекта включим авторизацию по токенам
REST_FRAMEWORK = {
"DEFAULT_AUTHENTICATION_CLASSES": (
"rest_framework.authentication.SessionAuthentication",
"rest_framework.authentication.TokenAuthentication",
"core.qpta.QueryParamTokenAuthentication",
),
"DEFAULT_PERMISSION_CLASSES": ("rest_framework.permissions.IsAuthenticated",),
'DEFAULT_RENDERER_CLASSES': ('rest_framework.renderers.JSONRenderer',)
}
Там же создадим qpta.py:
from rest_framework.authentication import TokenAuthentication
class QueryParamTokenAuthentication(TokenAuthentication):
def authenticate(self, request):
# Try to get the token from the URL query parameter
token = request.query_params.get("api-key")
if not token:
# Fall back to default token authentication
return super().authenticate(request)
# Authenticate the token manually
user, token = self.authenticate_credentials(token)
return (user, token)
И api_router.py:
from django.conf import settings
from rest_framework.routers import DefaultRouter
from rest_framework.routers import SimpleRouter
from bitrix.api.views import PortalViewSet
from users.api.views import UserViewSet
from waweb.api.views import EventsHandler
from telegram.api.views import TelegramEventsHandler
router = DefaultRouter() if settings.DEBUG else SimpleRouter()
router.register("users", UserViewSet)
router.register("bitrix", PortalViewSet)
router.register("waweb", EventsHandler, basename="waevents")
router.register("telegram", TelegramEventsHandler, basename="tgevents")
app_name = "api"
urlpatterns = router.urls
В urls.py добавим:
urlpatterns += [
path("api/", include("core.api_router")), # core заменить на название приложения где лежит ядро проекта
]
Создадим файл utils.py с основными функциями:
import base64
import json
import logging
import re
import redis
import requests
from django.core.exceptions import ObjectDoesNotExist
from django.db import transaction
from django.contrib import messages
from django.conf import settings
from django.shortcuts import redirect, get_object_or_404
from rest_framework import status
from rest_framework.authtoken.models import Token
from rest_framework.response import Response
from django.http import HttpResponse
from waweb.models import Session
import waweb.utils as waweb
import waweb.tasks as waweb_tasks
import telegram.tasks as telegram_tasks
from .crest import call_method
from .models import App, AppInstance, Bitrix, Line, Connector
import bitrix.tasks as bitrix_tasks
from telegram.models import TelegramBot
redis_client = redis.StrictRedis(host=settings.REDIS_HOST, port=6379, db=0)
logger = logging.getLogger("django")
GENERAL_EVENTS = [
"ONAPPUNINSTALL",
]
CONNECTOR_EVENTS = [
"ONIMCONNECTORMESSAGEADD",
"ONIMCONNECTORLINEDELETE",
"ONIMCONNECTORSTATUSDELETE",
]
Функция для подключения открытой линии разделяется на 2 логических блока: создание и подключение уже существующей линии. Если id линии начинается с create, то создаем новую открытую линию и AppInstance. Далее отправляем в Битрикс конфиги открытой линии (imopenlines.config.add), проверяем AppInstance на наличие подключенной открытой линии и деактивируем ее, если таковая имеется. Финальным шагом активируем нашу открытую линию в Битриксе методом imconnector.activate и возвращаем редирект на страницу завершения подключения:
def connect_line(request, line_id, entity, connector, redirect_to):
line_id = str(line_id)
if line_id.startswith("create__"):
instance_id = line_id.split("__")[1]
app_instance = get_object_or_404(AppInstance, id=instance_id, owner=request.user)
if not app_instance.portal:
messages.error(request, "Невозможно создать линию: портал не найден")
return redirect(redirect_to)
if entity.line:
call_method(app_instance, "imconnector.activate", {
"CONNECTOR": connector.code,
"LINE": entity.line.line_id,
"ACTIVE": 0,
})
if connector.service == "telegram":
line_name = entity.bot_username
else:
line_name = entity.phone
create_payload = {"PARAMS": {"LINE_NAME": line_name}}
result = call_method(app_instance, "imopenlines.config.add", create_payload)
if result and result.get("result"):
new_line_id = result["result"]
line = Line.objects.create(
line_id=new_line_id,
portal=app_instance.portal,
connector=connector,
app_instance=app_instance,
owner=request.user
)
entity.line = line
entity.app_instance = app_instance
entity.save()
activate_payload = {
"CONNECTOR": connector.code,
"LINE": new_line_id,
"ACTIVE": 1,
}
call_method(app_instance, "imconnector.activate", activate_payload)
messages.success(request, f"Создана и подключена линия {new_line_id}")
else:
messages.error(request, f"Ошибка при создании линии: {result}")
return redirect(redirect_to)
else:
line = get_object_or_404(Line, id=line_id)
if not line:
messages.error(request, f"Линия {line_id} не найдена")
return redirect(redirect_to)
entity_model = type(entity)
usage_count = entity_model.objects.filter(line=line).exclude(pk=entity.pk).count()
if usage_count > 0:
messages.error(request, "Эта линия уже используется.")
return redirect(redirect_to)
app_instance = line.app_instance
if entity.line == line:
messages.success(request, "Выбрана та же линия")
return redirect(redirect_to)
if entity.line:
call_method(app_instance, "imconnector.activate", {
"CONNECTOR": connector.code,
"LINE": entity.line.line_id,
"ACTIVE": 0,
})
response = call_method(app_instance, "imconnector.activate", {
"CONNECTOR": connector.code,
"LINE": line.line_id,
"ACTIVE": 1,
})
if response.get("result"):
entity.line = line
entity.app_instance = app_instance
entity.save()
messages.success(request, "Линия подключена")
messages.success(request, "Настройки обновлены")
return redirect(redirect_to)
Подписка на события Битрикс представляет из себя всего 1 запрос на эндпоинт event.bind, в котором мы сообщаем адрес нашего вебхука, принимающего события:
def events_bind(events: dict, appinstance: AppInstance, api_key: str):
url = appinstance.app.site
for event in events:
payload = {
"event": event,
"HANDLER": f"https://{url}/api/bitrix/?api-key={api_key}",
}
try:
return call_method(appinstance, "event.bind", payload)
except (ObjectDoesNotExist, Exception) as exc:
print(exc, " in events_bind")
return None
Регистрация коннектора включает в себя 2 функции: регистрация коннектора (register_connector) и обработка его установки (process_placement).
Функция регистрации коннектора добавляет его в меню битрикса CRM - Контакт-Центр. Здесь мы декодируем нашу svg иконку в base64 и вызываем метод imconnector.register с такими параметрами, как uuid коннектора, название коннектора и его иконка в base64. После вызова этой функции, карточка коннектора появится в Контакт-Центре.
Что касается обработчика установки, его следует воспринимать как входящий вебхук от битрикса. Здесь мы получаем параметры коннектора и открытой линии, uuid приложения и адрес портала Битрикс. Если данные корректны, привязываем Открытую Линию к сущности локального приложения коннектора и пользователю и передаем HANDLER для событий, связанных с сообщениями:
def register_connector(appinstance: AppInstance, api_key: str, connector):
url = appinstance.app.site
if not connector.icon:
return None
try:
with open(connector.icon.path, "rb") as file:
image_data = file.read()
encoded_image = base64.b64encode(image_data).decode("utf-8")
connector_logo = f"data:image/svg+xml;base64,{encoded_image}"
payload = {
"ID": connector.code,
"NAME": connector.name,
"ICON": {
"DATA_IMAGE": connector_logo,
},
"PLACEMENT_HANDLER": f"https://{url}/app-settings/?inst={appinstance.id}",
}
try:
return call_method(appinstance, "imconnector.register", payload)
except (ObjectDoesNotExist, Exception) as exc:
print(exc, " in register_connector")
return None
events_bind(CONNECTOR_EVENTS, appinstance, api_key)
except FileNotFoundError:
return None
except Exception as e:
return None
def process_placement(request):
try:
data = request.POST
placement_options = data.get("PLACEMENT_OPTIONS")
instance_id = request.GET.get("inst")
domain = request.GET.get("DOMAIN")
print("process placement GET ", request.GET)
print("FULL URL:", request.build_absolute_uri())
placement_options = json.loads(placement_options)
line_id = placement_options.get("LINE")
connector_code = placement_options.get("CONNECTOR")
app_instance = AppInstance.objects.filter(id=instance_id).first()
if not app_instance:
return HttpResponse("app not found")
portal = Bitrix.objects.filter(domain=domain).first()
if not portal:
return HttpResponse("bitrix not found")
connector = Connector.objects.filter(code=connector_code).first()
if not connector:
return HttpResponse("connector not found")
line, created = Line.objects.get_or_create(
line_id=line_id,
portal=portal,
connector=connector,
app_instance=app_instance,
owner=app_instance.owner
)
activate_payload = {
"CONNECTOR": connector.code,
"LINE": line_id,
"ACTIVE": 1,
}
print("activate_payload= ", activate_payload)
activate_result = call_method(app_instance, "imconnector.activate", activate_payload)
if not activate_result or not activate_result.get("result"):
return HttpResponse(f"Failed to activate connector: {activate_result}")
# api_key = request.GET.get("api-key")
api_key = Token.objects.get(user=app_instance.owner).key
print("request data= ", request.GET)
print("api_key=", api_key)
print("app_instance=", app_instance)
events = ["ONIMCONNECTORMESSAGEADD", "ONIMCONNECTORSTATUSDELETE", "ONIMCONNECTORLINEDELETE"]
for event in events:
event_payload = {
"event": event,
"HANDLER": f"https://{app_instance.app.site}/api/bitrix/?api-key={api_key}",
}
call_method(app_instance, "event.bind", event_payload)
return HttpResponse(
f"Линия успешно создана и активирована. Настройте доп. параметры в Bitrix: https://{app_instance.app.site}/portals/"
)
except Exception as e:
logger.error(f"Unexpected error: {e}")
return HttpResponse({"An unexpected error occurred"})
Обработка файлов и событий. extract_files и upload_file описывают логику работы коннектора с файлами. Ничего сверхъестественного, поэтому предлагаю не задерживаться на этом и перейдем к event_processor - функции, обрабатывающей все входящие события от Битрикса.
def extract_files(data):
files = []
i = 0
while True:
name_key = f"data[MESSAGES][0][message][files][{i}][name]"
link_key = f"data[MESSAGES][0][message][files][{i}][link]"
type_key = f"data[MESSAGES][0][message][files][{i}][type]"
if name_key in data and link_key in data:
files.append(
{
"name": data.get(name_key),
"link": data.get(link_key),
"type": data.get(type_key),
},
)
i += 1
else:
break
return files
def upload_file(appinstance, storage_id, fileContent, filename):
payload = {
"id": storage_id,
"fileContent": fileContent,
"data": {"NAME": filename},
"generateUniqueName": True,
}
upload_to_bitrix = call_method(appinstance, "disk.folder.uploadfile", payload)
if "result" in upload_to_bitrix:
return upload_to_bitrix["result"]
else:
return None
event_processor - это единый обработчик всех событий, поступающих от Bitrix24 по REST-хукам. Он принимает входящие запросы, извлекает из них данные (события, токены, параметры портала) и в зависимости от типа события выполняет соответствующую бизнес-логику:
регистрация приложения и портала при установке,
обработка новых сообщений от пользователей,
отключение или удаление линии,
корректное завершение работы при удалении приложения.
Рассмотрим каждое событие более детально.
Обработка установки приложения (ONAPPINSTALL)
Когда пользователь впервые устанавливает приложение в свой портал Bitrix24, прилетает событие ONAPPINSTALL.
Здесь функция:
Проверяет, существует ли экземпляр приложения (AppInstance).
Если нет — создаёт новый портал (Bitrix) и привязывает к нему приложение.
Сохраняет выданные Bitrix24 токены (access_token, refresh_token, application_token).
Регистрирует события (event.bind) и подключает доступные коннекторы (например, WhatsApp или Telegram).
При необходимости создаёт хранилище в Bitrix Диске (disk.storage.getforapp).
Возвращает сообщение об успешной установке.
Таким образом, уже на этапе установки приложение готово к полноценной работе.
Обработка входящих сообщений (ONIMCONNECTORMESSAGEADD)
Одно из самых частых событий — это новое сообщение от клиента, пришедшее в открытую линию.
Алгоритм работы:
Определяется, какой коннектор сработал (WhatsApp Web или Telegram).
Извлекаются данные о линии (LINE), сообщении (MESSAGE_ID, CHAT_ID) и его содержимом.
Bitrix получает подтверждение о доставке через метод imconnector.send.status.delivery.
Система проверяет, не было ли это сообщение отправлено самой интеграцией (защита от "петли" через Redis).
Если есть вложения — они извлекаются.
-
В зависимости от коннектора сообщение пробрасывается в WhatsApp или Telegram:
для WhatsApp — через waweb-сессию,
для Telegram — через соответствующего бота.
Если отправка успешна — сообщение сохраняется в базе.
Таким образом, коннектор становится полноценным «мостом» между Bitrix24 и внешним мессенджером.
Отключение линии (ONIMCONNECTORSTATUSDELETE)
Когда в Bitrix24 отключается линия, событие ONIMCONNECTORSTATUSDELETE уведомляет приложение о том, что коннектор больше неактивен.
Здесь происходит:
поиск линии по ID,
отвязка связанного ресурса (например, телефона в WhatsApp),
возврат ответа о том, что линия корректно отключена.
Удаление линии (ONIMCONNECTORLINEDELETE)
Если линия полностью удаляется в Bitrix24, приложение должно синхронизировать изменения.
Обработчик:
находит линию в базе,
удаляет её,
возвращает статус операции.
Удаление приложения (ONAPPUNINSTALL)
При удалении приложения из Bitrix24 (ONAPPUNINSTALL):
Удаляется AppInstance (экземпляр приложения для конкретного портала).
Если у портала больше нет ни одного экземпляра приложения — опционально удаляется и сам портал.
def event_processor(request):
try:
data = request.data
event = data.get("event")
domain = data.get("auth[domain]")
user_id = data.get("auth[user_id]")
auth_status = data.get("auth[status]")
access_token = data.get("auth[access_token]")
refresh_token = data.get("auth[refresh_token]")
application_token = data.get("auth[application_token]")
member_id = data.get("auth[member_id]")
api_key = request.query_params.get("api-key")
app_id = request.query_params.get("app-id")
print(data)
try:
if not api_key:
raise ValueError("API key is required")
appinstance = AppInstance.objects.get(application_token=application_token)
appinstance.access_token = access_token
appinstance.refresh_token = refresh_token
appinstance.save()
if not appinstance.portal.member_id:
appinstance.portal.member_id = member_id
appinstance.portal.save()
except AppInstance.DoesNotExist:
if event == "ONAPPINSTALL":
try:
app = App.objects.get(id=app_id)
except App.DoesNotExist:
return Response({"message": "App not found."})
portal, created = Bitrix.objects.get_or_create(
member_id=member_id,
defaults={
"domain": domain,
"user_id": user_id,
"owner": request.user if auth_status == "L" else None,
}
)
appinstance_owner = (
portal.owner
if portal.owner
else (request.user if auth_status == "L" else None)
)
appinstance, created = AppInstance.objects.update_or_create(
app=app,
portal=portal,
owner=appinstance_owner,
defaults={
"auth_status": auth_status,
"access_token": access_token,
"refresh_token": refresh_token,
"application_token": application_token,
}
)
storage_data = call_method(appinstance, "disk.storage.getforapp", {})
if "result" in storage_data:
storage_id = storage_data["result"]["ID"]
appinstance.storage_id = storage_id
appinstance.save()
# Регистрация коннектора/ подписка на события
def register_events_and_connectors():
events_bind(GENERAL_EVENTS, appinstance, api_key)
if app.connectors.exists():
for connector in app.connectors.all():
register_connector(appinstance, api_key, connector)
transaction.on_commit(register_events_and_connectors)
if VENDOR_BITRIX_INSTANCE:
bitrix_tasks.create_deal(appinstance.id, VENDOR_BITRIX_INSTANCE, app.name)
if portal.owner:
return Response('App successfully created and linked')
return Response(
{"message": "App and portal successfully created and linked."},
status=status.HTTP_201_CREATED,
)
else:
return Response({"message": "App not found and not an install event."})
# Обработка события ONIMCONNECTORMESSAGEADD
if event == "ONIMCONNECTORMESSAGEADD":
connector_code = data.get("data[CONNECTOR]")
connector = get_object_or_404(Connector, code=connector_code)
if not connector:
return Response({'Connector not found'})
line_id = data.get("data[LINE]")
message_id = data.get("data[MESSAGES][0][im][message_id]")
chat_id = data.get("data[MESSAGES][0][im][chat_id]")
chat = data.get("data[MESSAGES][0][chat][id]")
status_data = {
"CONNECTOR": connector_code,
"LINE": line_id,
"MESSAGES": [
{
"im": {
"chat_id": chat_id,
"message_id": message_id,
},
},
],
}
call_method(appinstance, "imconnector.send.status.delivery", status_data)
# Проверяем наличие сообщения в редис (отправлено из других сервисов )
if redis_client.exists(f'bitrix:{domain}:{message_id}'):
return Response({'message': 'loop message'})
file_type = data.get("data[MESSAGES][0][message][files][0][type]", None)
text = data.get("data[MESSAGES][0][message][text]", None)
if text:
text = re.sub(r"\[(?!(br|\n))[^\]]+\]", "", text)
text = text.replace("[br]", "\n")
files = []
print(file_type)
if file_type:
files = extract_files(data)
if connector.service == "waweb":
try:
line = Line.objects.get(line_id=line_id, app_instance=appinstance)
wa = Session.objects.get(line=line)
if files:
for file in files:
waweb_tasks.send_message_task(str(wa.session), [chat], file, 'media')
resp = waweb.send_message(wa.session, chat, text)
if resp.status_code == 201:
waweb.store_msg(resp)
except Exception as e:
print(f'Failed to send waweb message: {str(e)}')
return Response({'error': f'Failed to send message: {str(e)}'})
if connector.service == "telegram":
try:
# Получаем линию
line = Line.objects.get(line_id=line_id, app_instance=appinstance)
# Получаем бота
bot = TelegramBot.objects.get(line=line)
# Сначала отправляем медиафайлы, если есть
if files:
for file in files:
telegram_tasks.send_telegram_message_task(
bot_token=bot.bot_token,
recipient=chat,
content=file,
cont_type="media"
)
# Отправляем текстовое сообщение
if text:
telegram_tasks.send_telegram_message_task(
bot_token=bot.bot_token,
recipient=chat,
content=text,
cont_type="string"
)
except Exception as e:
print(f'Failed to send Telegram message: {str(e)}')
return Response({'error': f'Failed to send message: {str(e)}'})
return Response(
{"status": "ONIMCONNECTORMESSAGEADD event processed"},
status=status.HTTP_200_OK,
)
elif event == "ONIMCONNECTORSTATUSDELETE":
line_id = data.get("data[line]")
connector_code = data.get("data[connector]")
connector = get_object_or_404(Connector, code=connector_code)
if not connector:
return Response({'Connector not found'})
try:
line = Line.objects.get(line_id=line_id, app_instance=appinstance)
if connector.service == "waweb":
phone = line.wawebs.first()
if phone:
phone.line = None
phone.save()
return Response("Line disconnected")
except Line.DoesNotExist:
return Response(
{"status": "Line not found"},
status=status.HTTP_200_OK,
)
elif event == "ONIMCONNECTORLINEDELETE":
line_id = data.get("data")
try:
line = Line.objects.filter(line_id=line_id, app_instance=appinstance).first()
if line:
line.delete()
return Response({"status": "Line deleted"}, status=status.HTTP_200_OK)
except Line.DoesNotExist:
return Response(
{"status": "Line not found"}, status=status.HTTP_200_OK
)
elif event == "ONAPPUNINSTALL":
portal = appinstance.portal
appinstance.delete()
if not AppInstance.objects.filter(portal=portal).exists():
# portal.delete()
return Response(f"{appinstance} and associated portal deleted")
else:
return Response(f"{appinstance} deleted")
else:
return Response('Unsupported event')
except Exception as e:
logger.error(f"Error occurred: {e!s}")
return Response(
{"error": "Internal server error"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
Для асинхронных запросов создадим tasks.py:
import redis
from celery import shared_task
import logging
from django.core.exceptions import ObjectDoesNotExist
from django.conf import settings
from .crest import call_method
from .models import AppInstance
logger = logging.getLogger("django")
redis_client = redis.StrictRedis(host=settings.REDIS_HOST, port=6379, db=0)
FROM_MARKET_FIELD = settings.FROM_MARKET_FIELD
@shared_task(bind=True, max_retries=5, default_retry_delay=5)
def call_api(self, id, method, payload):
try:
appinstance = AppInstance.objects.get(id=id)
return call_method(appinstance, method, payload)
except (ObjectDoesNotExist, Exception) as exc:
raise self.retry(exc=exc)
@shared_task
def get_app_info():
app_instances = AppInstance.objects.all()
for app_instance in app_instances:
if app_instance.attempts < settings.BITRIX_CHECK_APP_ATTEMTS:
call_api(app_instance.id, "app.info", {})
@shared_task(bind=True, max_retries=5, default_retry_delay=5)
def send_messages(self, app_instance_id, user_phone, text, connector,
line, sms=False, pushName=None,
message_id=None, attachments=None, profilepic_url=None,
chat_id=None, chat_url=None, user_id=None):
init_message = "System: initiation message."
try:
if not app_instance_id:
raise ValueError("app_instance_id is required and cannot be None or empty")
app_instance = AppInstance.objects.get(id=app_instance_id)
print(pushName)
bitrix_msg = {
"CONNECTOR": connector,
"LINE": line,
"MESSAGES": [
{
"user": {
"phone": user_phone,
"name": pushName or user_phone,
"id": user_id or user_phone,
"skip_phone_validate": 'Y',
# "picture": {
# "url": profilepic_url
# }
},
"chat": {
"id": chat_id or user_phone,
"url": chat_url
},
"message": {
"text": text,
"id": message_id,
"files": attachments or []
}
}
],
}
resp = call_method(app_instance, "imconnector.send.messages", bitrix_msg)
result = resp.get("result", {})
results = result.get("DATA", {}).get("RESULT", [])
for result_item in results:
chat_session = result_item.get("session", {})
if chat_session:
domain = app_instance.portal.domain
chat_id = chat_session.get("CHAT_ID")
identity = user_id or user_phone
redis_client.set(f"bitrix_chat:{domain}:{line}:{identity}", chat_id)
return resp
except Exception as e:
raise e
@shared_task(bind=True, max_retries=5, default_retry_delay=5)
def message_add(self, app_instance_id, line_id, user_phone, text, connector):
try:
app_instance = AppInstance.objects.get(id=app_instance_id)
except AppInstance.DoesNotExist:
logger.error(f"AppInstance {app_instance_id} does not exist")
raise
domain = app_instance.portal.domain
chat_key = f'bitrix_chat:{domain}:{line_id}:{user_phone}'
if redis_client.exists(chat_key):
chat_id = redis_client.get(chat_key).decode('utf-8')
payload = {
"DIALOG_ID": f"chat{chat_id}",
"MESSAGE": text,
}
max_send_attempts = 3
for attempt in range(max_send_attempts):
try:
resp = call_method(app_instance, "im.message.add", payload)
message_id = resp.get("result")
print(message_id)
redis_client.setex(f'bitrix:{domain}:{message_id}', 600, message_id)
payload_status = {
"CONNECTOR": connector,
"LINE": line_id,
"MESSAGES": [{
"im": {
"chat_id": str(chat_id),
"message_id": str(message_id)
}
}]
}
delivery_status_resp = call_method(app_instance, "imconnector.send.status.delivery", payload_status)
print(delivery_status_resp)
return resp
except Exception as e:
if attempt >= max_send_attempts - 1:
logger.error(f"Exception occurred while sending message: {e}")
raise
else:
self.retry(exc=e)
send_messages(app_instance_id, user_phone, text, connector, line_id, True)
@shared_task
def create_deal(app_instance_id, vendor_inst_id, app_name):
app_instance = AppInstance.objects.get(id=app_instance_id)
try:
user_current = call_method(app_instance, "user.current", {})
user_data = user_current.get("result", {})
user_email = user_data.get("EMAIL")
except Exception as e:
return
if not user_email:
return
user_id = None
venrot_instance = AppInstance.objects.get(id=vendor_inst_id)
# Поиск контакта в битрикс
payload = {
"FILTER": {
"EMAIL": user_email
},
"select": [FROM_MARKET_FIELD]
}
client_data = call_method(venrot_instance, "crm.contact.list", payload)
if "result" in client_data:
client_data = client_data.get("result", [])
if client_data:
from_market = client_data[0].get(FROM_MARKET_FIELD)
if from_market == "1":
return
user_id = client_data[0].get("ID")
if not user_id:
contact_data = {
"fields": {
"NAME": user_data.get("NAME"),
"LAST_NAME": user_data.get("LAST_NAME"),
FROM_MARKET_FIELD: "1",
"EMAIL": [
{
"VALUE": user_email,
"VALUE_TYPE": "WORK"
}
],
"PHONE": [
{
"VALUE": user_data.get("WORK_PHONE"),
"VALUE_TYPE": "WORK"
},
{
"VALUE": user_data.get("PERSONAL_MOBILE"),
"VALUE_TYPE": "MOBILE"
}
]
}
}
create_contact = call_method(venrot_instance, "crm.contact.add", contact_data)
if "result" in create_contact:
user_id = create_contact.get("result")
if user_id:
deal_data = {
"fields": {
"TITLE": f"Установка приложения: {app_name}",
"CONTACT_IDS": [user_id],
"OPENED": "N",
}
}
call_method(venrot_instance, "crm.deal.add", deal_data)
Рассмотрим асинхронные функции более детально:
call_api — функция для безопасного вызова API Bitrix24. Находит наш AppInstance по id и передает запрос в call_method.
get_app_info - периодическая задача для проверки статуса приложения. Выполняется через Celery Beat для всех экземпляров приложений и позволяет следить за актуальностью токенов.
send_messages - отвечает за отправку сообщений в открытые линии. Формирует правильную структуру сообщений, пришедших из внешних сервисов.
message_add - публикация сообщений в чате Битрикс. Формирует корректный payload и вызывает метод im.message.add .
create_deal - финальная стадия, автоматическое создание лида в CRM для нового чата.
Вызов api Bitrix осуществляется в crest.py. Благодаря этой функции, нам не нужно каждый раз прописывать эндпоинт и параметры запроса к Битрикс, проверять активность токена и обновлять его, а нужно лишь передать соответствующие данные и метод. Передаю привет принципам DRY :)
from urllib.parse import urlparse
import logging
import requests
from django.db import transaction
from django.conf import settings
from waweb.tasks import send_message_task
from users.models import Message
from .models import AppInstance
logger = logging.getLogger("django")
def call_method(appinstance: AppInstance, b24_method: str, data: dict, attempted_refresh=False, verify=True):
portal = appinstance.portal
endpoint = f"{portal.protocol}://{portal.domain}/rest/"
access_token = appinstance.access_token
print(b24_method)
payload = {"auth": access_token, **data}
print(payload)
try:
response = requests.post(f"{endpoint}{b24_method}", json=payload,
allow_redirects=False, timeout=60, verify=verify)
print(response.json())
appinstance.status = response.status_code
except requests.exceptions.SSLError:
if verify:
return call_method(appinstance, b24_method, data, attempted_refresh, verify=False)
else:
raise
if response.status_code == 302 and not attempted_refresh:
new_url = response.headers['Location']
parsed_url = urlparse(new_url)
portal = appinstance.portal
domain = parsed_url.netloc
if portal.domain != domain:
portal.domain = domain
portal.save()
appinstance.attempts = 0
appinstance.save()
return call_method(appinstance, b24_method, data, attempted_refresh=True)
elif response.status_code == 200:
appinstance.attempts = 0
appinstance.save()
return response.json()
else:
appinstance.attempts += 1
appinstance.save()
if response.status_code == 401:
resp = response.json()
error = resp.get("error", "")
error_description = resp.get("error_description", "")
if "REST is available only on commercial plans" in error_description and not appinstance.portal.license_expired:
appinstance.portal.license_expired = True
appinstance.portal.save()
waweb_id = settings.WAWEB_SYTEM_ID
if waweb_id and appinstance.owner.phone_number:
try:
notification = Message.objects.get(code="b24_expired")
send_message_task(waweb_id, [str(appinstance.owner.phone_number)], notification.message)
except Message.DoesNotExist:
pass
raise Exception("b24 license expired")
if error == "expired_token" or error == "NO_AUTH_FOUND" and not attempted_refresh:
refreshed = refresh_token(appinstance)
if isinstance(refreshed, AppInstance):
return call_method(appinstance, b24_method, data, attempted_refresh=True)
else:
raise Exception(f"Token refresh failed for portal {appinstance.portal.domain}")
else:
raise Exception(f"Unauthorized error: {response.json()}")
raise Exception(f"Failed to call bitrix: {appinstance.portal.domain} "
f"status {response.status_code}, response: {response.json()}")
def refresh_token(appinstance: AppInstance):
payload = {
"grant_type": "refresh_token",
"client_id": appinstance.app.client_id,
"client_secret": appinstance.app.client_secret,
"refresh_token": appinstance.refresh_token,
}
response = requests.post("https://oauth.bitrix.info/oauth/token/", data=payload, timeout=60)
print(appinstance.refresh_token)
print(appinstance.access_token)
try:
response_data = response.json()
except Exception:
raise Exception(f"Invalid response while refreshing token for portal {appinstance.portal.domain}")
if response.status_code != 200:
raise Exception(f"Failed to refresh token: {appinstance.portal.domain} {response_data}")
appinstance.access_token = response_data["access_token"]
appinstance.refresh_token = response_data["refresh_token"]
with transaction.atomic():
appinstance.save(update_fields=["access_token", "refresh_token"])
return appinstance
Создадим вебхуки в views.py (создайте страницу успешной установки install_finish.html)
import uuid
import requests
from datetime import timedelta
from django.conf import settings
from django.contrib import messages
from django.shortcuts import render, redirect
from django.utils import timezone
from django.http import HttpResponse, HttpResponseForbidden
from django.views.decorators.csrf import csrf_exempt
from rest_framework.authtoken.models import Token
from .crest import call_method
from .utils import process_placement
from .models import AppInstance, Bitrix, Line, App
from django.contrib.auth import get_user_model, login, logout
User = get_user_model()
@csrf_exempt
def app_install(request):
if request.method == "HEAD":
return HttpResponse("ok")
app_id = request.GET.get("app-id")
protocol = request.GET.get("PROTOCOL")
domain = request.GET.get("DOMAIN")
data = request.POST
member_id = data.get("member_id")
auth_id = data.get("AUTH_ID")
if not app_id or not member_id or not domain or not auth_id:
return redirect("portals")
try:
app = App.objects.get(id=app_id)
except App.DoesNotExist:
return redirect("portals")
proto = "https" if protocol == "1" else "http"
owner = get_owner(request)
api_key, _ = Token.objects.get_or_create(user=app.owner)
payload = {
"event": "ONAPPINSTALL",
"HANDLER": f"https://{app.site}/api/bitrix/?api-key={api_key.key}&app-id={app_id}",
"auth": auth_id,
}
try:
response = requests.post(f"{proto}://{domain}/rest/event.bind", json=payload, timeout=60)
response.raise_for_status()
except requests.RequestException as e:
resp = response.json()
error_description = resp.get("error_description")
if "Handler already binded" in error_description:
return render(request, "install_finish.html")
else:
return HttpResponse(f"Bitrix event.bind failed {response.status_code, resp}")
return render(request, "install_finish.html")
@csrf_exempt
def app_settings(request):
if request.method == "POST":
try:
app_id = request.GET.get("app-id")
data = request.POST
domain = request.GET.get("DOMAIN")
member_id = data.get("member_id")
portal = Bitrix.objects.get(domain=domain, member_id=member_id)
except Exception as e:
return redirect("portals")
placement = data.get("PLACEMENT")
if placement == "SETTING_CONNECTOR":
return process_placement(request)
elif placement == "DEFAULT":
try:
app = App.objects.get(id=app_id)
except Exception:
return redirect("portals")
app_url = app.page_url
owner = get_owner(request)
if owner is None:
logout(request)
return redirect(app_url)
should_login = not request.user.is_authenticated or request.user != owner
if should_login:
if request.user.is_authenticated:
logout(request)
try:
login(request, owner, backend='django.contrib.auth.backends.ModelBackend')
except Exception:
return redirect(app_url)
AppInstance.objects.filter(portal=portal, owner__isnull=True).update(owner=owner)
Line.objects.filter(portal=portal, owner__isnull=True).update(owner=owner)
return redirect(f"{app_url}?domain={domain}")
else:
return redirect("portals")
elif request.method == "HEAD":
return HttpResponse("ok")
elif request.method == "GET":
return redirect("portals")
А также эндпоинты в urls.py (не забудьте подключить в urls в корне проекта):
from django.urls import path
from .views import app_settings, app_install
urlpatterns = [
path("app-settings/", app_settings, name="app_settings"),
path("app-install/", app_install, name="app_install"),
]
На этом основной функционал модуля Битрикс завершен. Функционал WhatsApp коннектора мы рассмотрим во второй части.
paab
Привет Павел!
Кажется это лишнее. C вашим виртуозным навыком строить бесконечные каскады вложенных конструкций, все можно было в одном модуле уместить.