Posts Tagged ‘ urllib2 ’

python-kerberos + urllib2

(Wow, old post sitting in my draft box for eons.  Might be useful to someone …)

At work, I’ve been working on getting the planet RSS aggregator to work w/ mod_auth_kerb + wordpress-mu. Thankfully, I found this quite useful post:
http://selenic.com/pipermail/mercurial/2008-June/019776.html

With this very useful snippet of code:
http://selenic.com/pipermail/mercurial/attachments/20080624/ec728dbb/attachment.py

#!/usr/bin/python

# urllib2 with kerberos proof of concept
# Copyright 2008 Lime Spot LLC

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.

# A copy of the GNU General Public License can be found at
# .

import re
import logging
import sys
import urllib2 as u2

import kerberos as k

def getLogger():
log = logging.getLogger(“http_negotiate_auth_handler”)
handler = logging.StreamHandler()
formatter = logging.Formatter(‘%(asctime)s %(levelname)s %(message)s’)
handler.setFormatter(formatter)
log.addHandler(handler)
return log

log = getLogger()

class HTTPNegotiateAuthHandler(u2.BaseHandler):
“””auth handler for urllib2 that does HTTP Negotiate Authentication
“””

rx = re.compile(‘(?:.*,)*\s*Negotiate\s*([^,]*),?’, re.I)
handler_order = 480 # before Digest auth

def negotiate_value(self, headers):
authreq = headers.get(‘www-authenticate’, None)

if authreq:
mo = HTTPNegotiateAuthHandler.rx.search(authreq)
if mo:
return mo.group(1)
else:
log.debug(“regex failed on: %s” % authreq)

else:
log.debug(“www-authenticate header not found”)

return None

def __init__(self):
self.retried = 0
self.context = None

def generate_request_header(self, req, headers):
neg_value = self.negotiate_value(headers)
if neg_value is None:
self.retried = 0
return None

if self.retried >= 5:
raise HTTPError(req.get_full_url(), 401, “negotiate auth failed”,
headers, None)

self.retried += 1

log.debug(“req.get_host() returned %s” % req.get_host())
result, self.context = k.authGSSClientInit(“HTTP@%s” % req.get_host())

if result < 1:
log.warning(“authGSSClientInit returned result %d” % result)
return None

log.debug(“authGSSClientInit() succeeded”)

result = k.authGSSClientStep(self.context, neg_value)

if result < 0:
log.warning(“authGSSClientStep returned result %d” % result)
return None

log.debug(“authGSSClientStep() succeeded”)

response = k.authGSSClientResponse(self.context)
log.debug(“authGSSClientResponse() succeeded”)

return “Negotiate %s” % response

def authenticate_server(self, headers):
neg_value = self.negotiate_value(headers)
if neg_value is None:
log.critical(“mutual auth failed. No negotiate header”)
return None

if k.authGSSClientStep(self.context, neg_value) < 1:
log.critical(“mutual auth failed: authGSSClientStep returned result %d” % result)

def clean_context(self):
if self.context is not None:
k.authGSSClientClean(self.context)

def http_error_401(self, req, fp, code, msg, headers):
log.debug(“inside http_error_401”)
try:
neg_hdr = self.generate_request_header(req, headers)

if neg_hdr is None:
log.debug(“neg_hdr was None”)
return None

req.add_unredirected_header(‘Authorization’, neg_hdr)
resp = self.parent.open(req)

self.authenticate_server(resp.info())

return resp

finally:
self.clean_context()

def test():
log.setLevel(logging.DEBUG)
log.info(“starting test”)
opener = u2.build_opener()
opener.add_handler(HTTPNegotiateAuthHandler())
resp = opener.open(sys.argv[1])
print dir(resp), resp.info(), resp.code

if __name__ == ‘__main__’:
test()

Packages to come, and hopefully I’ll get the patch pushed upstream soon.