Monitoren aanwezigheid van gebruikers

Monitoren aanwezigheid van gebruikers

U bevindt zich hier:
Geschatte leestijd: 4 min.

In dit API-voorbeeld wordt event polling gebruikt om de “phone and user status events” te volgen (en zo aanwezigheidsinformatie weer te geven). Op vergelijkbare wijze kun je natuurlijk ook andere events pollen, zoals call events of voicemail events.

user_monitor.py

#!/usr/bin/python3
"""
Sample python script showing how to monitor PBX user presence using the PBX
API.
"""
import sys
import urllib.request, urllib.parse, urllib.error
import urllib.request, urllib.error, urllib.parse
import logging
import argparse
import ssl
from getpass import getpass
from base64 import b64encode
try:
import json
except ImportError:
import simplejson as json
# Number of attempts api_call() makes before re-raising the last error.
MAX_RETRIES = 3
# SSL context handed to urlopen(); None leaves the library default in place.
# Replaced by disable_certificate_verification() when verification is off.
SSL_CONTEXT = None
def disable_certificate_verification():
    """Install a module-wide SSL context that skips certificate checks.

    Does nothing when the running ``ssl`` module has no
    ``create_default_context`` (the global ``SSL_CONTEXT`` is left untouched).
    """
    global SSL_CONTEXT
    if not hasattr(ssl, "create_default_context"):
        return
    SSL_CONTEXT = ssl.create_default_context()
    # Host name checking has to be switched off before the verify mode can
    # be relaxed to CERT_NONE.
    SSL_CONTEXT.check_hostname = False
    SSL_CONTEXT.verify_mode = ssl.CERT_NONE
def api_call(call_url, auth, options=None):
    """Make an API call, return decoded JSON response.

    Uses HTTP Basic authentication and retries on error.
    See api_call_simple() for simplified version.

    Args:
        call_url: Full URL of the API endpoint.
        auth: ``(user, password)`` pair of strings sent as HTTP Basic
            credentials, or a false value to send no ``Authorization``
            header at all.
        options: Optional mapping of query-string parameters.

    Returns:
        The JSON-decoded response body.

    Raises:
        IOError: The request still failed on attempt number MAX_RETRIES.
        ValueError: The response was still not valid UTF-8 JSON on attempt
            number MAX_RETRIES.
    """
    if options:
        call_url += "?" + urllib.parse.urlencode(options)

    # Only build the header when credentials were supplied; previously the
    # header variable was left unbound in that case and the call crashed.
    auth_hdr = None
    if auth:
        credentials = b":".join(part.encode("utf-8") for part in auth)
        auth_hdr = "Basic " + b64encode(credentials).decode("us-ascii")

    logging.debug("calling: {0!r}".format(call_url))
    attempt = 0
    while True:
        attempt += 1
        try:
            req = urllib.request.Request(call_url)
            req.add_header('Accept', 'application/json')
            if auth_hdr is not None:
                req.add_header('Authorization', auth_hdr)
            # The context manager closes the connection even when decoding
            # the body fails.
            with urllib.request.urlopen(req, context=SSL_CONTEXT) as response:
                result = json.loads(response.read().decode("utf-8"))
            break
        except IOError as err:
            print("I/O error for %s: %s" % (call_url, err))
            if attempt == MAX_RETRIES:
                raise
        except ValueError as err:
            print("Invalid response from %s: %s" % (call_url, err))
            if attempt == MAX_RETRIES:
                raise
    logging.debug("got: {0!r}".format(result))
    return result
def api_call_simple(call_url, api_key, options=None):
    """The minimal implementation of an API call.

    Args:
        call_url: Full URL of the API endpoint.
        api_key: API key, sent as the ``key`` query parameter.
        options: Optional mapping of additional query parameters. It is
            copied, so the caller's mapping is never modified.

    Returns:
        The JSON-decoded response body.
    """
    # Work on a copy: writing the key into the caller's dict would leak it
    # into whatever else that dict is used for.
    params = dict(options) if options else {}
    params['key'] = api_key
    call_url += "?" + urllib.parse.urlencode(params)
    with urllib.request.urlopen(call_url, context=SSL_CONTEXT) as response:
        return json.load(response)
