import pika import sys import os import json import datetime import requests from pymongo import MongoClient db_connection = MongoClient("mongodb://mongodb:Cc03Wz5XX3iI3uY3@mongo") db_base = db_connection["phone-dev"] coll_source = db_base["source"] phoneAnswer = db_base["phone-answer"] phoneNAnswer = db_base["phone-n-answer"] phoneRecallTrue = db_base["phone-recall-true"] phoneRecallFalse = db_base["phone-recall-false"] phoneLog = db_base["phone-log"] coll_userkey = db_base['userkey'] mkt_phones = ['83912051046', '83912051045'] IgnoreList = ['83919865589', '83912051046', '83912051045', '84950213944', '84951252791', '83919865589', '84951183750', '89919237009', '89919241441', '89919863883', '89919505445', '89919398228', '89919500798'] # Stats: # 0 - ответили # 1 - не приняли # 2 - перезвонили успешно # 4 - Перезвонили неуспешно def WriteLog(desc, data, ex = None): try: log = {"dt": datetime.datetime.now(),"desc": desc, "data": data, "ex":ex } phoneLog.insert_one(log) except Exception as e: print("!!!", e) def main(): connection = pika.BlockingConnection(pika.ConnectionParameters( 'rabbitmq', 5672, 'mkt', pika.PlainCredentials('rabbit', 'mrl2X0jwnYuCCiKFTshG7WKyOAhfDo'))) channel = connection.channel() channel.queue_declare(queue='incoming-dev') def callback(ch, method, properties, body: bytearray): try: # Парсим строку srcJson = json.loads(str(body.decode('utf-8')).replace("\'", "\"")) srcJson["time"] = datetime.datetime.fromtimestamp( srcJson["time"]).strftime('%Y-%m-%d %H:%M:%S') # Определяем направление соединения # Отсекаем лишние события по признаку длиины номера if srcJson['to'] in mkt_phones and srcJson['direction'] == "incoming" and srcJson['state'] == "HANGUP" and srcJson not in IgnoreList: if srcJson.get("recordUrl", False): # Запись бывает только у принятых вызовов, поэтому сразу добавляем в базу try: WriteLog("Add Phone Answer", srcJson) phoneAnswer.insert_one(srcJson) except Exception as e: WriteLog("Add Phone Answer", srcJson, e) else: # Добавляем запись только в том случае если ранее номер отсутствовал, # В противном случае обновляем запись try: WriteLog("Add Phone Non Answer", srcJson) phoneNAnswer.update_one(filter={'from': srcJson['from']}, update={'$set': srcJson}, upsert=True) except Exception as e: WriteLog("Add Phone Non Answer", srcJson, e) if srcJson['state'] == "HANGUP" and srcJson['direction'] == "external": if phoneNAnswer.count_documents({"from": srcJson["to"]}) > 0: phoneNAnswer.delete_one(filter = {"from": srcJson["to"]}) if srcJson.get("recordUrl", False): phoneRecallTrue.insert_one(srcJson) phoneRecallFalse.delete_one(filter = {"to": srcJson["to"]}) else: phoneRecallFalse.insert_one(srcJson) except: print(e.with_traceback) channel.basic_consume( queue='incoming-dev', on_message_callback=callback, auto_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming() # # Определяем направление соединения # if srcJson['direction'] == 'incoming': # # Определяем начальный статус # if srcJson['state'] == 'START': # # Создаем переменную. Ответ = false, можно закрывать = false, Приоритетная линия # if srcJson['to'] == '83912051045': # tmpIncoming[srcJson['uuid']] = [False, False, True] # else: # tmpIncoming[srcJson['uuid']] = [False, False, False] # # Обновление статуса при входящем звонке # coll_phone.delete_one({'$and': [{'client': srcJson['from']}, {'status': 1}]}) # if srcJson['state'] == 'ANSWER' and srcJson['uuid'] in tmpIncoming: # tmpIncoming[srcJson['uuid']][0] = True # if srcJson['state'] == 'END' and srcJson['uuid'] in tmpIncoming: # tmpIncoming[srcJson['uuid']][1] = True # if srcJson['state'] == 'HANGUP' and srcJson['uuid'] in tmpIncoming and tmpIncoming[srcJson['uuid']][1] == True: # try: # srcJson['callstatus'] # insDict = {"client": srcJson['from'], "time": srcJson['time'], "status": 0, # "recordUrl": srcJson["recordUrl"], "duration": srcJson["duration"], "important": tmpIncoming[srcJson['uuid']][2]} # except Exception as e: # print(e) # insDict = { # "client": srcJson['from'], "time": srcJson['time'], "status": 1, "important": tmpIncoming[srcJson['uuid']][2], "uuid": srcJson['uuid']} # finally: # coll_phone.insert_one(insDict) # tmpIncoming.pop(srcJson['uuid']) # try: # insUserKey = {'userkey': srcJson['userkey']} # coll_userkey.update_one( # filter={ # 'operator': srcJson['to'], # }, # update={ # '$set': insUserKey, # }, # upsert=True # ) # except Exception as e: # print(e) # if srcJson['direction'] == 'external': # # coll_phone.update_one({'$and': [{'client': {'$regex': srcJson['to']}}, {'status': 1}}]}, {'$set': {'status': 2, 'callid': srcJson['uuid']}}) # coll_phone.update_one({'$and': [{'client': {'$regex': srcJson['to']}}, {'status': 1}]}, {'$set': {'status': 2, 'callid': srcJson['uuid']}}) # if srcJson['state'] == 'HANGUP': # try: # # Проверяем заполнено ли поле recordURL, если нет то меняем статус на 4 # if len(srcJson['recordUrl']) > 4: # print(srcJson['uuid'], srcJson['recordUrl']) # coll_phone.update_one({'callid':srcJson['uuid']}, {'$set': {'recordUrl': srcJson['recordUrl'], 'status': 2}}) # else: # print(srcJson['uuid']) # coll_phone.update_one({'callid':srcJson['uuid']}, {'$set': {'status': 4}}) # except Exception as e: # print(e) # except Exception as e: # print(e.with_traceback) # print(e) # exit() # channel.basic_consume( # queue='incoming-dev', on_message_callback=callback, auto_ack=False) # print(' [*] Waiting for messages. To exit press CTRL+C') # channel.start_consuming() if __name__ == '__main__': try: main() except KeyboardInterrupt: print('Interrupted') try: sys.exit(0) except SystemExit: os._exit(0)