root/xcap/interfaces/backend/sipthor.py

Revision 615, 18.7 KB (checked in by Saul Ibarra <saul@ag-projects.com>, 3 months ago)

Fix size constraint and don't delete previous icons on PUT

Line 
1# Copyright (C) 2007 AG-Projects.
2#
3# This module is proprietary to AG-Projects. Use of this module by third
4# parties is unsupported.
5
6import signal
7
8import cjson
9
10from formencode import validators
11
12from application import log
13from application.python.util import Singleton
14from application.system import default_host_ip
15from application.process import process
16from application.configuration import ConfigSection, ConfigSetting
17
18from sqlobject import sqlhub, connectionForURI, SQLObject, AND
19from sqlobject import StringCol, IntCol, DateTimeCol, SOBLOBCol, Col
20from sqlobject import MultipleJoin, ForeignKey
21
22from zope.interface import implements
23from twisted.internet import reactor
24from twisted.internet import defer
25from twisted.internet.defer import Deferred, maybeDeferred
26from twisted.cred.checkers import ICredentialsChecker
27from twisted.cred.credentials import IUsernamePassword, IUsernameHashedPassword
28from twisted.cred.error import UnauthorizedLogin
29
30from thor.link import ControlLink, Notification, Request
31from thor.eventservice import EventServiceClient, ThorEvent
32from thor.entities import ThorEntitiesRoleMap, GenericThorEntity as ThorEntity
33
34from gnutls.interfaces.twisted import X509Credentials
35from gnutls.constants import COMP_DEFLATE, COMP_LZO, COMP_NULL
36
37import xcap
38from xcap.tls import Certificate, PrivateKey
39from xcap.interfaces.backend import StatusResponse
40from xcap.dbutil import make_random_etag
41
42
class ThorNodeConfig(ConfigSection):
    """Settings for joining the SIP Thor network.

    Values are looked up in the 'ThorNetwork' section of the XCAP
    configuration file; the assignments below are the defaults.
    """

    __cfgfile__ = xcap.__cfgfile__
    __section__ = 'ThorNetwork'

    ## domain handed to the event service client (see XCAPProvisioning)
    domain = "sipthor.net"
    ## argument given to thor.network.new() when a network for a new role is created
    multiply = 1000

    ## TLS material used to build the X509 credentials; None unless configured
    certificate = ConfigSetting(type=Certificate, value=None)
    private_key = ConfigSetting(type=PrivateKey, value=None)
    ca = ConfigSetting(type=Certificate, value=None)
52
53
54
class JSONValidator(validators.Validator):
    """Validator converting between JSON text (database side) and Python objects.

    None is passed through untouched in both directions; any failure of the
    cjson codec is reported as validators.Invalid.
    """

    def to_python(self, value, state):
        """Decode the JSON text `value`; raise validators.Invalid if it cannot be decoded."""
        if value is None:
            return None
        try:
            decoded = cjson.decode(value)
        except Exception:
            message = "expected a decodable JSON object in the JSONCol '%s', got %s %r instead" % (self.name, type(value), value)
            raise validators.Invalid(message, value, state)
        return decoded

    def from_python(self, value, state):
        """Encode `value` as JSON text; raise validators.Invalid if it cannot be encoded."""
        if value is None:
            return None
        try:
            encoded = cjson.encode(value)
        except Exception:
            message = "expected an encodable JSON object in the JSONCol '%s', got %s %r instead" % (self.name, type(value), value)
            raise validators.Invalid(message, value, state)
        return encoded
72
73
class SOJSONCol(SOBLOBCol):
    """BLOB column whose validator chain starts with a JSONValidator."""

    def createValidators(self):
        """Return the inherited validators preceded by a fresh JSONValidator."""
        json_validator = JSONValidator()
        inherited = super(SOJSONCol, self).createValidators()
        return [json_validator] + inherited
78
79
class JSONCol(Col):
    """Column declaration for JSON data; SOJSONCol is its underlying column class."""

    baseClass = SOJSONCol