def load_users(api_url, auth):
    """Fetch the users selected by ``status=1`` and index them by UUID.

    Returns a dict mapping each user's ``uuid`` to the user record returned
    by the ``users`` endpoint.
    """
    records = api_call(api_url + "users", auth, {"status": "1"})
    return {record['uuid']: record for record in records}
def load_phones(api_url, auth):
    """Fetch the phones selected by ``status=1`` and index them by UUID.

    Returns a dict mapping each phone's ``uuid`` to the phone record
    returned by the ``phones`` endpoint.
    """
    records = api_call(api_url + "phones", auth, {"status": "1"})
    return {record['uuid']: record for record in records}
def load_events(api_url, auth, last=None, limit=None, timeout=None):
    """Query the ``events`` endpoint and return its decoded response.

    ``last``, ``limit`` and ``timeout`` are forwarded as query parameters of
    the same name; any that is ``None`` is left out of the request.
    """
    candidates = (("last", last), ("limit", limit), ("timeout", timeout))
    query = {key: value for key, value in candidates if value is not None}
    return api_call(api_url + "events", auth, query)
def get_user_name(user):
    """Return a display name for a user record.

    The result is ``"username (full name)"`` when the record's
    ``full_name`` is non-empty, otherwise just the ``username``. Both keys
    must be present in ``user``.
    """
    login = user["username"]
    display = user["full_name"]
    return "%s (%s)" % (login, display) if display else login
def dump_user(user, phones):
    """Print one line with the user's name and the status of their phone.

    Prints ``"<name>: no phone"`` when the user's ``phone`` entry is empty
    or does not refer to a key of ``phones``.
    """
    label = get_user_name(user)
    assigned = user['phone']
    if assigned and assigned in phones:
        print("%s: %s" % (label, phones[assigned]['status']))
    else:
        print("%s: no phone" % (label,))
def dump_users(users, phones):
    """Print the one-line status of every user, ordered by username."""
    for entry in sorted(users.values(), key=lambda u: u["username"]):
        dump_user(entry, phones)
def dump_user_verbose(user, phones):
    """Print a multi-line description of a user and the phones they use.

    Handles both record layouts the script knows about: records with a
    ``phones`` list (marked "PBX 4.x" in this script) and records that only
    carry ``primary_phone`` / ``current_phone`` ("PBX 3.x").

    Args:
        user: User record (dict) as returned by the ``users`` endpoint.
        phones: Dict mapping phone UUID to phone record.
    """
    name = get_user_name(user)
    print("%s (%s):" % (name, user.get("uuid")))
    print("    username: ", user.get("username"))
    print("    full name:", user.get("full_name"))

    if "phones" in user:
        # PBX 4.x
        u_phones = user["phones"]
    else:
        # PBX 3.x: synthesize a one-entry list from the primary phone.
        p_phone = user.get("primary_phone")
        u_phones = [{"type": "phone", "phone": p_phone}] if p_phone else []

    # The list must not be rebound inside the loop: doing so made PBX 3.x
    # users (no "phones" key) report "none" after a phone had been listed.
    for u_phone in u_phones:
        p_type = u_phone["type"]
        if p_type == "phone":
            phone_uuid = u_phone.get("phone")
            if not phone_uuid or phone_uuid not in phones:
                # Entry refers to a phone we have no record of; skip it.
                continue
            phone = phones[phone_uuid]
            print("        phone: %s: %s" % (phone['phone_username'],
                                             phone['status']))
        elif p_type == "hot-desk":
            print("        hot-desk")
        elif p_type == "number":
            print("        number: %s" % (u_phone.get("number"),))
        elif p_type == "trunk number":
            print("        number: %s on trunk: %s"
                  % (u_phone.get("number"), u_phone.get("trunk")))
        else:
            print("        unrecognized")
    if not u_phones:
        print("        none")

    if "hot_desk" in user:
        hot_desk = user["hot_desk"]
    else:
        hot_desk = user.get("current_phone")
        # Being "logged in" on one's own primary phone is not a hot-desk
        # session.
        if hot_desk == user.get("primary_phone"):
            hot_desk = None
    if hot_desk:
        phone = phones.get(hot_desk)
        if phone:
            print("    logged in on phone: %s (%s)"
                  % (phone["phone_username"], phone["status"]))
