61 lines
1.6 KiB
Python
61 lines
1.6 KiB
Python
import json
|
|
import redis
|
|
|
|
|
|
r = redis.Redis(host="redis")
|
|
|
|
|
|
class phone:
|
|
def __init__(self, message: bytearray):
|
|
self.msg = message
|
|
self.dict = self.ConvertToJSON()
|
|
|
|
def ConvertToJSON(self):
|
|
try:
|
|
return json.loads(str(self.msg.decode('utf-8')).replace("\'", "\""))
|
|
except:
|
|
return {}
|
|
|
|
# def GetDirection(self) -> str:
|
|
# return self.dict["direction"]
|
|
|
|
def GetUUID(self) -> str:
|
|
return self.dict["uuid"]
|
|
|
|
# def GetClient(self) -> str:
|
|
# return self.dict["from"] if self.GetDirection() == "incoming" else self.dict["to"]
|
|
|
|
# def GetOperator(self) -> str:
|
|
# return self.dict["to"] if self.GetDirection() == "incoming" else self.dict["from"]
|
|
|
|
# def GetTimestamp(self) -> int:
|
|
# """Return int timestamp"""
|
|
# return self.dict["time"]
|
|
|
|
def GetState(self) -> str:
|
|
return self.dict["state"]
|
|
|
|
# def isIncoming(self) -> bool:
|
|
# """True Если входящий, иначе False"""
|
|
# return True if self.GetDirection() == "incoming" else False
|
|
|
|
# def isExternal(self) -> bool:
|
|
# """True если исходящий, иначе False"""
|
|
# return True if self.GetDirection() == "external" else False
|
|
|
|
def isCanClose(self) -> int:
|
|
try:
|
|
return int(r.hget(self.GetUUID(), "canClose"))
|
|
except:
|
|
return 0
|
|
|
|
def addStart(self):
|
|
r.hset(self.GetUUID(), mapping={
|
|
"canClose": "0"
|
|
})
|
|
|
|
def addEnd(self):
|
|
r.hset(self.GetUUID(), mapping={
|
|
"canClose": "1"
|
|
})
|