Articles in this section
Category / Section

How to use non-UID attribute as CASA username

Published:
32 mins read

How to use non-UID attribute as CASA username

There might be a use case where an organization might need an email address or any other attribute as a primary key to log into CASA and/or through CASA to other applications.
In this test documentation, we are showing how to achieve that.

Gluu Server’s “Manage Authentication” module

Make Gluu Server’s default authentication allow the ‘mail’ attribute as the Primary Key instead of ‘uid’.

  • Log into oxTrust > Manage Authentication > Manage LDAP Authentication > Primary key and Local primary key == mail`
  • Activate it and save the configuration.
  • Make sure mail attribute is working as the primary attribute for authentication:
    • Open incognito window or new browser.
    • Try to log in to your Gluu server with EmailAddress and password.

Update the CASA script

We need to apply a little modification to the CASA script:

zico@zico-ThinkPad-T590:$ diff default.py mod.py 
132c132
<                         logged_in = authenticationService.authenticate(user_name, user_password)
---
>                         logged_in = authenticationService.authenticate(user_name, user_password, self.uid_attr, self.uid_attr)
zico@zico-ThinkPad-T590:$ 

Full script available below.

Restart services

Stop/Start oxAuth, Identity, casa services.

CASA script

# Author: Jose Gonzalez
# Modification: Sergay Manoylo and Zico
# Modification date: Aug 30, 2022

from java.lang import Integer
from java.util import Collections, HashMap, HashSet, ArrayList, Arrays, Date
from java.nio.charset import Charset

from org.apache.http.params import CoreConnectionPNames

from org.oxauth.persistence.model.configuration import GluuConfiguration
from org.gluu.oxauth.security import Identity
from org.gluu.oxauth.service import AuthenticationService, UserService
from org.gluu.oxauth.service.common import EncryptionService
from org.gluu.oxauth.service.custom import CustomScriptService
from org.gluu.oxauth.service.net import HttpService
from org.gluu.oxauth.util import ServerUtil
from org.gluu.model import SimpleCustomProperty
from org.gluu.model.casa import ApplicationConfiguration
from org.gluu.model.custom.script import CustomScriptType
from org.gluu.model.custom.script.type.auth import PersonAuthenticationType
from org.gluu.persist import PersistenceEntryManager
from org.gluu.service import CacheService
from org.gluu.service.cdi.util import CdiUtil
from org.gluu.util import StringHelper

try:
   import json
except ImportError:
   import simplejson as json
import sys

class PersonAuthentication(PersonAuthenticationType):

   def __init__(self, currentTimeMillis):
       self.currentTimeMillis = currentTimeMillis
       self.ACR_SG = "super_gluu"
       self.ACR_U2F = "u2f"

       self.modulePrefix = "casa-external_"

   def init(self, customScript, configurationAttributes):

       print "Casa. init called"
       self.authenticators = {}
       self.uid_attr = self.getLocalPrimaryKey()

       custScriptService = CdiUtil.bean(CustomScriptService)
       self.scriptsList = custScriptService.findCustomScripts(Collections.singletonList(CustomScriptType.PERSON_AUTHENTICATION), "oxConfigurationProperty", "displayName", "oxEnabled", "oxLevel")
       dynamicMethods = self.computeMethods(self.scriptsList)

       if len(dynamicMethods) > 0:
           print "Casa. init. Loading scripts for dynamic modules: %s" % dynamicMethods

           for acr in dynamicMethods:
               moduleName = self.modulePrefix + acr
               try:
                   external = __import__(moduleName, globals(), locals(), ["PersonAuthentication"], -1)
                   module = external.PersonAuthentication(self.currentTimeMillis)

                   print "Casa. init. Got dynamic module for acr %s" % acr
                   configAttrs = self.getConfigurationAttributes(acr, self.scriptsList)

                   if acr == self.ACR_U2F:
                       u2f_application_id = configurationAttributes.get("u2f_app_id").getValue2()
                       configAttrs.put("u2f_application_id", SimpleCustomProperty("u2f_application_id", u2f_application_id))
                   elif acr == self.ACR_SG:
                       application_id = configurationAttributes.get("supergluu_app_id").getValue2()
                       configAttrs.put("application_id", SimpleCustomProperty("application_id", application_id))

                   if module.init(None, configAttrs):
                       module.configAttrs = configAttrs
                       self.authenticators[acr] = module
                   else:
                       print "Casa. init. Call to init in module '%s' returned False" % moduleName
               except:
                   print "Casa. init. Failed to load module %s" % moduleName
                   print "Exception: ", sys.exc_info()[1]

           mobile_methods = configurationAttributes.get("mobile_methods")
           self.mobile_methods = [] if mobile_methods == None else StringHelper.split(mobile_methods.getValue2(), ",")

       print "Casa. init. Initialized successfully"
       return True


   def destroy(self, configurationAttributes):
       print "Casa. Destroyed called"
       return True


   def getApiVersion(self):
       return 11


   def getAuthenticationMethodClaims(self, configurationAttributes):
       return None


   def isValidAuthenticationMethod(self, usageType, configurationAttributes):
       print "Casa. isValidAuthenticationMethod called"
       return True


   def getAlternativeAuthenticationMethod(self, usageType, configurationAttributes):
       return None


   def authenticate(self, configurationAttributes, requestParameters, step):
       print "Casa. authenticate for step %s" % str(step)

       userService = CdiUtil.bean(UserService)
       authenticationService = CdiUtil.bean(AuthenticationService)
       identity = CdiUtil.bean(Identity)

       if step == 1:
           credentials = identity.getCredentials()
           user_name = credentials.getUsername()
           user_password = credentials.getPassword()

           if StringHelper.isNotEmptyString(user_name) and StringHelper.isNotEmptyString(user_password):

               foundUser = userService.getUserByAttribute(self.uid_attr, user_name)
               #foundUser = userService.getUser(user_name)
               if foundUser == None:
                   print "Casa. authenticate for step 1. Unknown username"
               else:
                   platform_data = self.parsePlatformData(requestParameters)
                   preferred = foundUser.getAttribute("oxPreferredMethod")
                   mfaOff = preferred == None
                   logged_in = False

                   if mfaOff:
                       logged_in = authenticationService.authenticate(user_name, user_password, self.uid_attr, self.uid_attr)
                   else:
                       acr = self.getSuitableAcr(foundUser, platform_data, preferred)
                       if acr != None:
                           module = self.authenticators[acr]
                           logged_in = module.authenticate(module.configAttrs, requestParameters, step)

                   if logged_in:
                       foundUser = authenticationService.getAuthenticatedUser()

                       if foundUser == None:
                           print "Casa. authenticate for step 1. Cannot retrieve logged user"
                       else:
                           if mfaOff:
                               identity.setWorkingParameter("skip2FA", True)
                           else:
                               #Determine whether to skip 2FA based on policy defined (global or user custom)
                               skip2FA = self.determineSkip2FA(userService, identity, foundUser, platform_data)
                               identity.setWorkingParameter("skip2FA", skip2FA)
                               identity.setWorkingParameter("ACR", acr)

                           return True

                   else:
                       print "Casa. authenticate for step 1 was not successful"
           return False

       else:
           user = authenticationService.getAuthenticatedUser()
           if user == None:
               print "Casa. authenticate for step 2. Cannot retrieve logged user"
               return False

           #see casa.xhtml
           alter = ServerUtil.getFirstValue(requestParameters, "alternativeMethod")
           if alter != None:
               #bypass the rest of this step if an alternative method was provided. Current step will be retried (see getNextStep)
               self.simulateFirstStep(requestParameters, alter)
               return True

           session_attributes = identity.getSessionId().getSessionAttributes()
           acr = session_attributes.get("ACR")
           #this working parameter is used in casa.xhtml
           identity.setWorkingParameter("methods", ArrayList(self.getAvailMethodsUser(user, acr)))

           success = False
           if acr in self.authenticators:
               module = self.authenticators[acr]
               success = module.authenticate(module.configAttrs, requestParameters, step)

           #Update the list of trusted devices if 2fa passed
           if success:
               print "Casa. authenticate. 2FA authentication was successful"
               tdi = session_attributes.get("trustedDevicesInfo")
               if tdi == None:
                   print "Casa. authenticate. List of user's trusted devices was not updated"
               else:
                   user.setAttribute("oxTrustedDevicesInfo", tdi)
                   userService.updateUser(user)
           else:
               print "Casa. authenticate. 2FA authentication failed"

           return success

       return False


   def prepareForStep(self, configurationAttributes, requestParameters, step):
       print "Casa. prepareForStep %s" % str(step)
       identity = CdiUtil.bean(Identity)

       if step == 1:
           self.prepareUIParams(identity)
           return True
       else:
           session_attributes = identity.getSessionId().getSessionAttributes()

           authenticationService = CdiUtil.bean(AuthenticationService)
           user = authenticationService.getAuthenticatedUser()

           if user == None:
               print "Casa. prepareForStep. Cannot retrieve logged user"
               return False

           acr = session_attributes.get("ACR")
           print "Casa. prepareForStep. ACR = %s" % acr
           identity.setWorkingParameter("methods", ArrayList(self.getAvailMethodsUser(user, acr)))

           if acr in self.authenticators:
               module = self.authenticators[acr]
               return module.prepareForStep(module.configAttrs, requestParameters, step)
           else:
               return False


   def getExtraParametersForStep(self, configurationAttributes, step):
       print "Casa. getExtraParametersForStep %s" % str(step)
       list = ArrayList()

       if step > 1:
           acr = CdiUtil.bean(Identity).getWorkingParameter("ACR")

           if acr in self.authenticators:
               module = self.authenticators[acr]
               params = module.getExtraParametersForStep(module.configAttrs, step)
               if params != None:
                   list.addAll(params)

           list.addAll(Arrays.asList("ACR", "methods", "trustedDevicesInfo"))

       list.addAll(Arrays.asList("casa_contextPath", "casa_prefix", "casa_faviconUrl", "casa_extraCss", "casa_logoUrl"))
       print "extras are %s" % list
       return list


   def getCountAuthenticationSteps(self, configurationAttributes):
       print "Casa. getCountAuthenticationSteps called"

       if CdiUtil.bean(Identity).getWorkingParameter("skip2FA"):
          return 1

       acr = CdiUtil.bean(Identity).getWorkingParameter("ACR")
       if acr in self.authenticators:
           module = self.authenticators[acr]
           return module.getCountAuthenticationSteps(module.configAttrs)
       else:
           return 2

       print "Casa. getCountAuthenticationSteps. Could not determine the step count for acr %s" % acr


   def getPageForStep(self, configurationAttributes, step):
       print "Casa. getPageForStep called %s" % str(step)

       if step > 1:
           acr = CdiUtil.bean(Identity).getWorkingParameter("ACR")
           if acr in self.authenticators:
               module = self.authenticators[acr]
               page = module.getPageForStep(module.configAttrs, step)
           else:
               page=None

           return page

       return "/casa/login.xhtml"


   def getNextStep(self, configurationAttributes, requestParameters, step):

       print "Casa. getNextStep called %s" % str(step)
       if step > 1:
           acr = ServerUtil.getFirstValue(requestParameters, "alternativeMethod")
           if acr != None:
               print "Casa. getNextStep. Use alternative method %s" % acr
               CdiUtil.bean(Identity).setWorkingParameter("ACR", acr)
               #retry step with different acr
               return 2

       return -1


   def logout(self, configurationAttributes, requestParameters):
       print "Casa. logout called"
       return True

# Miscelaneous

   def getLocalPrimaryKey(self):
       entryManager = CdiUtil.bean(PersistenceEntryManager)
       config = GluuConfiguration()
       config = entryManager.find(config.getClass(), "ou=configuration,o=gluu")
       #Pick (one) attribute where user id is stored (e.g. uid/mail)
       uid_attr = config.getOxIDPAuthentication().get(0).getConfig().getPrimaryKey()
       print "Casa. init. uid attribute is '%s'" % uid_attr
       return uid_attr


   def getSettings(self):
       entryManager = CdiUtil.bean(PersistenceEntryManager)
       config = ApplicationConfiguration()
       config = entryManager.find(config.getClass(), "ou=casa,ou=configuration,o=gluu")
       settings = None
       try:
           settings = json.loads(config.getSettings())
       except:
           print "Casa. getSettings. Failed to parse casa settings from DB"
       return settings


   def computeMethods(self, scriptList):

       methods = []
       mapping = {}
       cmConfigs = self.getSettings()

       if cmConfigs != None and 'acr_plugin_mapping' in cmConfigs:
           mapping = cmConfigs['acr_plugin_mapping']

       for m in mapping:
           for customScript in scriptList:
               if customScript.getName() == m and customScript.isEnabled():
                   methods.append(m)

       print "Casa. computeMethods. %s" % methods
       return methods


   def getConfigurationAttributes(self, acr, scriptsList):

       configMap = HashMap()
       for customScript in scriptsList:
           if customScript.getName() == acr and customScript.isEnabled():
               for prop in customScript.getConfigurationProperties():
                   configMap.put(prop.getValue1(), SimpleCustomProperty(prop.getValue1(), prop.getValue2()))

       print "Casa. getConfigurationAttributes. %d configuration properties were found for %s" % (configMap.size(), acr)
       return configMap


   def getAvailMethodsUser(self, user, skip=None):
       methods = HashSet()

       for method in self.authenticators:
           try:
               module = self.authenticators[method]
               if module.hasEnrollments(module.configAttrs, user):
                   methods.add(method)
           except:
               print "Casa. getAvailMethodsUser. hasEnrollments call could not be issued for %s module" % method
               print "Exception: ", sys.exc_info()[1]

       try:
           if skip != None:
               # skip is guaranteed to be a member of methods (if hasEnrollments routines are properly implemented).
               # A call to remove strangely crashes when skip is absent
               methods.remove(skip)
       except:
           print "Casa. getAvailMethodsUser. methods list does not contain %s" % skip

       print "Casa. getAvailMethodsUser %s" % methods.toString()
       return methods


   def prepareUIParams(self, identity):
       
       print "Casa. prepareUIParams. Reading UI branding params"
       cacheService = CdiUtil.bean(CacheService)
       casaAssets = cacheService.get("casa_assets")
           
       if casaAssets == None:
           #This may happen when cache type is IN_MEMORY, where actual cache is merely a local variable 
           #(a expiring map) living inside Casa webapp, not oxAuth webapp
           
           sets = self.getSettings()
           
           custPrefix = "/custom"
           logoUrl = "/images/logo.png"
           faviconUrl = "/images/favicon.ico"
           if ("extra_css" in sets and sets["extra_css"] != None) or sets["use_branding"]:
               logoUrl = custPrefix + logoUrl
               faviconUrl = custPrefix + faviconUrl
           
           prefix = custPrefix if sets["use_branding"] else ""
           
           casaAssets = {
               "contextPath": "/casa",
               "prefix" : prefix,
               "faviconUrl" : faviconUrl,
               "extraCss": sets["extra_css"] if "extra_css" in sets else None,
               "logoUrl": logoUrl
           }
       
       #Setting a single variable with the whole map does not work...
       identity.setWorkingParameter("casa_contextPath", casaAssets['contextPath'])
       identity.setWorkingParameter("casa_prefix", casaAssets['prefix'])
       identity.setWorkingParameter("casa_faviconUrl", casaAssets['contextPath'] + casaAssets['faviconUrl'])
       identity.setWorkingParameter("casa_extraCss", casaAssets['extraCss'])
       identity.setWorkingParameter("casa_logoUrl", casaAssets['contextPath'] + casaAssets['logoUrl'])


   def simulateFirstStep(self, requestParameters, acr):
       #To simulate 1st step, there is no need to call:
       # getPageforstep (no need as user/pwd won't be shown again)
       # isValidAuthenticationMethod (by restriction, it returns True)
       # prepareForStep (by restriction, it returns True)
       # getExtraParametersForStep (by restriction, it returns None)
       print "Casa. simulateFirstStep. Calling authenticate (step 1) for %s module" % acr
       if acr in self.authenticators:
           module = self.authenticators[acr]
           auth = module.authenticate(module.configAttrs, requestParameters, 1)
           print "Casa. simulateFirstStep. returned value was %s" % auth


# 2FA policy enforcement

   def parsePlatformData(self, requestParameters):
       try:
           #Find device info passed in HTTP request params (see index.xhtml)
           platform = ServerUtil.getFirstValue(requestParameters, "loginForm:platform")
           deviceInf = json.loads(platform)
       except:
           print "Casa. parsePlatformData. Error parsing platform data"
           deviceInf = None

       return deviceInf


   def getSuitableAcr(self, user, deviceInf, preferred):

       onMobile = deviceInf != None and 'isMobile' in deviceInf and deviceInf['isMobile']
       id = user.getUserId()
       strongest = -1
       acr = None
       user_methods = self.getAvailMethodsUser(user)

       for s in self.scriptsList:
           name = s.getName()
           level = Integer.MAX_VALUE if name == preferred else s.getLevel()
           if user_methods.contains(name) and level > strongest and (not onMobile or name in self.mobile_methods):
               acr = name
               strongest = level

       print "Casa. getSuitableAcr. On mobile = %s" % onMobile
       if acr == None and onMobile:
           print "Casa. getSuitableAcr. No mobile-friendly authentication method available for user %s" % id
           # user_methods is not empty when this function is called, so just pick any
           acr = user_methods.get(0)

       print "Casa. getSuitableAcr. %s was selected for user %s" % (acr, id)
       return acr


   def determineSkip2FA(self, userService, identity, foundUser, deviceInf):

       cmConfigs = self.getSettings()

       if cmConfigs == None:
           print "Casa. determineSkip2FA. Failed to read policy_2fa"
           return False

       missing = False
       if not 'plugins_settings' in cmConfigs:
           missing = True
       elif not 'strong-authn-settings' in cmConfigs['plugins_settings']:
           missing = True
       else:
           cmConfigs = cmConfigs['plugins_settings']['strong-authn-settings']

       policy2FA = 'EVERY_LOGIN'
       if not missing and 'policy_2fa' in cmConfigs:
           policy2FA = ','.join(cmConfigs['policy_2fa'])

       print "Casa. determineSkip2FA with general policy %s" % policy2FA
       policy2FA += ','
       skip2FA = False

       if 'CUSTOM,' in policy2FA:
           #read setting from user profile
           policy = foundUser.getAttribute("oxStrongAuthPolicy")
           if policy == None:
               policy = 'EVERY_LOGIN,'
           else:
               policy = policy.upper() + ','
           print "Casa. determineSkip2FA. Using user's enforcement policy %s" % policy

       else:
           #If it's not custom, then apply the global setting admin defined
           policy = policy2FA

       if not 'EVERY_LOGIN,' in policy:
           locationCriterion = 'LOCATION_UNKNOWN,' in policy
           deviceCriterion = 'DEVICE_UNKNOWN,' in policy

           if locationCriterion or deviceCriterion:
               if deviceInf == None:
                   print "Casa. determineSkip2FA. No user device data. Forcing 2FA to take place..."
               else:
                   skip2FA = self.process2FAPolicy(identity, foundUser, deviceInf, locationCriterion, deviceCriterion)

                   if skip2FA:
                       print "Casa. determineSkip2FA. Second factor is skipped"
                       #Update attribute if authentication will not have second step
                       devInf = identity.getWorkingParameter("trustedDevicesInfo")
                       if devInf != None:
                           foundUser.setAttribute("oxTrustedDevicesInfo", devInf)
                           userService.updateUser(foundUser)
           else:
               print "Casa. determineSkip2FA. Unknown %s policy: cannot skip 2FA" % policy

       return skip2FA


   def process2FAPolicy(self, identity, foundUser, deviceInf, locationCriterion, deviceCriterion):

       skip2FA = False
       #Retrieve user's devices info
       devicesInfo = foundUser.getAttribute("oxTrustedDevicesInfo")

       #do geolocation
       geodata = self.getGeolocation(identity)
       if geodata == None:
           print "Casa. process2FAPolicy: Geolocation data not obtained. 2FA skipping based on location cannot take place"

       try:
           encService = CdiUtil.bean(EncryptionService)

           if devicesInfo == None:
               print "Casa. process2FAPolicy: There are no trusted devices for user yet"
               #Simulate empty list
               devicesInfo = "[]"
           else:
               devicesInfo = encService.decrypt(devicesInfo)

           devicesInfo = json.loads(devicesInfo)

           partialMatch = False
           idx = 0
           #Try to find a match for device only
           for device in devicesInfo:
               partialMatch = device['browser']['name']==deviceInf['name'] and device['os']['version']==deviceInf['os']['version'] and device['os']['family']==deviceInf['os']['family']
               if partialMatch:
                   break
               idx+=1

           matchFound = False

           #At least one of locationCriterion or deviceCriterion is True
           if locationCriterion and not deviceCriterion:
               #this check makes sense if there is city data only
               if geodata!=None:
                   for device in devicesInfo:
                       #Search all registered cities that are found in trusted devices
                       for origin in device['origins']:
                           matchFound = matchFound or origin['city']==geodata['city']

           elif partialMatch:
               #In this branch deviceCriterion is True
               if not locationCriterion:
                   matchFound = True
               elif geodata!=None:
                   for origin in devicesInfo[idx]['origins']:
                       matchFound = matchFound or origin['city']==geodata['city']

           skip2FA = matchFound
           now = Date().getTime()

           #Update attribute oxTrustedDevicesInfo accordingly
           if partialMatch:
               #Update an existing record (update timestamp in city, or else add it)
               if geodata != None:
                   partialMatch = False
                   idxCity = 0

                   for origin in devicesInfo[idx]['origins']:
                       partialMatch = origin['city']==geodata['city']
                       if partialMatch:
                           break;
                       idxCity+=1

                   if partialMatch:
                       devicesInfo[idx]['origins'][idxCity]['timestamp'] = now
                   else:
                       devicesInfo[idx]['origins'].append({"city": geodata['city'], "country": geodata['country'], "timestamp": now})
           else:
               #Create a new entry
               browser = {"name": deviceInf['name'], "version": deviceInf['version']}
               os = {"family": deviceInf['os']['family'], "version": deviceInf['os']['version']}

               if geodata == None:
                   origins = []
               else:
                   origins = [{"city": geodata['city'], "country": geodata['country'], "timestamp": now}]

               obj = {"browser": browser, "os": os, "addedOn": now, "origins": origins}
               devicesInfo.append(obj)

           enc = json.dumps(devicesInfo, separators=(',',':'))
           enc = encService.encrypt(enc)
           identity.setWorkingParameter("trustedDevicesInfo", enc)

       except:
           print "Casa. process2FAPolicy. Error!", sys.exc_info()[1]

       return skip2FA


   def getGeolocation(self, identity):

       session_attributes = identity.getSessionId().getSessionAttributes()
       if session_attributes.containsKey("remote_ip"):
           remote_ip = session_attributes.get("remote_ip").split(",", 2)[0].strip()
           if StringHelper.isNotEmpty(remote_ip):

               httpService = CdiUtil.bean(HttpService)

               http_client = httpService.getHttpsClient()
               http_client_params = http_client.getParams()
               http_client_params.setIntParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 4 * 1000)

               geolocation_service_url = "http://ip-api.com/json/%s?fields=country,city,status,message" % remote_ip
               geolocation_service_headers = { "Accept" : "application/json" }

               try:
                   http_service_response = httpService.executeGet(http_client, geolocation_service_url, geolocation_service_headers)
                   http_response = http_service_response.getHttpResponse()
               except:
                   print "Casa. Determine remote location. Exception: ", sys.exc_info()[1]
                   return None

               try:
                   if not httpService.isResponseStastusCodeOk(http_response):
                       print "Casa. Determine remote location. Get non 200 OK response from server:", str(http_response.getStatusLine().getStatusCode())
                       httpService.consume(http_response)
                       return None

                   response_bytes = httpService.getResponseContent(http_response)
                   response_string = httpService.convertEntityToString(response_bytes, Charset.forName("UTF-8"))
                   httpService.consume(http_response)
               finally:
                   http_service_response.closeConnection()

               if response_string == None:
                   print "Casa. Determine remote location. Get empty response from location server"
                   return None

               response = json.loads(response_string)

               if not StringHelper.equalsIgnoreCase(response['status'], "success"):
                   print "Casa. Determine remote location. Get response with status: '%s'" % response['status']
                   return None

               return response

       return None

       
   def getLogoutExternalUrl(self, configurationAttributes, requestParameters):
       print "Get external logout URL call"
       return None 
Was this article useful?
Like
Dislike
Help us improve this page
Please provide feedback or comments
Access denied
Access denied