def dump_users_verbose(users, phones):
    """Print the verbose description of every user, ordered by username."""
    for entry in sorted(users.values(), key=lambda u: u["username"]):
        dump_user_verbose(entry, phones)
def dump_target(target, users, phones):
    """Print a description of one call target.

    A target that names a user is printed as the user plus the way that
    user is reached (phone, hot-desk phone, number, trunk number or
    forward). Otherwise the target is printed as a phone, a plain number,
    or - when none of those is present - as raw JSON.
    """
    user_uuid = target.get("user_uuid")
    phone_uuid = target.get("phone_uuid")
    trunk_uuid = target.get("trunk_uuid")
    number = target.get("number")

    # Targets without a user: a phone, a bare number, or something unknown.
    if not user_uuid:
        if phone_uuid:
            known = phones.get(phone_uuid)
            if known:
                print("    phone:", known["phone_username"])
            else:
                print("    unknown phone:", phone_uuid)
        elif number:
            print("    number:", target["number"])
        else:
            print("    unknown:", json.dumps(target))
        return

    owner = users.get(user_uuid)
    if owner:
        print("    user:", get_user_name(owner))
    else:
        print("    unknown user:", user_uuid)

    kind = target.get("user_phone_type")
    if kind in ("phone", "hot-desk"):
        known = phones.get(phone_uuid)
        # Fall back to the raw UUID when the phone is not in our table.
        shown = known["phone_username"] if known else phone_uuid
        if kind == "phone":
            print("        phone:", shown)
        else:
            print("        hot-desk phone:", shown)
    elif kind == "number":
        print("        number:", number)
    elif kind == "trunk number":
        print("        number:", number, "on trunk:", trunk_uuid)
    elif kind == "forward":
        print("        forwarded to:", number)
def dump_call(event, users, phones):
    """Print the details of a call event.

    Output covers the call id, optional event tag, whatever is known about
    the caller (user, phone, number, name), the dialled DID / extension /
    number, and finally each target via dump_target().
    """
    print(" call", event["unique_id"])
    if "event_tag" in event:
        print("  event tag:", event["event_tag"])

    caller = event["caller"]

    if "user_uuid" in caller:
        caller_user_id = caller["user_uuid"]
        known_user = users.get(caller_user_id)
        if known_user:
            print("  from user:", get_user_name(known_user))
        else:
            print("  from unknown user:", caller_user_id)

    if "phone_uuid" in caller:
        caller_phone_id = caller["phone_uuid"]
        known_phone = phones.get(caller_phone_id)
        if not known_phone:
            print("  from unknown phone:", caller_phone_id)
        elif known_phone.get("name"):
            print("  from phone:", known_phone["name"])
        else:
            print("  from unnamed phone:", caller_phone_id)

    # The normalized number wins over the raw one; only one is printed.
    number_fields = (
        ("normalized_number", "  from number (normalized):"),
        ("number", "  from number:"),
    )
    for key, label in number_fields:
        if key in caller:
            print(label, caller[key])
            break
    if "name" in caller:
        print("  from name:", caller["name"])

    called_fields = (
        ("called_did", "  to DID:"),
        ("called_extension", "  to extension:"),
        ("called_number", "  to number:"),
    )
    for key, label in called_fields:
        if key in event:
            print(label, event[key])

    targets = event.get("targets")
    if targets is not None:
        for index, target in enumerate(targets, 1):
            print("  Target #%i:" % (index,))
            dump_target(target, users, phones)
        return

    # No "targets" list: fall back to the single "target" entry, if any.
    target = event.get("target")
    if target:
        print("  Target:")
        dump_target(target, users, phones)