82
83
class SipAccount(SQLObject):
    """Row of the 'sip_accounts_meta' table.

    The account profile itself lives in the joined SipAccountData rows; the
    _get_profile/_set_profile accessors below read and write the first of
    them (presumably exposed by SQLObject as the `profile` attribute -- the
    rest of this module uses `account.profile`).
    """

    class sqlmeta:
        table = 'sip_accounts_meta'

    username = StringCol(length=64)
    domain = StringCol(length=64)
    firstName = StringCol(length=64)
    lastName = StringCol(length=64)
    email = StringCol(length=64)
    customerId = IntCol(default=0)
    resellerId = IntCol(default=0)
    ownerId = IntCol(default=0)
    changeDate = DateTimeCol(default=DateTimeCol.now)
    ## joins
    data = MultipleJoin('SipAccountData', joinColumn='account_id')

    def _set_profile(self, value):
        """Store `value` in the first data row, creating that row if none exists."""
        rows = list(self.data)
        if rows:
            rows[0].profile = value
        else:
            SipAccountData(account=self, profile=value)

    def _get_profile(self):
        """Return the profile held by the first data row."""
        first_row = self.data[0]
        return first_row.profile

    def set(self, **kwargs):
        """Update columns as SQLObject.set does, routing `profile` to the data row.

        A `profile` of None (or no `profile` at all) leaves the stored
        profile untouched.
        """
        columns = kwargs.copy()
        new_profile = columns.pop('profile', None)
        SQLObject.set(self, **columns)
        if new_profile is None:
            return
        self._set_profile(new_profile)
115
116
class SipAccountData(SQLObject):
    """Row of the 'sip_accounts_data' table: the JSON profile of one SipAccount."""

    class sqlmeta:
        table = 'sip_accounts_data'

    ## owning account (declared with cascade=True)
    account = ForeignKey('SipAccount', cascade=True)
    ## profile document, converted to/from JSON by the column's validator
    profile = JSONCol()
122
123
def sanitize_application_id(application_id):
    """Return the application id under which documents are stored.

    "org.openmobilealliance.pres-rules" is mapped to "pres-rules"; every
    other value is returned unchanged.
    """
    is_oma_pres_rules = application_id == "org.openmobilealliance.pres-rules"
    return "pres-rules" if is_oma_pres_rules else application_id
129
class ThorEntityAddress(str):
    """A node address: a string equal to the node's ip, with extra attributes.

    The instance carries `ip`, `control_port` and `version`. Comparison and
    hashing are inherited from str, so two addresses with the same ip are
    equal regardless of the other attributes.
    """

    def __new__(cls, ip, control_port=None, version='unknown'):
        address = super(ThorEntityAddress, cls).__new__(cls, ip)
        address.ip, address.version, address.control_port = ip, version, control_port
        return address
137
138
class GetSIPWatchers(Request):
    """Control request carrying the command "get sip_watchers for <account>"."""

    def __new__(cls, account):
        return Request.__new__(cls, "get sip_watchers for %s" % account)
