Package backend :: Package server :: Package importlib :: Module userAuth
[hide private]
[frames] | no frames]

Source Code for Module backend.server.importlib.userAuth

  1  # 
  2  # Copyright (c) 2008--2016 Red Hat, Inc. 
  3  # 
  4  # This software is licensed to you under the GNU General Public License, 
  5  # version 2 (GPLv2). There is NO WARRANTY for this software, express or 
  6  # implied, including the implied warranties of MERCHANTABILITY or FITNESS 
  7  # FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2 
  8  # along with this software; if not, see 
  9  # http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt. 
 10  # 
 11  # Red Hat trademarks are not licensed under GPLv2. No permission is 
 12  # granted to use or replicate Red Hat trademarks that are incorporated 
 13  # in this software or its documentation. 
 14  # 
 15  # 
 16  # Authentication 
 17  # 
 18   
 19  import time 
 20  import sys 
 21  from spacewalk.common.rhnLog import log_debug 
 22  from spacewalk.common.rhnException import rhnFault 
 23  from spacewalk.common.rhnTranslate import _ 
 24  from spacewalk.common.rhnTB import add_to_seclist 
 25   
 26  from spacewalk.server import rhnSQL, rhnUser 
 27   
 28   
class UserAuth:
    """Authentication and authorization state for a single user.

    auth() and auth_session() fill in ``groups``, ``org_id`` and
    ``user_id``.  isOrgAdmin() and isChannelAdmin() test group membership;
    authzOrg() and authzChannels() raise rhnFault when the user is not
    allowed to proceed.
    """

    def __init__(self):
        # Nothing is known about the user until auth()/auth_session() runs.
        self.groups = []
        self.user_id = None
        self.org_id = None

    def auth(self, login, password):
        """Authenticate with a login/password pair.

        The credentials are handed to getUserGroups(); on success the
        returned groups, org id and user id are stored on the instance.
        Any rhnFault is re-raised unchanged; a fault with code 2 is delayed
        by two seconds first.
        """
        add_to_seclist(password)
        try:
            self.groups, self.org_id, self.user_id = getUserGroups(
                login, password)
        except rhnFault:
            fault = sys.exc_info()[1]
            if fault.code == 2:
                # Code 2 means invalid login/password: stall the caller to
                # slow down brute force password guessing (BZ 672163).
                time.sleep(2)
            raise

        summary = "Groups: %s; org_id: %s; user_id: %s" % (
            self.groups, self.org_id, self.user_id)
        log_debug(4, summary)

    def auth_session(self, session_string):
        """Authenticate from an existing session string.

        The user instance comes from rhnUser.session_reload(); that call
        sits outside the try block, so faults it raises are not delayed.
        Faults from getUserGroupsFromUserInstance() are re-raised, with a
        two-second delay when the fault code is 2.
        """
        user_instance = rhnUser.session_reload(session_string)
        try:
            self.groups, self.org_id, self.user_id = (
                getUserGroupsFromUserInstance(user_instance))
        except rhnFault:
            fault = sys.exc_info()[1]
            if fault.code == 2:
                # Same anti-brute-force delay as in auth() (BZ 672163).
                time.sleep(2)
            raise

        summary = "Groups: %s; org_id: %s; user_id: %s" % (
            self.groups, self.org_id, self.user_id)
        log_debug(4, summary)

    def isOrgAdmin(self):
        """Return 1 if 'org_admin' is among the user's groups, else 0."""
        if 'org_admin' not in self.groups:
            log_debug(4, "Is NOT org admin")
            return 0
        log_debug(4, "Is org admin")
        return 1

    def isChannelAdmin(self):
        """Return 1 for org admins and channel admins, 0 for everyone else."""
        if 'org_admin' in self.groups:
            # Org admins are implicitly channel admins.
            log_debug(4, "Is channel admin because isa org admin")
            return 1
        if 'channel_admin' not in self.groups:
            log_debug(4, "Is NOT channel admin")
            return 0
        log_debug(4, "Is channel admin")
        return 1

    def authzOrg(self, info):
        """Check that the user may manage packages in the org named by *info*.

        ``info`` is a mapping; when it has no 'orgId' key the user's own
        org id is written into it.  Returns None when authorized.

        Raises rhnFault(4) for the null org (empty-string org id) and
        rhnFault(32) for a foreign org or for a regular user who manages
        no channels.
        """
        # This is more involved than it should be because of one corner
        # case: pushes without a channel.  Regular users must be denied the
        # ability to push to their org in that situation.

        # No org specified: default to the user's own org.
        if 'orgId' not in info:
            info['orgId'] = self.org_id
        log_debug(4, "info[orgId]", info['orgId'], "org id", self.org_id)

        requested_org = info['orgId']

        if requested_org == '':
            # Satellites are not allowed to push in the null org
            raise rhnFault(
                4,
                _("You are not authorized to manage packages in the null org"))

        if requested_org and self.org_id != requested_org:
            # Not so fast... the request targets somebody else's org.
            raise rhnFault(
                32,
                _("You are not allowed to manage packages in the %s org") %
                requested_org)

        # Org admins and channel admins have full privileges.  The
        # user_manages_channels check alone is not enough, because it
        # fails when there are no channels at all.
        if self.isOrgAdmin() or self.isChannelAdmin():
            log_debug(4, "Org authorized (org_admin or channel_admin)")
        elif user_manages_channels(self.user_id):
            # Regular user, but one who manages at least one channel.
            log_debug(4, "Org authorized (user manages a channel)")
        else:
            # Regular user who doesn't manage any channels: denied.
            raise rhnFault(
                32,
                _("You are not allowed to perform administrative tasks"))

    def authzChannels(self, channels):
        """Check that the user may manage every channel label in *channels*.

        Returns None, immediately so when *channels* is empty.  Raises
        rhnFault(32) at the first label for which the query returns no row
        or a false 'manage' value.
        """
        log_debug(4, channels)
        if not channels:
            return

        # rhn_channel.user_role_check checks for the ownership of the
        # channel by this user's org
        role_query = rhnSQL.prepare("""
            select rhn_channel.user_role_check(id, :user_id, 'manage') manage
              from rhnChannel
             where label = :channel
        """)

        for label in channels:
            role_query.execute(channel=label, user_id=self.user_id)
            result = role_query.fetchone_dict()

            # A missing row or a false 'manage' value both mean denial:
            # either the channel doesn't exist, or the user can't manage it.
            may_manage = result and result['manage']
            if not may_manage:
                raise rhnFault(
                    32,
                    _("You are not allowed to manage channel %s, or that "
                      "channel does not exist") % label)

            log_debug(4, "User %s allowed to manage channel %s" %
                      (self.user_id, label))

        return None


