package CxCommon.flowmonitor;

import CxCommon.Exceptions.FlowMonitorDequeueException;
import CxCommon.Exceptions.InterchangeExceptions;
import CxCommon.common.ExtendedObject;
import com.ibm.wbis.flowmonitor.components.ComponentKey;
import java.util.ArrayList;
import java.util.List;

/* loaded from: input_file:CxCommon/flowmonitor/FlowEventConsumerThread.class */
public class FlowEventConsumerThread extends ExtendedObject implements Runnable, Messages {
    public static final String copyright = "(C) Copyright IBM Corporation 1997, 2003.";
    public static final long DEFAULT_QUEUE_WAIT_TIMOUT_MS = 100;
    public static final long BATCH_HOLD_TIMOUT_MS = 540;
    public static final int BATCH_SIZE = PersistentFlowEventRecord.BATCH_COMMIT_COUNT;
    protected FMEventQueue queue;
    protected PersistentFlowEventRecord eventWriter;
    protected List eventBatch;
    protected boolean isRunning;
    protected boolean isActive;
    protected long lastTimestamp;
    protected long queueWaitTimeout;

    public FlowEventConsumerThread(FMEventQueue fMEventQueue, ComponentKey componentKey) {
        super(9);
        this.isRunning = true;
        this.isActive = true;
        this.queueWaitTimeout = 100L;
        this.queue = fMEventQueue;
        this.eventWriter = new PersistentFlowEventRecord();
        this.eventBatch = new ArrayList();
        this.lastTimestamp = getTimestamp();
    }

    @Override // java.lang.Runnable
    public void run() {
        while (this.isRunning) {
            if (this.isActive) {
                try {
                    if (shouldWrite()) {
                        writeRecords();
                    }
                    if (isTraceEnabled(5)) {
                        trace("FlowEventConsumerThread.dequeue");
                    }
                    MinimalEventRecord dequeue = this.queue.dequeue(this.queueWaitTimeout);
                    if (dequeue == null) {
                        this.queueWaitTimeout = 0L;
                        if (isTraceEnabled(5)) {
                            trace("FlowEventConsumerThread applying indefinite dequeue block");
                        }
                    } else {
                        this.queueWaitTimeout = 100L;
                        if (isTraceEnabled(5)) {
                            trace("FlowEventConsumerThread.storing event");
                        }
                        this.eventBatch.add(dequeue);
                    }
                } catch (FlowMonitorDequeueException e) {
                    log(Messages.DATABASE_WRITE_FAILED$1, 2, e.getMessage());
                } catch (InterruptedException e2) {
                    if (shouldWrite()) {
                        writeRecords();
                    }
                }
            } else {
                Thread.yield();
            }
        }
    }

    private boolean shouldWrite() {
        boolean z = getTimestamp() - this.lastTimestamp >= 540;
        boolean z2 = this.eventBatch.size() >= BATCH_SIZE;
        boolean z3 = this.queueWaitTimeout == 0;
        boolean z4 = this.eventBatch.size() > 0;
        if (isTraceEnabled(4)) {
            trace(new StringBuffer().append("FlowEventConsumerThread: recordsExist=").append(z4).append(", timeout=").append(z).append(", batchReady=").append(z2).append(", queueWillBlock=").append(z3).toString());
        }
        return z4 && (z || z2 || z3);
    }

    private void writeRecords() {
        try {
            if (isTraceEnabled(5)) {
                trace(new StringBuffer().append("FlowEventConsumerThread: writing batch of ").append(this.eventBatch.size()).append(" records.").toString());
            }
            this.eventWriter.write(this.eventBatch);
        } catch (InterchangeExceptions e) {
            log(Messages.DATABASE_WRITE_FAILED$1, 2, e.getMessage());
        } finally {
            this.queueWaitTimeout = 100L;
            this.eventBatch.clear();
            this.lastTimestamp = getTimestamp();
        }
    }

    public synchronized void shutdown() {
        if (isTraceEnabled(3)) {
            trace("FlowEventConsumerThread: begin graceful shutdown");
        }
        drainBatch();
        this.isActive = false;
        stopThread();
    }

    private void drainBatch() {
        if (isTraceEnabled(3)) {
            trace("FlowEventConsumerThread: draining in-memory records");
        }
        if (this.eventBatch.size() > 0) {
            writeRecords();
        } else if (isTraceEnabled(3)) {
            trace("FlowEventConsumerThread: no events to drain");
        }
    }

    private static long getTimestamp() {
        return ServiceControl.home.getInstance().getCurrentTimestamp();
    }

    public void activate() {
        this.isActive = true;
    }

    public void deactivate() {
        this.isActive = false;
    }

    public void stopThread() {
        this.isRunning = false;
    }
}
