001// --------------------------------------------------------------------------------
002// Copyright 2002-2024 Echo Three, LLC
003//
004// Licensed under the Apache License, Version 2.0 (the "License");
005// you may not use this file except in compliance with the License.
006// You may obtain a copy of the License at
007//
008//     http://www.apache.org/licenses/LICENSE-2.0
009//
010// Unless required by applicable law or agreed to in writing, software
011// distributed under the License is distributed on an "AS IS" BASIS,
012// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013// See the License for the specific language governing permissions and
014// limitations under the License.
015// --------------------------------------------------------------------------------
016
017package com.echothree.util.common.collection;
018
019import java.text.NumberFormat;
020import java.util.ArrayList;
021import java.util.Collections;
022import java.util.List;
023import java.util.concurrent.atomic.AtomicLong;
024import org.apache.commons.logging.Log;
025import org.apache.commons.logging.LogFactory;
026
027public class SmartQueue<E> {
028    
029    Log log = LogFactory.getLog(this.getClass());
030
031    private final boolean monitorQueue;
032    private final int queueLen;
033    private final AtomicLong estimatedTotal;
034    private final boolean monitorProgress;
035
036    private final List<E> list;
037
038    private NumberFormat percentFormat;
039    private final AtomicLong produced = new AtomicLong(0);
040    private final AtomicLong consumed = new AtomicLong(0);
041
042    private boolean eof = false;
043
044    /** Creates a new instance of SmartQueue */
045    public SmartQueue(final boolean monitorQueue, final boolean monitorProgress, final int queueLen, final Long estimatedTotal) {
046        this.monitorQueue = monitorQueue;
047        this.queueLen = queueLen;
048        this.estimatedTotal = estimatedTotal == null ? null : new AtomicLong(estimatedTotal);
049        this.monitorProgress = monitorProgress;
050
051        list = Collections.synchronizedList(new ArrayList<E>(queueLen));
052
053        if(estimatedTotal != null && estimatedTotal != 0 && monitorProgress) {
054            percentFormat = NumberFormat.getPercentInstance();
055        }
056    }
057
058    public synchronized void put(E data) {
059        while(list.size() >= queueLen) {
060            try {
061                if(monitorQueue) {
062                    log.info("Waiting to put data");
063                }
064
065                wait();
066            } catch(InterruptedException ignored) {
067            }
068        }
069
070        list.add(data);
071
072        if(estimatedTotal != null) {
073            long producedTotal = produced.incrementAndGet();
074
075            if(monitorProgress) {
076                if(producedTotal > estimatedTotal.get()) {
077                    estimatedTotal.set(producedTotal);
078                }
079
080                long myEstimatedTotal = estimatedTotal.get();
081                if(producedTotal % 20 == 0 || producedTotal == myEstimatedTotal) {
082                    log.info("Produced progress: " + producedTotal + " of " + myEstimatedTotal + " completed (" + percentFormat.format(producedTotal * 1.0 / myEstimatedTotal) + ").");
083                }
084            }
085        }
086
087        notifyAll();
088    }
089    
090    public synchronized boolean isEmpty() {
091        return list.isEmpty();
092    }
093    
094    public synchronized E take() {
095        E obj = null;
096
097        while(list.size() <= 0 && !eof) {
098            try {
099                if(monitorQueue) {
100                    log.info("Waiting to consume data");
101                }
102
103                wait();
104            } catch(InterruptedException ignored) {
105            }
106        }
107
108        if(list.size() > 0) {
109            try {
110                obj = list.remove(0);
111
112                if(estimatedTotal != null) {
113                    var consumedTotal = consumed.incrementAndGet();
114
115                    if(monitorProgress) {
116                        if(consumedTotal > estimatedTotal.get()) {
117                            estimatedTotal.set(consumedTotal);
118                        }
119
120                        var myEstimatedTotal = estimatedTotal.get();
121                        if(consumedTotal % 20 == 0 || consumedTotal == myEstimatedTotal) {
122                            log.info("Consumed progress: " + consumedTotal + " of " + myEstimatedTotal + " completed (" + percentFormat.format(consumedTotal * 1.0 / myEstimatedTotal) + ").");
123                        }
124                    }
125                }
126            } catch(IndexOutOfBoundsException ignored) {
127                // obj stays null.
128            }
129        } else {
130            if(monitorQueue) {
131                log.info("Woke up because end of document");
132            }
133        }
134
135        notifyAll();
136
137        return obj;
138    }
139
140    public synchronized void end() {
141        eof = true;
142        notifyAll();
143    }
144
145    public synchronized boolean onEnd() {
146        return eof;
147    }
148    
149}