import pika import sys import os import json import datetime import requests from pymongo import MongoClient import time db_connection = MongoClient("mongodb://mongodb:Cc03Wz5XX3iI3uY3@mongo") db_base = db_connection["phone-dev"] coll_source = db_base["source"] phoneAnswer = db_base["phone-answer"] phoneNAnswer = db_base["phone-n-answer"] phoneRecallTrue = db_base["phone-recall-true"] phoneRecallFalse = db_base["phone-recall-false"] phoneLog = db_base["phone-log"] phoneDebug = db_base["phone-debug"] coll_userkey = db_base['userkey'] mkt_phones = ['83912051046', '83912051045'] IgnoreList = ['83919865589', '83912051046', '83912051045', '84950213944', '84951252791', '83919865589', '84951183750', '89919237009', '89919241441', '89919863883', '89919505445', '89919398228', '89919500798'] # Stats: # 0 - ответили # 1 - не приняли # 2 - перезвонили успешно # 4 - Перезвонили неуспешно def main(): connection = pika.BlockingConnection(pika.ConnectionParameters( 'rabbitmq', 5672, 'mkt', pika.PlainCredentials('rabbit', 'mrl2X0jwnYuCCiKFTshG7WKyOAhfDo'))) channel = connection.channel() channel.queue_declare(queue='incoming-dev') print("RUN") def callback(ch, method, properties, body: bytearray): try: # Парсим строку srcJson = json.loads(str(body.decode('utf-8')).replace("\'", "\"")) srcJson["time"] = datetime.datetime.fromtimestamp( srcJson["time"]).strftime('%Y-%m-%d %H:%M:%S') # Определяем направление соединения # Отсекаем лишние события по признаку длиины номера if srcJson['to'] in mkt_phones and srcJson['direction'] == "incoming" and srcJson['state'] == "HANGUP" and srcJson not in IgnoreList and int(srcJson['duration']) > 30: # Формируем словарь для загрузки upd = { "_id": srcJson['uuid'], "client": srcJson['from'], "operator": srcJson['to'], "time": srcJson['time'] } if srcJson.get("recordUrl", False): # Запись бывает только у принятых вызовов, поэтому сразу добавляем в базу try: upd["recordUrl"] = srcJson["recordUrl"] phoneAnswer.insert_one(upd) phoneNAnswer.delete_many({"client": srcJson["from"]}) except Exception as e: pass else: # Добавляем запись только в том случае если ранее номер отсутствовал, # В противном случае обновляем запись try: phoneNAnswer.update_one(filter={'client': srcJson['from']}, update={ '$set': upd}, upsert=True) except Exception as e: pass if srcJson['direction'] == "external" and srcJson['state'] == "START": phoneNAnswer.update_one({"client": srcJson["to"]}, {"currstate": "calling"}) # Парсим исходящие звонки только в том случае если продолжительность разговора более 40 секунд if srcJson['state'] == "HANGUP" and srcJson['direction'] == "external" and int(srcJson['duration']) > 30: # Обработка запускается только в том случае если в базе пропущенных есть записи if phoneNAnswer.count_documents({"client": srcJson["to"]}) > 0: # Удаляем запись из списка пропушеных phoneNAnswer.delete_many( filter={"client": {'$regex': srcJson["to"]}}) # формируем данные для загрузки ins = { "_id": srcJson["uuid"], "client": srcJson["to"], "operator": srcJson["from"], "time": srcJson["time"], "currstate": srcJson["from"] } # Если запись разговора есть то записываем считаем что дозвонились if srcJson.get("recordUrl", False): ins["recordUrl"] = srcJson["recordUrl"] try: # Добавляем в таблицу дозвонились успешно и удаляем по маске из недозвонившихся phoneRecallTrue.insert_one(ins) phoneRecallFalse.delete_one( filter={"client": {'$regex': srcJson["to"]}}) except Exception as e: print(2, e) else: try: phoneRecallFalse.insert_one(ins) except: print(2, e) except Exception as e: print("!!!!!!!", e) channel.basic_consume( queue='incoming-dev', on_message_callback=callback, auto_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming() # # Определяем направление соединения # if srcJson['direction'] == 'incoming': # # Определяем начальный статус # if srcJson['state'] == 'START': # # Создаем переменную. Ответ = false, можно закрывать = false, Приоритетная линия # if srcJson['to'] == '83912051045': # tmpIncoming[srcJson['uuid']] = [False, False, True] # else: # tmpIncoming[srcJson['uuid']] = [False, False, False] # # Обновление статуса при входящем звонке # coll_phone.delete_one({'$and': [{'client': srcJson['from']}, {'status': 1}]}) # if srcJson['state'] == 'ANSWER' and srcJson['uuid'] in tmpIncoming: # tmpIncoming[srcJson['uuid']][0] = True # if srcJson['state'] == 'END' and srcJson['uuid'] in tmpIncoming: # tmpIncoming[srcJson['uuid']][1] = True # if srcJson['state'] == 'HANGUP' and srcJson['uuid'] in tmpIncoming and tmpIncoming[srcJson['uuid']][1] == True: # try: # srcJson['callstatus'] # insDict = {"client": srcJson['from'], "time": srcJson['time'], "status": 0, # "recordUrl": srcJson["recordUrl"], "duration": srcJson["duration"], "important": tmpIncoming[srcJson['uuid']][2]} # except Exception as e: # print(e) # insDict = { # "client": srcJson['from'], "time": srcJson['time'], "status": 1, "important": tmpIncoming[srcJson['uuid']][2], "uuid": srcJson['uuid']} # finally: # coll_phone.insert_one(insDict) # tmpIncoming.pop(srcJson['uuid']) # try: # insUserKey = {'userkey': srcJson['userkey']} # coll_userkey.update_one( # filter={ # 'operator': srcJson['to'], # }, # update={ # '$set': insUserKey, # }, # upsert=True # ) # except Exception as e: # print(e) # if srcJson['direction'] == 'external': # # coll_phone.update_one({'$and': [{'client': {'$regex': srcJson['to']}}, {'status': 1}}]}, {'$set': {'status': 2, 'callid': srcJson['uuid']}}) # coll_phone.update_one({'$and': [{'client': {'$regex': srcJson['to']}}, {'status': 1}]}, {'$set': {'status': 2, 'callid': srcJson['uuid']}}) # if srcJson['state'] == 'HANGUP': # try: # # Проверяем заполнено ли поле recordURL, если нет то меняем статус на 4 # if len(srcJson['recordUrl']) > 4: # print(srcJson['uuid'], srcJson['recordUrl']) # coll_phone.update_one({'callid':srcJson['uuid']}, {'$set': {'recordUrl': srcJson['recordUrl'], 'status': 2}}) # else: # print(srcJson['uuid']) # coll_phone.update_one({'callid':srcJson['uuid']}, {'$set': {'status': 4}}) # except Exception as e: # print(e) # except Exception as e: # print(e.with_traceback) # print(e) # exit() # channel.basic_consume( # queue='incoming-dev', on_message_callback=callback, auto_ack=False) # print(' [*] Waiting for messages. To exit press CTRL+C') # channel.start_consuming() if __name__ == '__main__': try: main() except KeyboardInterrupt: print('Interrupted') try: sys.exit(0) except SystemExit: os._exit(0)