001// -------------------------------------------------------------------------------- 002// Copyright 2002-2024 Echo Three, LLC 003// 004// Licensed under the Apache License, Version 2.0 (the "License"); 005// you may not use this file except in compliance with the License. 006// You may obtain a copy of the License at 007// 008// http://www.apache.org/licenses/LICENSE-2.0 009// 010// Unless required by applicable law or agreed to in writing, software 011// distributed under the License is distributed on an "AS IS" BASIS, 012// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013// See the License for the specific language governing permissions and 014// limitations under the License. 015// -------------------------------------------------------------------------------- 016 017package com.echothree.util.common.collection; 018 019import java.text.NumberFormat; 020import java.util.ArrayList; 021import java.util.Collections; 022import java.util.List; 023import java.util.concurrent.atomic.AtomicLong; 024import org.apache.commons.logging.Log; 025import org.apache.commons.logging.LogFactory; 026 027public class SmartQueue<E> { 028 029 Log log = LogFactory.getLog(this.getClass()); 030 031 private final boolean monitorQueue; 032 private final int queueLen; 033 private final AtomicLong estimatedTotal; 034 private final boolean monitorProgress; 035 036 private final List<E> list; 037 038 private NumberFormat percentFormat; 039 private final AtomicLong produced = new AtomicLong(0); 040 private final AtomicLong consumed = new AtomicLong(0); 041 042 private boolean eof = false; 043 044 /** Creates a new instance of SmartQueue */ 045 public SmartQueue(final boolean monitorQueue, final boolean monitorProgress, final int queueLen, final Long estimatedTotal) { 046 this.monitorQueue = monitorQueue; 047 this.queueLen = queueLen; 048 this.estimatedTotal = estimatedTotal == null ? null : new AtomicLong(estimatedTotal); 049 this.monitorProgress = monitorProgress; 050 051 list = Collections.synchronizedList(new ArrayList<E>(queueLen)); 052 053 if(estimatedTotal != null && estimatedTotal != 0 && monitorProgress) { 054 percentFormat = NumberFormat.getPercentInstance(); 055 } 056 } 057 058 public synchronized void put(E data) { 059 while(list.size() >= queueLen) { 060 try { 061 if(monitorQueue) { 062 log.info("Waiting to put data"); 063 } 064 065 wait(); 066 } catch(InterruptedException ignored) { 067 } 068 } 069 070 list.add(data); 071 072 if(estimatedTotal != null) { 073 long producedTotal = produced.incrementAndGet(); 074 075 if(monitorProgress) { 076 if(producedTotal > estimatedTotal.get()) { 077 estimatedTotal.set(producedTotal); 078 } 079 080 long myEstimatedTotal = estimatedTotal.get(); 081 if(producedTotal % 20 == 0 || producedTotal == myEstimatedTotal) { 082 log.info("Produced progress: " + producedTotal + " of " + myEstimatedTotal + " completed (" + percentFormat.format(producedTotal * 1.0 / myEstimatedTotal) + ")."); 083 } 084 } 085 } 086 087 notifyAll(); 088 } 089 090 public synchronized boolean isEmpty() { 091 return list.isEmpty(); 092 } 093 094 public synchronized E take() { 095 E obj = null; 096 097 while(list.size() <= 0 && !eof) { 098 try { 099 if(monitorQueue) { 100 log.info("Waiting to consume data"); 101 } 102 103 wait(); 104 } catch(InterruptedException ignored) { 105 } 106 } 107 108 if(list.size() > 0) { 109 try { 110 obj = list.remove(0); 111 112 if(estimatedTotal != null) { 113 var consumedTotal = consumed.incrementAndGet(); 114 115 if(monitorProgress) { 116 if(consumedTotal > estimatedTotal.get()) { 117 estimatedTotal.set(consumedTotal); 118 } 119 120 var myEstimatedTotal = estimatedTotal.get(); 121 if(consumedTotal % 20 == 0 || consumedTotal == myEstimatedTotal) { 122 log.info("Consumed progress: " + consumedTotal + " of " + myEstimatedTotal + " completed (" + percentFormat.format(consumedTotal * 1.0 / myEstimatedTotal) + ")."); 123 } 124 } 125 } 126 } catch(IndexOutOfBoundsException ignored) { 127 // obj stays null. 128 } 129 } else { 130 if(monitorQueue) { 131 log.info("Woke up because end of document"); 132 } 133 } 134 135 notifyAll(); 136 137 return obj; 138 } 139 140 public synchronized void end() { 141 eof = true; 142 notifyAll(); 143 } 144 145 public synchronized boolean onEnd() { 146 return eof; 147 } 148 149}