# wregglej 12/21/05 This should only be used when the user instance has already been reloaded from
# a session.
def getUserGroupsFromUserInstance(user_instance):
    """Return ``(groups, org_id, user_id)`` for an already-verified user.

    No password check is done here: the caller must have authenticated the
    user already (reloaded from a session, or password-checked).

    ``groups`` is the list of rhnUserGroupType labels for every user group
    the user is a member of; ``org_id`` comes from
    ``user_instance.contact['org_id']`` and ``user_id`` from
    ``user_instance.getid()``.

    Raises rhnFault(2) when *user_instance* is false (for example None).
    """
    # Validate before touching the instance; dereferencing it first would
    # turn a missing user into an AttributeError instead of an rhnFault.
    if not user_instance:
        # Level first, like every other log_debug call in this module.
        log_debug(4, "null user")
        raise rhnFault(2)

    user_id = user_instance.getid()
    log_debug(4, user_id)

    # Get the org id
    org_id = user_instance.contact['org_id']

    h = rhnSQL.prepare("""
        select ugt.label
          from rhnUserGroupType ugt,
               rhnUserGroup ug,
               rhnUserGroupMembers ugm
         where ugm.user_id = :user_id
           and ugm.user_group_id = ug.id
           and ug.group_type = ugt.id
    """)
    h.execute(user_id=user_id)

    # Drain the cursor; fetchone_dict() returns a false value when no rows
    # are left.
    groups = []
    while True:
        row = h.fetchone_dict()
        if not row:
            break
        groups.append(row['label'])
    return groups, org_id, user_id
185 186
def getUserGroups(login, password):
    """Authenticate *login*/*password* and return the user's groups and ids.

    Looks the user up with rhnUser.search(), verifies the password with
    the user's check_password(), then returns the
    ``(groups, org_id, user_id)`` tuple produced by
    getUserGroupsFromUserInstance().

    Raises rhnFault(2) when the login is unknown or the password is wrong;
    both failures use the same fault code.
    """
    # Register the password with the traceback helper before anything can
    # fail (presumably so it is scrubbed from error reports -- confirm
    # against rhnTB.add_to_seclist).
    add_to_seclist(password)
    log_debug(4, login)
    user = rhnUser.search(login)

    if not user:
        # Level first, like every other log_debug call in this module.
        log_debug(4, "rhnUser.search failed")
        raise rhnFault(2)

    # Check the user's password
    if not user.check_password(password):
        log_debug(4, "user.check_password failed")
        raise rhnFault(2)

    return getUserGroupsFromUserInstance(user)
204 205
def user_manages_channels(user_id):
    """Return True if *user_id* has the 'manage' role on any channel.

    The query selects from rhnChannel the rows for which
    rhn_channel.user_role_check(id, :user_id, 'manage') equals 1; the
    result is True when at least one row comes back, False otherwise.
    """
    managed_query = rhnSQL.prepare("""
        select distinct 1
          from rhnChannel
         where rhn_channel.user_role_check(id, :user_id, 'manage') = 1
    """)
    managed_query.execute(user_id=user_id)

    # A single fetch is enough: only the existence of a row matters.
    if managed_query.fetchone_dict() is None:
        return False
    return True
217