def main():
    """Run the command-line presence monitor.

    Parses the command line, collects credentials (prompting for user name
    and password when neither they nor an API key were given), prints the
    current users and phones, then polls the PBX event feed forever and
    prints every event as it arrives. Only returns by way of an exception
    (for example KeyboardInterrupt, or IOError from a failed API call).
    """
    # Pass the text as ``description``: the first positional parameter of
    # ArgumentParser is ``prog``, which would put it in the usage line.
    parser = argparse.ArgumentParser(description="Axeos PBX API example")
    parser.add_argument("--tls", "--ssl", action="store_true",
                        help="use TLS")
    parser.add_argument("--disable-certificate-verification",
                        action="store_true",
                        help="disable certificate verification")
    parser.add_argument("--debug", action="store_true",
                        help="Display debug output")
    parser.add_argument("--user",
                        help="PBX user name")
    parser.add_argument("--password",
                        help="PBX user GUI password")
    parser.add_argument("--api-key",
                        help="PBX API key")
    parser.add_argument("pbx_address",
                        help="IP address or host name of a PBX")
    options = parser.parse_args()

    if options.disable_certificate_verification:
        disable_certificate_verification()

    if options.api_key:
        if options.user or options.password:
            parser.error("--api-key cannot be used with --user or --password")
        auth = ('apiKey', options.api_key)
    else:
        if not options.user:
            options.user = input("Username? > ")
        if not options.password:
            options.password = getpass("Password? > ")
        auth = (options.user, options.password)

    scheme = 'https' if options.tls else 'http'
    api_url = '{0}://{1}/apis/pbx/'.format(scheme, options.pbx_address)

    logging.basicConfig(
        level=logging.DEBUG if options.debug else logging.INFO)

    # flush old events: remember the newest sequence number so that the
    # polling loop below only reports what happens from now on
    events = load_events(api_url, auth, limit=1, timeout=0)
    last_event = events[0]['seq'] if events else None

    # load user and phone database with current state
    users = load_users(api_url, auth)
    phones = load_phones(api_url, auth)
    dump_users_verbose(users, phones)
    print()

    # Event types that are reported as "<type>:" followed by the call dump.
    call_events = ('incoming call', 'api call', 'internal dial',
                   'call answered')

    while True:
        events = load_events(api_url, auth, last=last_event)
        if events:
            last_event = events[-1]['seq']
        for event in events:
            event_type = event['event_type']
            if event_type == 'user list changed':
                print('User list changed')
                users = load_users(api_url, auth)
                dump_users(users, phones)
            elif event_type == 'phone list changed':
                print('Phone list changed')
                phones = load_phones(api_url, auth)
                dump_users(users, phones)
            elif event_type == 'phone status update':
                phone_uuid = event['phone_uuid']
                if phone_uuid not in phones:
                    print('status change for unknown phone:', phone_uuid)
                else:
                    phone = phones[phone_uuid]
                    phone['status'] = event['status']
                    phone['status_code'] = event['status_code']
                    # Re-print every user currently on this phone.
                    for user in users.values():
                        if user['phone'] == phone_uuid:
                            dump_user(user, phones)
            elif event_type == 'user logged in':
                user_uuid = event['user_uuid']
                if user_uuid not in users:
                    print('unknown user logged in:', user_uuid)
                else:
                    user = users[user_uuid]
                    user['phone'] = event['phone_uuid']
                    # dump_user() needs the whole phone table, not a
                    # single phone record left over from an earlier event.
                    dump_user(user, phones)
            elif event_type == 'user logged out':
                user_uuid = event['user_uuid']
                if user_uuid not in users:
                    print('unknown user logged out:', user_uuid)
                else:
                    user = users[user_uuid]
                    user['phone'] = user['primary_phone']
                    dump_user(user, phones)
            elif event_type in call_events:
                print(event_type + ':')
                dump_call(event, users, phones)
            elif event_type == 'call finished':
                print('call finished (%s):' % (event["reason"],))
                dump_call(event, users, phones)
            else:
                print('unknown event:', event_type)
            print()
if __name__ == "__main__":
    # Script entry point: run the monitor until it is interrupted or an
    # I/O error escapes from main().
    try:
        main()
    except IOError:
        print("Aborting!")
    except KeyboardInterrupt:
        # Ctrl-C: just finish the current output line and exit quietly.
        print()
Was dit artikel nuttig?
Nee
Op de website van Axeos gebruiken wij cookies om uw ervaring te verbeteren en statistieken bij te houden. Lees hier meer over in ons privacybeleid. Ok