This commit is contained in:
filimonov
2024-02-20 16:43:56 +07:00
parent f79efa2951
commit a086f94798
28 changed files with 1469 additions and 144 deletions

213
worker-dev/app copy.py Normal file
View File

@@ -0,0 +1,213 @@
import pika
import sys
import os
import json
import datetime
import requests
from pymongo import MongoClient
import time
from phone import phone
db_connection = MongoClient("mongodb://mongodb:Cc03Wz5XX3iI3uY3@mongo")
db_base = db_connection["phone-dev"]
coll_source = db_base["source"]
phoneAnswer = db_base["phone-answer"]
phoneNAnswer = db_base["phone-n-answer"]
phoneRecallTrue = db_base["phone-recall-true"]
phoneRecallFalse = db_base["phone-recall-false"]
phoneLog = db_base["phone-log"]
phoneDebug = db_base["phone-debug"]
coll_userkey = db_base['userkey']
mkt_phones = ['83912051046', '83912051045']
IgnoreList = ['83919865589', '83912051046', '83912051045', '84950213944', '84951252791', '83919865589',
'84951183750', '89919237009', '89919241441', '89919863883', '89919505445', '89919398228', '89919500798']
# Stats:
# 0 - ответили
# 1 - не приняли
# 2 - перезвонили успешно
# 4 - Перезвонили неуспешно
# LOG
# 0 - выходящий вызов закончен
# 1 - Вызов не принят
# 2 - Перезвонили и дозвонились
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters(
'rabbitmq', 5672, 'mkt', pika.PlainCredentials('rabbit', 'mrl2X0jwnYuCCiKFTshG7WKyOAhfDo')))
channel = connection.channel()
channel.queue_declare(queue='incoming-dev')
def callback(ch, method, properties, body: bytearray):
try:
# Парсим строку
srcJson = json.loads(str(body.decode('utf-8')).replace("\'", "\""))
srcJson["time"] = datetime.datetime.fromtimestamp(
srcJson["time"]).strftime('%Y-%m-%d %H:%M:%S')
# Определяем направление соединения
# Отсекаем лишние события по признаку длиины номера
if srcJson['to'] in mkt_phones and srcJson['direction'] == "incoming" and srcJson['state'] == "HANGUP" and srcJson not in IgnoreList and int(srcJson['duration']) > 15:
# Формируем словарь для загрузки
upd = {
"_id": srcJson['uuid'],
"client": srcJson['from'],
"operator": srcJson['to'],
"time": srcJson['time']
}
if srcJson.get("recordUrl", False):
# Запись бывает только у принятых вызовов, поэтому сразу добавляем в базу
try:
upd["recordUrl"] = srcJson["recordUrl"]
phoneAnswer.insert_one(upd)
phoneNAnswer.delete_many({"client": srcJson["from"]})
try:
print("0", upd)
except:
pass
except Exception as e:
pass
else:
# Добавляем запись только в том случае если ранее номер отсутствовал,
# В противном случае обновляем запись
try:
phoneNAnswer.update_one(filter={'client': srcJson['from']}, update={
'$set': upd}, upsert=True)
try:
print("1", upd)
except:
pass
except Exception as e:
pass
if srcJson['direction'] == "external" and srcJson['state'] == "START":
phoneNAnswer.update_one({"client": srcJson["to"]}, {
"currstate": "calling"})
# Парсим исходящие звонки только в том случае если продолжительность разговора более 40 секунд
if srcJson['state'] == "HANGUP" and srcJson['direction'] == "external" and int(srcJson['duration']) > 30:
# Обработка запускается только в том случае если в базе пропущенных есть записи
if phoneNAnswer.count_documents({"client": srcJson["to"]}) > 0:
# Удаляем запись из списка пропушеных
phoneNAnswer.delete_many(
filter={"client": {'$regex': srcJson["to"]}})
# формируем данные для загрузки
ins = {
"_id": srcJson["uuid"],
"client": srcJson["to"],
"operator": srcJson["from"],
"time": srcJson["time"],
"currstate": srcJson["from"]
}
# Если запись разговора есть то записываем считаем что дозвонились
if srcJson.get("recordUrl", False):
ins["recordUrl"] = srcJson["recordUrl"]
try:
# Добавляем в таблицу дозвонились успешно и удаляем по маске из недозвонившихся
phoneRecallTrue.insert_one(ins)
phoneNAnswer.delete_many(
filter={"client": {'$regex': srcJson["to"]}})
try:
print("3", ins)
except:
pass
except Exception as e:
print(2, e)
else:
try:
phoneRecallFalse.insert_one(ins)
except:
print(2, e)
except Exception as e:
print("!!!!!!!", e)
channel.basic_consume(
queue='incoming-dev', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
# # Определяем направление соединения
# if srcJson['direction'] == 'incoming':
# # Определяем начальный статус
# if srcJson['state'] == 'START':
# # Создаем переменную. Ответ = false, можно закрывать = false, Приоритетная линия
# if srcJson['to'] == '83912051045':
# tmpIncoming[srcJson['uuid']] = [False, False, True]
# else:
# tmpIncoming[srcJson['uuid']] = [False, False, False]
# # Обновление статуса при входящем звонке
# coll_phone.delete_one({'$and': [{'client': srcJson['from']}, {'status': 1}]})
# if srcJson['state'] == 'ANSWER' and srcJson['uuid'] in tmpIncoming:
# tmpIncoming[srcJson['uuid']][0] = True
# if srcJson['state'] == 'END' and srcJson['uuid'] in tmpIncoming:
# tmpIncoming[srcJson['uuid']][1] = True
# if srcJson['state'] == 'HANGUP' and srcJson['uuid'] in tmpIncoming and tmpIncoming[srcJson['uuid']][1] == True:
# try:
# srcJson['callstatus']
# insDict = {"client": srcJson['from'], "time": srcJson['time'], "status": 0,
# "recordUrl": srcJson["recordUrl"], "duration": srcJson["duration"], "important": tmpIncoming[srcJson['uuid']][2]}
# except Exception as e:
# print(e)
# insDict = {
# "client": srcJson['from'], "time": srcJson['time'], "status": 1, "important": tmpIncoming[srcJson['uuid']][2], "uuid": srcJson['uuid']}
# finally:
# coll_phone.insert_one(insDict)
# tmpIncoming.pop(srcJson['uuid'])
# try:
# insUserKey = {'userkey': srcJson['userkey']}
# coll_userkey.update_one(
# filter={
# 'operator': srcJson['to'],
# },
# update={
# '$set': insUserKey,
# },
# upsert=True
# )
# except Exception as e:
# print(e)
# if srcJson['direction'] == 'external':
# # coll_phone.update_one({'$and': [{'client': {'$regex': srcJson['to']}}, {'status': 1}}]}, {'$set': {'status': 2, 'callid': srcJson['uuid']}})
# coll_phone.update_one({'$and': [{'client': {'$regex': srcJson['to']}}, {'status': 1}]}, {'$set': {'status': 2, 'callid': srcJson['uuid']}})
# if srcJson['state'] == 'HANGUP':
# try:
# # Проверяем заполнено ли поле recordURL, если нет то меняем статус на 4
# if len(srcJson['recordUrl']) > 4:
# print(srcJson['uuid'], srcJson['recordUrl'])
# coll_phone.update_one({'callid':srcJson['uuid']}, {'$set': {'recordUrl': srcJson['recordUrl'], 'status': 2}})
# else:
# print(srcJson['uuid'])
# coll_phone.update_one({'callid':srcJson['uuid']}, {'$set': {'status': 4}})
# except Exception as e:
# print(e)
# except Exception as e:
# print(e.with_traceback)
# print(e)
# exit()
# channel.basic_consume(
# queue='incoming-dev', on_message_callback=callback, auto_ack=False)
# print(' [*] Waiting for messages. To exit press CTRL+C')
# channel.start_consuming()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)

View File

@@ -3,26 +3,26 @@ import sys
import os
import json
import datetime
import requests
from pymongo import MongoClient
import time
from phone import phone
import requests
db_connection = MongoClient("mongodb://mongodb:Cc03Wz5XX3iI3uY3@mongo")
db_base = db_connection["phone-dev"]
coll_source = db_base["source"]
phoneAnswer = db_base["phone-answer"]
phoneNAnswer = db_base["phone-n-answer"]
phoneRecallTrue = db_base["phone-recall-true"]
phoneRecallFalse = db_base["phone-recall-false"]
phoneLog = db_base["phone-log"]
coll = db_base["phone"]
coll_all = db_base["phone-all"]
err = db_base["err"]
phoneDebug = db_base["phone-debug"]
coll_userkey = db_base['userkey']
mkt_phones = ['83912051046', '83912051045']
IgnoreList = ['83919865589', '83912051046', '83912051045', '84950213944', '84951252791', '83919865589',
'84951183750', '89919237009', '89919241441', '89919863883', '89919505445', '89919398228', '89919500798']
mobilon_wait = 3 # Время ожидания необходимо чтобы на стороне мобилона все улеглось
# Stats:
# 0 - ответили
# 1 - не приняли
@@ -30,6 +30,10 @@ IgnoreList = ['83919865589', '83912051046', '83912051045', '84950213944', '84951
# 4 - Перезвонили неуспешно
# LOG
# 0 - выходящий вызов закончен
# 1 - Вызов не принят
# 2 - Перезвонили и дозвонились
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters(
@@ -38,78 +42,70 @@ def main():
channel.queue_declare(queue='incoming-dev')
print("RUN")
def callback(ch, method, properties, body: bytearray):
try:
# Парсим строку
srcJson = json.loads(str(body.decode('utf-8')).replace("\'", "\""))
srcJson["time"] = datetime.datetime.fromtimestamp(
srcJson["time"]).strftime('%Y-%m-%d %H:%M:%S')
# Определяем направление соединения
# Отсекаем лишние события по признаку длиины номера
# Парсим строку
ph = phone(body)
# Определяем тип соединения
# Если входящий
print(ph.ConvertToJSON())
if srcJson['to'] in mkt_phones and srcJson['direction'] == "incoming" and srcJson['state'] == "HANGUP" and srcJson not in IgnoreList and int(srcJson['duration']) > 30:
# Формируем словарь для загрузки
upd = {
"_id": srcJson['uuid'],
"client": srcJson['from'],
"operator": srcJson['to'],
"time": srcJson['time']
}
phoneDebug.insert_one(ph.ConvertToJSON())
if srcJson.get("recordUrl", False):
# Запись бывает только у принятых вызовов, поэтому сразу добавляем в базу
try:
upd["recordUrl"] = srcJson["recordUrl"]
phoneAnswer.insert_one(upd)
phoneNAnswer.delete_many({"client": srcJson["from"]})
except Exception as e:
pass
if ph.GetState() == "START":
ph.addStart()
if ph.GetState() == "END":
ph.addEnd()
if ph.GetState() == "HANGUP" and ph.isCanClose() == 1:
# Ждем
time.sleep(mobilon_wait)
t_req = "https://callinfo.services.mobilon.ru/api/call/info/1e86a98e026578eb5f6bf8c092c0c4a2/" + ph.GetUUID()
try:
res: dict = json.loads(requests.get(t_req).content)
except:
print("Get data from Mobilon", t_req)
try:
coll_all.insert_one(res)
except:
print("coll_all:", res)
# В этом месте описывается логика работы программы исходя основываясь на данных мобилона
if res["direction"] == "incoming":
# Подмена полей
res["client"] = res.pop("from")
res["mkt_phone"] = res.pop("to")
# 2. Удаляем пропущенные если есть
coll.delete_many(
{"client": {'$regex': res["client"]}, "status": "NOT_ANSWERED"})
coll.insert_one(res)
if res["direction"] == "external":
res["client"] = res.pop("to")
res["operator"] = res.pop("from")
# Совершенно непонятный костыль, откуда берется загадка
res.pop("_id")
if res["has_record"] == True and res["answered_duration"] > 8:
res["status"] = "RECALL_TRUE"
else:
# Добавляем запись только в том случае если ранее номер отсутствовал,
# В противном случае обновляем запись
try:
phoneNAnswer.update_one(filter={'client': srcJson['from']}, update={
'$set': upd}, upsert=True)
except Exception as e:
pass
res["status"] = "RECALL_FALSE"
if srcJson['direction'] == "external" and srcJson['state'] == "START":
phoneNAnswer.update_one({"client": srcJson["to"]}, {"currstate": "calling"})
# Парсим исходящие звонки только в том случае если продолжительность разговора более 40 секунд
if srcJson['state'] == "HANGUP" and srcJson['direction'] == "external" and int(srcJson['duration']) > 30:
# Обработка запускается только в том случае если в базе пропущенных есть записи
if phoneNAnswer.count_documents({"client": srcJson["to"]}) > 0:
# Удаляем запись из списка пропушеных
phoneNAnswer.delete_many(
filter={"client": {'$regex': srcJson["to"]}})
# формируем данные для загрузки
ins = {
"_id": srcJson["uuid"],
"client": srcJson["to"],
"operator": srcJson["from"],
"time": srcJson["time"],
"currstate": srcJson["from"]
}
# Если запись разговора есть то записываем считаем что дозвонились
if srcJson.get("recordUrl", False):
ins["recordUrl"] = srcJson["recordUrl"]
try:
# Добавляем в таблицу дозвонились успешно и удаляем по маске из недозвонившихся
phoneRecallTrue.insert_one(ins)
phoneRecallFalse.delete_one(
filter={"client": {'$regex': srcJson["to"]}})
except Exception as e:
print(2, e)
else:
try:
phoneRecallFalse.insert_one(ins)
except:
print(2, e)
except Exception as e:
print("!!!!!!!", e)
t = coll.update_one({
"client": res["client"],
"status": {'$in': ["NOT_ANSWERED", "RECALL_FALSE"]}
}, {'$set': res, '$inc': {"count_try": 1}})
coll.update_many({"count_try": {"$gt": 2}}, {"$set": {"status": "DELETED"}})
# -------------------------------
res.clear()
channel.basic_consume(
queue='incoming-dev', on_message_callback=callback, auto_ack=True)
@@ -118,74 +114,6 @@ def main():
channel.start_consuming()
# # Определяем направление соединения
# if srcJson['direction'] == 'incoming':
# # Определяем начальный статус
# if srcJson['state'] == 'START':
# # Создаем переменную. Ответ = false, можно закрывать = false, Приоритетная линия
# if srcJson['to'] == '83912051045':
# tmpIncoming[srcJson['uuid']] = [False, False, True]
# else:
# tmpIncoming[srcJson['uuid']] = [False, False, False]
# # Обновление статуса при входящем звонке
# coll_phone.delete_one({'$and': [{'client': srcJson['from']}, {'status': 1}]})
# if srcJson['state'] == 'ANSWER' and srcJson['uuid'] in tmpIncoming:
# tmpIncoming[srcJson['uuid']][0] = True
# if srcJson['state'] == 'END' and srcJson['uuid'] in tmpIncoming:
# tmpIncoming[srcJson['uuid']][1] = True
# if srcJson['state'] == 'HANGUP' and srcJson['uuid'] in tmpIncoming and tmpIncoming[srcJson['uuid']][1] == True:
# try:
# srcJson['callstatus']
# insDict = {"client": srcJson['from'], "time": srcJson['time'], "status": 0,
# "recordUrl": srcJson["recordUrl"], "duration": srcJson["duration"], "important": tmpIncoming[srcJson['uuid']][2]}
# except Exception as e:
# print(e)
# insDict = {
# "client": srcJson['from'], "time": srcJson['time'], "status": 1, "important": tmpIncoming[srcJson['uuid']][2], "uuid": srcJson['uuid']}
# finally:
# coll_phone.insert_one(insDict)
# tmpIncoming.pop(srcJson['uuid'])
# try:
# insUserKey = {'userkey': srcJson['userkey']}
# coll_userkey.update_one(
# filter={
# 'operator': srcJson['to'],
# },
# update={
# '$set': insUserKey,
# },
# upsert=True
# )
# except Exception as e:
# print(e)
# if srcJson['direction'] == 'external':
# # coll_phone.update_one({'$and': [{'client': {'$regex': srcJson['to']}}, {'status': 1}}]}, {'$set': {'status': 2, 'callid': srcJson['uuid']}})
# coll_phone.update_one({'$and': [{'client': {'$regex': srcJson['to']}}, {'status': 1}]}, {'$set': {'status': 2, 'callid': srcJson['uuid']}})
# if srcJson['state'] == 'HANGUP':
# try:
# # Проверяем заполнено ли поле recordURL, если нет то меняем статус на 4
# if len(srcJson['recordUrl']) > 4:
# print(srcJson['uuid'], srcJson['recordUrl'])
# coll_phone.update_one({'callid':srcJson['uuid']}, {'$set': {'recordUrl': srcJson['recordUrl'], 'status': 2}})
# else:
# print(srcJson['uuid'])
# coll_phone.update_one({'callid':srcJson['uuid']}, {'$set': {'status': 4}})
# except Exception as e:
# print(e)
# except Exception as e:
# print(e.with_traceback)
# print(e)
# exit()
# channel.basic_consume(
# queue='incoming-dev', on_message_callback=callback, auto_ack=False)
# print(' [*] Waiting for messages. To exit press CTRL+C')
# channel.start_consuming()
if __name__ == '__main__':
try:
main()

View File

@@ -7,7 +7,7 @@ WORKDIR /app
COPY requirements.txt requirements.txt
RUN python3 -m venv .venv
RUN /app/.venv/bin/pip3 install -r /app/requirements.txt
ADD app.py /app
ADD *.py /app
EXPOSE 5000
CMD [".venv/bin/python3", "app.py"]

60
worker-dev/phone.py Normal file
View File

@@ -0,0 +1,60 @@
import json
import redis
r = redis.Redis(host="redis")
class phone:
def __init__(self, message: bytearray):
self.msg = message
self.dict = self.ConvertToJSON()
def ConvertToJSON(self):
try:
return json.loads(str(self.msg.decode('utf-8')).replace("\'", "\""))
except:
return {}
# def GetDirection(self) -> str:
# return self.dict["direction"]
def GetUUID(self) -> str:
return self.dict["uuid"]
# def GetClient(self) -> str:
# return self.dict["from"] if self.GetDirection() == "incoming" else self.dict["to"]
# def GetOperator(self) -> str:
# return self.dict["to"] if self.GetDirection() == "incoming" else self.dict["from"]
# def GetTimestamp(self) -> int:
# """Return int timestamp"""
# return self.dict["time"]
def GetState(self) -> str:
return self.dict["state"]
# def isIncoming(self) -> bool:
# """True Если входящий, иначе False"""
# return True if self.GetDirection() == "incoming" else False
# def isExternal(self) -> bool:
# """True если исходящий, иначе False"""
# return True if self.GetDirection() == "external" else False
def isCanClose(self) -> int:
try:
return int(r.hget(self.GetUUID(), "canClose"))
except:
return 0
def addStart(self):
r.hset(self.GetUUID(), mapping={
"canClose": "0"
})
def addEnd(self):
r.hset(self.GetUUID(), mapping={
"canClose": "1"
})

View File

@@ -1,8 +1,18 @@
async-timeout==4.0.3
click==8.1.3
dnspython==2.3.0
Flask==2.2.3
importlib-metadata==6.2.0
itsdangerous==2.1.2
Jinja2==3.1.2
MarkupSafe==2.1.2
pika==1.3.1
pymongo==4.3.3
certifi==2022.12.7
charset-normalizer==3.1.0
idna==3.4
requests==2.28.2
urllib3==1.26.15
urllib3==1.26.15
redis==5.0.1
Werkzeug==2.2.3
zipp==3.15.0