144
145
class XCAPProvisioning(EventServiceClient):
    """Presence of the XCAP server in the SIP Thor network.

    Subscribes to the "Thor.Members" topic, keeps one network object per
    role in `self.networks`, forwards the database URI of the preferred
    thor_database node to DatabaseConnection, and talks to sip_proxy nodes
    over a ControlLink.
    """

    __metaclass__ = Singleton
    topics = ["Thor.Members"]

    def __init__(self):
        self._database = DatabaseConnection()
        self.node = ThorEntity(default_host_ip, ['xcap_server'], version=xcap.__version__)
        self.networks = {}
        self.presence_message = ThorEvent('Thor.Presence', self.node.id)
        self.shutdown_message = ThorEvent('Thor.Leave', self.node.id)
        credentials = X509Credentials(ThorNodeConfig.certificate, ThorNodeConfig.private_key, [ThorNodeConfig.ca])
        credentials.verify_peer = True
        credentials.session_params.compressions = (COMP_LZO, COMP_DEFLATE, COMP_NULL)
        self.control = ControlLink(credentials)
        EventServiceClient.__init__(self, ThorNodeConfig.domain, credentials)
        process.signals.add_handler(signal.SIGHUP, self._handle_SIGHUP)
        process.signals.add_handler(signal.SIGINT, self._handle_SIGINT)
        process.signals.add_handler(signal.SIGTERM, self._handle_SIGTERM)

    def _disconnect_all(self, result):
        """Drop the control links, then let the base class disconnect."""
        self.control.disconnect_all()
        EventServiceClient._disconnect_all(self, result)

    def lookup(self, key):
        """Return the sip_proxy node responsible for `key`, or None.

        None is returned when no sip_proxy network is known yet or when the
        lookup fails; unexpected lookup errors are logged.
        """
        network = self.networks.get("sip_proxy", None)
        if network is None:
            return None
        try:
            node = network.lookup_node(key)
        except LookupError:
            node = None
        except Exception:
            log.err()
            node = None
        return node

    def notify(self, operation, entity_type, entity):
        """Send "notify <operation> <entity_type> <entity>" to the node serving `entity`.

        Nothing is sent when no node is found; a node without a control port
        is reported in the log.
        """
        node = self.lookup(entity)
        if node is None:
            return
        if node.control_port is None:
            log.error("Could not send notify because node %s has no control port" % node.ip)
            return
        notification = Notification("notify %s %s %s" % (operation, entity_type, entity))
        self.control.send_request(notification, (node.ip, node.control_port))

    def get_watchers(self, key):
        """Ask the node serving `key` for its SIP watchers.

        Returns a Deferred that is the `deferred` attribute of the request
        handed to the control link. When no usable node exists, an already
        failed Deferred wrapping a RuntimeError is returned (a real
        exception instance rather than a bare string, which Failure is not
        meant to wrap).
        """
        node = self.lookup(key)
        if node is None:
            return defer.fail(RuntimeError("no nodes found when searching for key %s" % str(key)))
        if node.control_port is None:
            return defer.fail(RuntimeError("could not get watchers because node %s has no control port" % node.ip))
        request = GetSIPWatchers(key)
        request.deferred = Deferred()
        self.control.send_request(request, (node.ip, node.control_port))
        return request.deferred

    def handle_event(self, event):
        """Process a membership event: update the database URI and the per-role networks."""
        networks = self.networks
        role_map = ThorEntitiesRoleMap(event.message) ## mapping between role names and lists of nodes with that role

        ## the database node with the lowest (priority, ip) provides the dburi
        thor_databases = role_map.get('thor_database', [])
        if thor_databases:
            thor_databases.sort(key=lambda db_node: (db_node.priority, db_node.ip))
            dburi = thor_databases[0].dburi
        else:
            dburi = None
        self._database.update_dburi(dburi)

        ## every role from the event plus every role we already track, each
        ## exactly once (tracked roles missing from the event lose all nodes)
        all_roles = list(role_map.keys())
        announced_roles = set(all_roles)
        all_roles.extend([role for role in networks.keys() if role not in announced_roles])

        for role in all_roles:
            try:
                network = networks[role] ## avoid setdefault here because it always evaluates the 2nd argument
            except KeyError:
                if role in ["thor_manager", "thor_monitor", "provisioning_server", "media_relay", "thor_database"]:
                    ## no network is kept for these roles
                    continue
                from thor import network as thor_network
                network = thor_network.new(ThorNodeConfig.multiply)
                networks[role] = network
            new_nodes = set([ThorEntityAddress(node.ip, getattr(node, 'control_port', None), getattr(node, 'version', 'unknown')) for node in role_map.get(role, [])])
            old_nodes = set(network.nodes)
            ## compute set differences
            added_nodes = new_nodes - old_nodes
            removed_nodes = old_nodes - new_nodes
            if removed_nodes:
                for node in removed_nodes:
                    network.remove_node(node)
                    self.control.discard_link((node.ip, node.control_port))
                plural = 's' if len(removed_nodes) != 1 else ''
                log.msg("removed %s node%s: %s" % (role, plural, ', '.join(removed_nodes)))
            if added_nodes:
                for node in added_nodes:
                    network.add_node(node)
                plural = 's' if len(added_nodes) != 1 else ''
                log.msg("added %s node%s: %s" % (role, plural, ', '.join(added_nodes)))
240
241
class NotFound(Exception):
    """Raised when the requested account or document does not exist."""
244
245
class NoDatabase(Exception):
    """Raised when a profile is requested while no database URI is known."""
