Source code for rinse.wsa

"""WSA (Addressing) support for rinse SOAP client."""

NS_WSA = 'http://www.w3.org/2005/08/addressing'
URI_ANONYMOUS = \
    'http://schemas.xmlsoap.org/ws/2004/08/addressing/role/anonymous'
URI_UNSPECIFIED = \
    'http://schemas.xmlsoap.org/ws/2004/08/addressing/id/unspecified'


[docs]def append_wsa_headers( msg, to, action, message_id=None, relates_to=None, relationship_type=None, reply_to=None, from_endpoint=None, fault_to=None, ): """Add WSA (addressing) headers. >>> from rinse.wsa import append_wsa_headers >>> import lxml.usedoctest >>> from rinse.message import SoapMessage >>> from rinse.util import printxml >>> msg = SoapMessage() >>> f123 = msg.elementmaker( ... 'f123', ... 'http://www.fabrikam123.example/svc53', ... ) >>> msg.body = f123.Delete(f123.maxCount('42')) >>> append_wsa_headers( ... msg, ... 'mailto:joe@fabrikam123.example', ... 'http://fabrikam123.example/mail/Delete', ... message_id='uuid:aaaabbbb-cccc-dddd-eeee-ffffffffffff', ... reply_to='http://business456.example/client1', ... ) >>> print(msg['SOAPAction']) "http://fabrikam123.example/mail/Delete" >>> printxml(msg.etree()) <soapenv:Envelope xmlns:f123="http://www.fabrikam123.example/svc53" xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:wsa="http://www.w3.org/2005/08/addressing"> <soapenv:Header> <wsa:MessageID>uuid:aaaabbbb-cccc-dddd-eeee-ffffffffffff</wsa:MessageID> <wsa:To>mailto:joe@fabrikam123.example</wsa:To> <wsa:Action>http://fabrikam123.example/mail/Delete</wsa:Action> <wsa:ReplyTo> <wsa:Address>http://business456.example/client1</wsa:Address> </wsa:ReplyTo> </soapenv:Header> <soapenv:Body> <f123:Delete> <f123:maxCount>42</f123:maxCount> </f123:Delete> </soapenv:Body> </soapenv:Envelope> """ # perform simple validation on WSA headers if reply_to and not message_id: raise ValueError( 'wsa:ReplyTo set so wsa:MessageID MUST be present.', ) if fault_to and not message_id: raise ValueError( 'wsa:FaultTo set so wsa:MessageID MUST be present.', ) if relationship_type and not relates_to: raise ValueError( '/wsa:RelatesTo/@RelationshipType set ' 'so wsa:RelatesTo MUST be present.', ) # add 'SOAPAction' HTTP request header msg['SOAPAction'] = '"{}"'.format(action) # add WSA elements to SOAP headers relation_attrs = {} if relationship_type: relation_attrs['RelationshipType'] = relationship_type wsa = msg.elementmaker('wsa', NS_WSA) msg.headers.extend( header for header in [ wsa.MessageID(message_id) if message_id else None, wsa.RelatesTo(relates_to, **relation_attrs) if relates_to else None, wsa.To(to), wsa.Action(action), wsa.From(from_endpoint) if from_endpoint else None, wsa.ReplyTo(wsa.Address(reply_to)) if reply_to else None, wsa.FaultTo(wsa.Address(fault_to)) if fault_to else None, ] if header is not None )