1 #
  2 # This program is free software: you can redistribute it and/or modify
  3 # it under the terms of the GNU General Public License as published by
  4 # the Free Software Foundation, either version 3 of the License, or
  5 # (at your option) any later version.
  6 #
  7 # This program is distributed in the hope that it will be useful,
  8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
  9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 10 # GNU General Public License for more details.
 11 #
 12 # You should have received a copy of the GNU General Public License
 13 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 14 #
 15
 16 import time
 17 from collections import deque
 18
 19 ##
 20 # Decorator of iterator interface, that allows the last item
 21 # to be postponed for a given time. The iterator must return
 22 # objects that have a packetid property. This is used to make
 23 # sure that there are not too many packets open simultaneously.
 24 #
 25
class PostponableIterator:
    # Wraps another iterator whose items expose a ``packetid`` attribute and
    # lets the caller put the most recently returned item back into a named
    # que, from which it is handed out again later.
    #
    # Internal state:
    #   __iterator            the decorated iterator
    #   __maxactive           limit on the number of distinct packet ids that
    #                         may have postponed items before ques are flushed
    #   __defaultfrequency    frequency (seconds) given to newly created ques
    #   __ques                que id -> {'frequency': seconds,
    #                                    'lastuse': time.time() of creation or
    #                                               of the last item taken,
    #                                    'items': deque of postponed items}
    #   __currentItem         the item most recently returned by next()
    #   __currentPacket       packetid of __currentItem
    #   __pendingPacketItems  packetid -> number of postponed items

    ##
    # Initializes the object with an iterator
    # @param self
    #   The object pointer
    # @param iterator
    #   The iterable that is being decorated. Its items must have a
    #   packetid attribute.
    # @param maxactive
    #   Maximal number of active packages. If this number is
    #   reached the iterator will first hand out a postponed item
    #   before taking a new one from the decorated iterator.
    # @param defaultfrequency
    #   Default frequency value (in seconds) for newly opened ques.
    #
    def __init__(self, iterator, maxactive, defaultfrequency):
        self.__iterator = iter(iterator)
        self.__maxactive = maxactive
        self.__defaultfrequency = defaultfrequency
        self.__ques = {}
        self.__currentPacket = None
        self.__currentItem = None
        self.__pendingPacketItems = {}

    ##
    # Returns self to make sure that python knows this is an iterator
    # @param self
    #   The object pointer
    #
    def __iter__(self):
        return self

    ##
    # Next element in iteration
    # @param self
    #   The object pointer
    # @returns An element from any of the ques that can be re-tried, or the
    #   next element from the decorated iterator
    # @throws StopIteration
    #   If the decorated iterator is exhausted and all ques are empty
    #
    def __next__(self):
        now = time.time()

        # First check for a que item that is ready to be processed.
        # Returning straight away is safe even though the helper may delete
        # the que: the dictionary is not iterated any further afterwards.
        for que in self.__ques:
            if self.__dueTime(que) < now:
                return self.__getItemFromQue(que)

        # If all ques are on hold, and there are fewer than maxactive active
        # packets, then try a new item from the decorated iterator.
        if len(self.__pendingPacketItems) < self.__maxactive:
            try:
                item = next(self.__iterator)
            except StopIteration:
                # Decorated iterator is exhausted; fall through to the ques.
                pass
            else:
                self.__currentItem = item
                self.__currentPacket = item.packetid
                return item

        # The decorated iterator is empty, or there are too many pending
        # packets: nothing is left at all if there are no ques either.
        if not self.__ques:
            raise StopIteration

        # Otherwise flush one que prematurely: the one that would have become
        # ready first (min() keeps the first que on ties).
        return self.__getItemFromQue(min(self.__ques, key=self.__dueTime))

    # Python 2 spelling of the iterator protocol, also kept for callers that
    # invoke .next() explicitly.
    next = __next__

    ##
    # Utility function that tells when a que becomes ready.
    # @param self
    #   The object pointer
    # @param que
    #   Identifier of an existing que
    # @returns The time (as in time.time()) after which the next item of the
    #   que may be handed out
    #
    def __dueTime(self, que):
        state = self.__ques[que]
        return state['frequency'] + state['lastuse']

    ##
    # This is a utility function that returns the next item from a given que,
    # and updates the lastuse time.
    # @param self
    #   The object pointer
    # @param que
    #   Identifier of an existing, non-empty que
    # @returns The next item from the que
    #
    def __getItemFromQue(self, que):
        state = self.__ques[que]
        state['lastuse'] = time.time()
        self.__currentItem = state['items'].popleft()
        self.__currentPacket = self.__currentItem.packetid

        # One postponed item less for this packet; forget the packet entirely
        # once nothing is pending so it no longer counts against maxactive.
        self.__pendingPacketItems[self.__currentPacket] -= 1
        if self.__pendingPacketItems[self.__currentPacket] < 1:
            del self.__pendingPacketItems[self.__currentPacket]

        # Empty ques are removed, so an existing que always holds items.
        if not state['items']:
            del self.__ques[que]

        return self.__currentItem

    ##
    # Postpones the last item, i.e. the one most recently returned by next.
    # @param self
    #   The object pointer
    # @param que
    #   An identifier that lets the client group postponed items. The que is
    #   created with the default frequency if it does not exist yet.
    # @throws Exception
    #   If no item has been returned by the iterator yet
    #
    def postpone(self, que):
        # Without this guard a None would be queued and only fail later,
        # when it is taken out of the que again.
        if self.__currentItem is None:
            raise Exception('No current item to postpone')

        if que not in self.__ques:
            self.__ques[que] = {
                'frequency': self.__defaultfrequency,
                'lastuse': time.time(),
                'items': deque(),
            }

        self.__ques[que]['items'].append(self.__currentItem)

        pending = self.__pendingPacketItems
        pending[self.__currentPacket] = pending.get(self.__currentPacket, 0) + 1

    ##
    # Sets how frequently elements should be taken out of a que.
    # @param self
    #   The object pointer
    # @param que
    #   Identifier of the que
    # @param seconds
    #   The new frequency for the que in seconds
    # @throws Exception
    #   Que does not exist, if the que identifier wasn't found
    #
    def setQueFrequency(self, que, seconds):
        if que not in self.__ques:
            raise Exception('Que does not exsist')
        self.__ques[que]['frequency'] = seconds

    ##
    # Sets how frequently elements should be taken out of a que,
    # but instead of frequency the desired length of the que is given
    # in seconds
    # @param self
    #   The object pointer
    # @param que
    #   Identifier of the que
    # @param length
    #   The desired length of the que in seconds
    # @returns The new frequency value (length divided by the number of
    #   postponed items, as a float)
    # @throws Exception
    #   Que does not exist, if the que identifier wasn't found
    #
    def setQueFrequencyByLength(self, que, length):
        occupance = self.queOccupance(que)
        # An occupance of zero means the que does not exist; report that the
        # same way the other accessors do instead of dividing by zero.
        if occupance < 1:
            raise Exception('Que does not exsist')

        # float() keeps the division exact under Python 2 as well.
        newfrequency = float(length) / occupance
        self.setQueFrequency(que, newfrequency)
        return newfrequency

    ##
    # Gets the frequency associated with a particular que
    # @param self
    #   The object pointer
    # @param que
    #   Identifier of the que
    # @returns Frequency in seconds
    # @throws Exception
    #   Que does not exist, if the que identifier wasn't found
    #
    def getQueFrequency(self, que):
        if que not in self.__ques:
            raise Exception('Que does not exsist')
        return self.__ques[que]['frequency']

    ##
    # Tells how many operations are currently postponed in a particular que.
    # @param self
    #   The object pointer
    # @param que
    #   Identifier of the que
    # @returns Number of postponed operations in the que, 0 if the que does
    #   not exist
    #
    def queOccupance(self, que):
        if que not in self.__ques:
            return 0
        return len(self.__ques[que]['items'])

    ##
    # Tells how many seconds are left till the last item of the que is
    # returned
    # @param self
    #   The object pointer
    # @param que
    #   Identifier of the que
    # @returns Seconds left till the last item in a que is returned, rounded
    # @throws Exception
    #   Que does not exist, if the que identifier wasn't found
    #
    def queLastItemTime(self, que):
        if que not in self.__ques:
            raise Exception('Que does not exsist')
        state = self.__ques[que]
        return round(state['lastuse'] + state['frequency'] * self.queOccupance(que) - time.time())

    ##
    # Tells if a packet is still active: either the decorated iterator
    # reports it as active, or there are postponed items belonging to it.
    # @param self
    #   The object pointer
    # @param packetid
    #   Id of the packet
    # @returns boolean
    #
    def isPacketActive(self, packetid):
        # The decorated iterator is consulted only when it offers the same
        # query, so plain iterators can be decorated too.
        inner = getattr(self.__iterator, 'isPacketActive', None)
        if inner is not None and inner(packetid):
            return True

        return self.__pendingPacketItems.get(packetid, 0) > 0

    ##
    # Returns some statistics: one line per que with its occupance, the
    # time till its last item is returned and its frequency.
    # @param self
    #   The object pointer
    # @return string
    #
    def getStat(self):
        return "".join(
            "%s with %s pending urls has a que length of %s seconds (frequency: %s seconds)\n"
            % (q, self.queOccupance(q), self.queLastItemTime(q), self.__ques[q]['frequency'])
            for q in self.__ques
        )
255