248
249
class DatabaseConnection(object):
    """Access to account profiles stored in the Thor database.

    The public methods are called from the Twisted thread and return a
    Deferred; the database work itself runs in a reactor thread and reports
    back through reactor.callFromThread.

    XCAP documents are kept inside the profile as
    profile["xcap"][application_id][document_path] = (document, etag).
    """

    __metaclass__ = Singleton

    def __init__(self):
        ## URI of the database currently in use, None while there is none
        self.dburi = None

    def _run_in_thread(self, username, domain, operation, update):
        """Schedule retrieve_profile in a reactor thread and return its Deferred."""
        deferred = Deferred()
        reactor.callInThread(self.retrieve_profile, username, domain, operation, update, deferred)
        return deferred

    # Methods to be called from the Twisted thread:
    def put(self, uri, document, check_etag, new_etag):
        """Store `document` with `new_etag`; fires with True if it replaced an existing document."""
        operation = lambda profile: self._put_operation(uri, document, check_etag, new_etag, profile)
        return self._run_in_thread(uri.user.username, uri.user.domain, operation, True)

    def delete(self, uri, check_etag):
        """Delete the document at `uri`; fails with NotFound if it does not exist."""
        operation = lambda profile: self._delete_operation(uri, check_etag, profile)
        return self._run_in_thread(uri.user.username, uri.user.domain, operation, True)

    def get(self, uri):
        """Fire with (document, etag) for `uri`; fails with NotFound if it does not exist."""
        operation = lambda profile: self._get_operation(uri, profile)
        return self._run_in_thread(uri.user.username, uri.user.domain, operation, False)

    def get_profile(self, username, domain):
        """Fire with the whole profile of username@domain."""
        return self._run_in_thread(username, domain, lambda profile: profile, False)

    def get_documents_list(self, uri):
        """Fire with the "xcap" part of the profile of the user in `uri`."""
        operation = lambda profile: self._get_documents_list_operation(uri, profile)
        return self._run_in_thread(uri.user.username, uri.user.domain, operation, False)

    # Methods to be called in a separate thread:
    def _put_operation(self, uri, document, check_etag, new_etag, profile):
        """Write the document into `profile`; return whether one was already there.

        check_etag is only called, with the stored etag, when a document
        already exists.
        """
        application_id = sanitize_application_id(uri.application_id)
        xcap_docs = profile.setdefault("xcap", {})
        try:
            etag = xcap_docs[application_id][uri.doc_selector.document_path][1]
        except KeyError:
            found = False
        else:
            found = True
            check_etag(etag)
        xcap_app = xcap_docs.setdefault(application_id, {})
        xcap_app[uri.doc_selector.document_path] = (document, new_etag)
        return found

    def _delete_operation(self, uri, check_etag, profile):
        """Remove the document from `profile` after checking its etag; raise NotFound if absent."""
        application_id = sanitize_application_id(uri.application_id)
        xcap_docs = profile.setdefault("xcap", {})
        try:
            etag = xcap_docs[application_id][uri.doc_selector.document_path][1]
        except KeyError:
            raise NotFound()
        check_etag(etag)
        del xcap_docs[application_id][uri.doc_selector.document_path]
        return None

    def _get_operation(self, uri, profile):
        """Return (document, etag) from `profile`; raise NotFound if absent."""
        try:
            xcap_docs = profile["xcap"]
            doc, etag = xcap_docs[sanitize_application_id(uri.application_id)][uri.doc_selector.document_path]
        except KeyError:
            raise NotFound()
        return doc, etag

    def _get_documents_list_operation(self, uri, profile):
        """Return profile["xcap"]; raise NotFound if the profile has no such key."""
        try:
            xcap_docs = profile["xcap"]
        except KeyError:
            raise NotFound()
        return xcap_docs

    def retrieve_profile(self, username, domain, operation, update, defer):
        """Run `operation` on the profile of username@domain inside a transaction.

        Runs in a separate thread. The profile is loaded (with forUpdate set
        to `update`), handed to `operation`, written back when `update` is
        true, and the transaction is committed. The Deferred `defer` is then
        fired from the reactor thread with the operation's result, or with
        the exception that occurred (NoDatabase when no database URI is
        known, NotFound when the account does not exist).
        """
        transaction = None
        try:
            if self.dburi is None:
                raise NoDatabase()
            transaction = sqlhub.processConnection.transaction()
            try:
                db_account = SipAccount.select(AND(SipAccount.q.username == username, SipAccount.q.domain == domain), connection = transaction, forUpdate = update)[0]
            except IndexError:
                raise NotFound()
            profile = db_account.profile
            result = operation(profile) # NB: may modify profile!
            if update:
                db_account.profile = profile
            transaction.commit(close=True)
        except Exception as e:
            if transaction:
                try:
                    transaction.rollback()
                except Exception:
                    ## a failing rollback must not prevent the deferred from
                    ## firing, otherwise the caller waits forever
                    log.err()
            reactor.callFromThread(defer.errback, e)
        else:
            reactor.callFromThread(defer.callback, result)

    def update_dburi(self, dburi):
        """Switch to the database at `dburi` (None means no database is available).

        The previous process connection is closed when the URI changes.
        """
        if self.dburi == dburi:
            return
        if self.dburi is not None:
            sqlhub.processConnection.close()
        if dburi is None:
            ## drop the reference to the connection that was just closed
            sqlhub.processConnection = None
        else:
            sqlhub.processConnection = connectionForURI(dburi)
        self.dburi = dburi
359
360
class SipthorPasswordChecker(object):
    """Base credentials checker backed by the account profiles in the database.

    Subclasses implement _authenticate_credentials to compare the supplied
    credentials with the data found in the profile.
    """

    implements(ICredentialsChecker)
    credentialInterfaces = (IUsernamePassword, IUsernameHashedPassword)

    def __init__(self):
        self._database = DatabaseConnection()

    def _query_credentials(self, credentials):
        """Fetch the profile for the credentials and authenticate against it."""
        local_part = credentials.username.split('@', 1)[0]
        lookup = self._database.get_profile(local_part, credentials.realm)
        lookup.addCallback(self._got_query_results, credentials)
        lookup.addErrback(self._got_unsuccessfull)
        return lookup

    def _got_unsuccessfull(self, failure):
        """Turn a NotFound failure into UnauthorizedLogin; other failures are not handled here."""
        failure.trap(NotFound)
        raise UnauthorizedLogin("Unauthorized login")

    def _got_query_results(self, profile, credentials):
        """Hand the retrieved profile to the subclass-specific check."""
        return self._authenticate_credentials(profile, credentials)

    def _authenticate_credentials(self, profile, credentials):
        """Compare `credentials` with `profile`; must be provided by subclasses."""
        raise NotImplementedError

    def _checkedPassword(self, matched, username, realm):
        """Return the avatar ID "user@realm" when `matched` is true, else raise UnauthorizedLogin."""
        if not matched:
            raise UnauthorizedLogin("Unauthorized login")
        ## this is the avatar ID
        return "%s@%s" % (username.split('@', 1)[0], realm)

    def requestAvatarId(self, credentials):
        """Return the avatar ID for the credentials which must have the username
           and realm attributes, or an UnauthorizedLogin in case of a failure."""
        return self._query_credentials(credentials)
398
399
class PlainPasswordChecker(SipthorPasswordChecker):
    """A credentials checker against a database subscriber table, where the passwords
       are stored in plain text."""

    implements(ICredentialsChecker)

    def _authenticate_credentials(self, profile, credentials):
        """Check the credentials against profile["password"]."""
        verdict = maybeDeferred(credentials.checkPassword, profile["password"])
        verdict.addCallback(self._checkedPassword, credentials.username, credentials.realm)
        return verdict
410
411
class HashPasswordChecker(SipthorPasswordChecker):
    """A credentials checker against a database subscriber table, where the passwords
       are stored as MD5 hashes."""

    implements(ICredentialsChecker)

    def _authenticate_credentials(self, profile, credentials):
        """Check the credentials against profile["ha1"]."""
        verdict = maybeDeferred(credentials.checkHash, profile["ha1"])
        verdict.addCallback(self._checkedPassword, credentials.username, credentials.realm)
        return verdict
422
class Storage(object):
    """XCAP storage backend keeping documents in SIP Thor account profiles.

    Every method returns a Deferred. Document operations fire with a
    StatusResponse; a NotFound failure from the database is turned into
    StatusResponse(404) by get_document, put_document and delete_document.
    """

    __metaclass__ = Singleton

    def __init__(self):
        self._database = DatabaseConnection()
        self._provisioning = XCAPProvisioning()

    def _normalize_document_path(self, uri):
        """Force presence rules documents to the "index.xml" path (modifies `uri` in place)."""
        ## some clients e.g. counterpath's eyebeam save presence rules under
        ## different filenames between versions and they expect to find the same
        ## information, thus we are forcing all presence rules documents to be
        ## saved under "index.xml" default filename
        if uri.application_id in ("org.openmobilealliance.pres-rules", "pres-rules"):
            uri.doc_selector.document_path = "index.xml"

    def _thor_key(self, uri):
        """Return the "user@domain" key identifying the account in `uri`."""
        return "%s@%s" % (uri.user.username, uri.user.domain)

    def get_document(self, uri, check_etag):
        """Fetch the document at `uri`; fires with StatusResponse(200, etag, doc) or StatusResponse(404)."""
        self._normalize_document_path(uri)
        result = self._database.get(uri)
        result.addCallback(self._got_document, check_etag)
        result.addErrback(self._eb_not_found)
        return result

    def _eb_not_found(self, failure):
        """Map a NotFound failure to a 404 response; other failures are not handled here."""
        failure.trap(NotFound)
        return StatusResponse(404)

    def _got_document(self, result, check_etag):
        """Check the etag of the retrieved (doc, etag) pair and build the 200 response."""
        doc, etag = result
        check_etag(etag)
        return StatusResponse(200, etag, doc)

    def put_document(self, uri, document, check_etag):
        """Store `document` at `uri` under a new random etag.

        Fires with StatusResponse(200, etag) when an existing document was
        replaced, StatusResponse(201, etag) when it was created.
        """
        self._normalize_document_path(uri)
        etag = make_random_etag(uri)
        result = self._database.put(uri, document, check_etag, etag)
        result.addCallback(self._cb_put, etag, self._thor_key(uri))
        result.addErrback(self._eb_not_found)
        return result

    def _cb_put(self, found, etag, thor_key):
        """Notify the account's node about the update and build the PUT response."""
        code = 200 if found else 201
        self._provisioning.notify("update", "sip_account", thor_key)
        return StatusResponse(code, etag)

    def delete_document(self, uri, check_etag):
        """Delete the document at `uri`; fires with StatusResponse(200) or StatusResponse(404)."""
        self._normalize_document_path(uri)
        result = self._database.delete(uri, check_etag)
        result.addCallback(self._cb_delete, self._thor_key(uri))
        result.addErrback(self._eb_not_found)
        return result

    def _cb_delete(self, nothing, thor_key):
        """Notify the account's node about the update and build the DELETE response."""
        self._provisioning.notify("update", "sip_account", thor_key)
        return StatusResponse(200)

    def get_watchers(self, uri):
        """Fire with the list of watchers of the account in `uri` (None on an error response)."""
        result = self._provisioning.get_watchers(self._thor_key(uri))
        result.addCallback(self._get_watchers_decode)
        return result

    def _get_watchers_decode(self, response):
        """Decode the JSON watcher list from a 200 response.

        The "online" value of every watcher is converted to a lowercase
        string. Any other response code is logged and yields None.
        """
        if response.code != 200:
            log.error("failed to get watchers: %s" % response)
            return None
        watchers = cjson.decode(response.data)
        for watcher in watchers:
            watcher["online"] = str(watcher["online"]).lower()
        return watchers

    def get_documents_list(self, uri):
        """Fire with a dict mapping application ids to lists of (document_path, etag)."""
        result = self._database.get_documents_list(uri)
        result.addCallback(self._got_documents_list)
        return result

    def _got_documents_list(self, xcap_docs):
        """Reduce the stored documents to (document_path, etag) pairs per application.

        Applications without any document do not appear in the result.
        """
        docs = {}
        if xcap_docs:
            for application_id, documents in xcap_docs.items():
                for document_path, entry in documents.items():
                    ## entry is the stored (document, etag) pair
                    docs.setdefault(application_id, []).append((document_path, entry[1]))
        return docs
510
511
## NOTE(review): presumably read by the code that starts the reactor, so that
## Twisted does not install its own signal handlers (XCAPProvisioning registers
## handlers for SIGHUP/SIGINT/SIGTERM itself) -- confirm against the caller
installSignalHandlers = False
Note: See TracBrowser for help on using the browser.