com.continuent.tungsten.replicator.thl
Class THLParallelReadQueue

java.lang.Object
  extended by com.continuent.tungsten.replicator.thl.THLParallelReadQueue

public class THLParallelReadQueue
extends java.lang.Object

Implements a queue that returns, in total order, the events most recently read by its parent read task together with the pending control events needed to process watches properly. Items in this queue come from the following sources:

  1. Events read from the log. The THLParallelReadTask thread supplies these as it reads them.
  2. Out-of-band control events. The THLParallelQueue inserts these to implement synchronization between channels, i.e., stage tasks. They must be merged in sequence number order into the queue so that the stage task sees them.
  3. Control events generated from watch predicates. These ensure that stage tasks commit their current position so that watches can be fulfilled.

The main responsibility of this class is to merge these events so that the take() and peek() methods return them in total order. The APIs of this class match standard queue interfaces.
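
The merge described above can be illustrated with a minimal, self-contained sketch. It is not the Tungsten implementation: the TotalOrderMergeSketch and Event types, the method names, and the rule that a control event is released right after the log event with the same seqno are all assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch; Event and TotalOrderMergeSketch are not Tungsten classes.
class TotalOrderMergeSketch {
    /** Minimal event: a sequence number plus a label for its source. */
    static final class Event {
        final long seqno;
        final String kind;

        Event(long seqno, String kind) {
            this.seqno = seqno;
            this.kind = kind;
        }

        @Override
        public String toString() {
            return kind + "@" + seqno;
        }
    }

    // Control events waiting for the log to reach their seqno.
    // Assumption: callers buffer them in ascending seqno order.
    private final Deque<Event> pendingControl = new ArrayDeque<>();
    // Merged output in the order a consumer would see it.
    private final List<Event> merged = new ArrayList<>();

    /** Buffers a control event until the log reaches its seqno. */
    synchronized void bufferControl(long seqno) {
        pendingControl.addLast(new Event(seqno, "control"));
    }

    /** Appends a log event, then releases every control event whose seqno has been reached. */
    synchronized void postLogEvent(long seqno) {
        merged.add(new Event(seqno, "log"));
        while (!pendingControl.isEmpty() && pendingControl.peekFirst().seqno <= seqno) {
            merged.add(pendingControl.removeFirst());
        }
    }

    /** Returns a copy of the merged sequence. */
    synchronized List<Event> snapshot() {
        return new ArrayList<>(merged);
    }

    public static void main(String[] args) {
        TotalOrderMergeSketch queue = new TotalOrderMergeSketch();
        queue.bufferControl(2);
        queue.postLogEvent(1);
        queue.postLogEvent(2);
        queue.postLogEvent(3);
        // Prints [log@1, log@2, control@2, log@3]
        System.out.println(queue.snapshot());
    }
}
```

Both mutating methods are synchronized in the sketch because each one touches two structures at once, which mirrors the atomicity concern described for this class.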

Access to this class is highly concurrent. Methods that merge queue contents are synchronized to ensure that changes across queue structures are atomic; without this, events could be returned out of order. The event queue itself is held in a concurrent collection. Methods that access only the event queue or statistical information are not synchronized, as doing so would raise the risk of deadlocks.

Version:
1.0
Author:
Robert Hodges

Constructor Summary
THLParallelReadQueue(int taskId, int maxSize, int maxControlEvents, long startingSeqno, int syncInterval, ReplDBMSHeader lastHeader, AtomicIntervalGuard<?> intervalGuard)
          Instantiates a new read queue.
 
Method Summary
 void addWatchSyncPredicate(WatchPredicate<ReplDBMSHeader> predicate)
          Add a new predicate to the list of predicates that should generate sync events.
 long getAcceptCount()
          Returns the count of accepted events.
 long getDiscardCount()
          Returns the count of discarded events.
 long getReadSeqno()
          Returns the sequence number most recently read.
 boolean isLastFrag()
          Returns whether the last event read was the end of a transaction.
 ReplEvent peek()
          Returns, but does not remove, the next event in the queue, or returns null if the queue is empty.
 void post(THLEvent thlEvent)
          Post a normal event which will be enqueued immediately.
 void postOutOfBand(ReplControlEvent controlEvent)
          Post a control event, which will either be immediately added to the event queue or buffered in the control event queue until it is time to merge it.
 void release()
          Frees resources including all queues and lists.
 int size()
          Returns the current queue size.
 ReplEvent take()
          Removes the next event from the queue, waiting indefinitely for something to arrive.
 ReplEvent take(long timeout, java.util.concurrent.TimeUnit unit)
          Removes the next event from the queue, returning null if none arrives within the timeout.
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Constructor Detail

THLParallelReadQueue

public THLParallelReadQueue(int taskId,
                            int maxSize,
                            int maxControlEvents,
                            long startingSeqno,
                            int syncInterval,
                            ReplDBMSHeader lastHeader,
                            AtomicIntervalGuard<?> intervalGuard)
Instantiates a new read queue.

Parameters:
taskId - Task to which this queue belongs
maxSize - Maximum number of all events to buffer
maxControlEvents - Maximum number of control events to buffer
startingSeqno - Sequence number of next transaction
syncInterval - Interval at which to generate synchronization events
lastHeader - Header of last transaction processed before start
intervalGuard - Interval guard to track read position

Method Detail

getReadSeqno

public long getReadSeqno()
Returns the sequence number most recently read.


isLastFrag

public boolean isLastFrag()
Returns whether the last event read was the end of a transaction.


getAcceptCount

public long getAcceptCount()
Returns the count of accepted events.


getDiscardCount

public long getDiscardCount()
Returns the count of discarded events.


release

public void release()
Frees resources including all queues and lists.


addWatchSyncPredicate

public void addWatchSyncPredicate(WatchPredicate<ReplDBMSHeader> predicate)
                           throws java.lang.InterruptedException
Adds a new predicate to the list of predicates that should generate sync events. If the queue is at the end of a transaction, also checks whether the sync event should be posted immediately.

Parameters:
predicate - Watch predicate
Throws:
java.lang.InterruptedException - Thrown if thread is interrupted.
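
The interaction between watch predicates and transaction boundaries can be sketched as follows. This is a simplified illustration under stated assumptions, not the Tungsten code: WatchSyncSketch is hypothetical, a java.util.function.LongPredicate over the seqno stands in for WatchPredicate<ReplDBMSHeader>, strings stand in for events, and removing a predicate once it has matched is an assumption.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.LongPredicate;

// Hypothetical sketch; none of these names come from Tungsten.
class WatchSyncSketch {
    private final List<LongPredicate> predicates = new ArrayList<>();
    private final List<String> queue = new ArrayList<>();
    private long readSeqno = -1;
    private boolean lastFrag = true;

    /** Registers a predicate and, if a transaction has just ended, checks it right away. */
    synchronized void addWatchSyncPredicate(LongPredicate predicate) {
        predicates.add(predicate);
        if (lastFrag && readSeqno >= 0) {
            generateSyncEvents();
        }
    }

    /** Enqueues a log event; predicates are checked only at the end of a transaction. */
    synchronized void post(long seqno, boolean isLastFrag) {
        queue.add("log@" + seqno);
        readSeqno = seqno;
        lastFrag = isLastFrag;
        if (lastFrag) {
            generateSyncEvents();
        }
    }

    /** Emits one sync event per matching predicate (assumption: a matched predicate is dropped). */
    private void generateSyncEvents() {
        Iterator<LongPredicate> it = predicates.iterator();
        while (it.hasNext()) {
            if (it.next().test(readSeqno)) {
                queue.add("sync@" + readSeqno);
                it.remove();
            }
        }
    }

    synchronized List<String> snapshot() {
        return new ArrayList<>(queue);
    }

    public static void main(String[] args) {
        WatchSyncSketch sketch = new WatchSyncSketch();
        sketch.post(5, true);
        sketch.addWatchSyncPredicate(seqno -> seqno >= 5); // matches immediately
        sketch.addWatchSyncPredicate(seqno -> seqno >= 7); // waits for seqno 7
        sketch.post(6, true);
        sketch.post(7, false); // first fragment of transaction 7: no check yet
        sketch.post(7, true);  // last fragment: predicate fires
        // Prints [log@5, sync@5, log@6, log@7, log@7, sync@7]
        System.out.println(sketch.snapshot());
    }
}
```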

postOutOfBand

public void postOutOfBand(ReplControlEvent controlEvent)
                   throws java.lang.InterruptedException
Post a control event, which will either be immediately added to the event queue or buffered in the control event queue until it is time to merge it. Control events are ordered by seqno and, where several control events share a seqno, by order of insertion.

Note that if you post a control event whose seqno is prior to the current seqno in the queue, the control event seqno will be altered to use the current seqno. This is required to prevent seqno values from appearing to move backwards, which could provoke bugs in downstream tasks that expect sequence numbers to increase monotonically.

Parameters:
controlEvent - Control event to post or buffer
Throws:
java.lang.InterruptedException - Thrown if thread is interrupted.
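
The ordering rule and the seqno adjustment described for postOutOfBand can be sketched with a priority queue keyed on seqno and an insertion counter. The sketch covers only the buffering path and is an assumption-laden illustration: ControlEventOrderingSketch, Control, and drainInOrder are hypothetical names, and the actual class may use different data structures.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch; not the Tungsten implementation.
class ControlEventOrderingSketch {
    static final class Control {
        final long seqno;
        final long order; // insertion counter, used as a tie-breaker
        final String name;

        Control(long seqno, long order, String name) {
            this.seqno = seqno;
            this.order = order;
            this.name = name;
        }
    }

    // Orders by seqno first, then by insertion order for equal seqnos.
    private final PriorityQueue<Control> buffer = new PriorityQueue<>(
            Comparator.comparingLong((Control c) -> c.seqno)
                    .thenComparingLong(c -> c.order));
    private final long currentSeqno;
    private long nextOrder = 0;

    ControlEventOrderingSketch(long currentSeqno) {
        this.currentSeqno = currentSeqno;
    }

    /** Buffers a control event, raising its seqno to the current one if it lies in the past. */
    synchronized void postOutOfBand(String name, long requestedSeqno) {
        long effectiveSeqno = Math.max(requestedSeqno, currentSeqno);
        buffer.add(new Control(effectiveSeqno, nextOrder++, name));
    }

    /** Removes all buffered control events in merge order. */
    synchronized List<String> drainInOrder() {
        List<String> out = new ArrayList<>();
        Control c;
        while ((c = buffer.poll()) != null) {
            out.add(c.name + "@" + c.seqno);
        }
        return out;
    }

    public static void main(String[] args) {
        ControlEventOrderingSketch sketch = new ControlEventOrderingSketch(10);
        sketch.postOutOfBand("a", 12);
        sketch.postOutOfBand("b", 5);  // earlier than current seqno, so moved up to 10
        sketch.postOutOfBand("c", 12);
        sketch.postOutOfBand("d", 10);
        // Prints [b@10, d@10, a@12, c@12]
        System.out.println(sketch.drainInOrder());
    }
}
```

The explicit insertion counter is needed because java.util.PriorityQueue does not by itself preserve insertion order among elements that compare as equal.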

post

public void post(THLEvent thlEvent)
          throws java.lang.InterruptedException
Post a normal event, which will be enqueued immediately. Also synchronizes position to update counters and merges any pending control events or predicates.

Parameters:
thlEvent - Event to post.
Throws:
java.lang.InterruptedException - Thrown if thread is interrupted.

size

public int size()
Returns the current queue size.


take

public ReplEvent take()
               throws java.lang.InterruptedException
Removes the next event from the queue, waiting indefinitely for something to arrive.

Returns:
An event
Throws:
java.lang.InterruptedException - Thrown if thread is interrupted.

take

public ReplEvent take(long timeout,
                      java.util.concurrent.TimeUnit unit)
               throws java.lang.InterruptedException
Removes the next event from the queue, returning null if none arrives within the timeout.

Parameters:
timeout - Interval to wait
unit - Unit of time
Returns:
An event or null if we time out
Throws:
java.lang.InterruptedException - Thrown if thread is interrupted.
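
Since the APIs of this class match standard queue interfaces, a consumer loop built on the timed take might look like the following sketch. It uses java.util.concurrent.LinkedBlockingQueue as a stand-in, with strings in place of ReplEvent; TimedTakeSketch and its demo method are hypothetical and only illustrate the null-on-timeout contract.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in that mimics the timed take contract with a JDK queue.
class TimedTakeSketch {
    private final BlockingQueue<String> events = new LinkedBlockingQueue<>();

    /** Enqueues an event; the unbounded queue never blocks here. */
    void post(String event) {
        events.add(event);
    }

    /** Returns the next event, or null if none arrives within the timeout. */
    String take(long timeout, TimeUnit unit) throws InterruptedException {
        return events.poll(timeout, unit);
    }

    /** Consumes events until a timed take returns null, then reports what was seen. */
    static String demo() {
        TimedTakeSketch queue = new TimedTakeSketch();
        queue.post("a");
        queue.post("b");
        StringBuilder seen = new StringBuilder();
        try {
            String event;
            while ((event = queue.take(50, TimeUnit.MILLISECONDS)) != null) {
                seen.append(event).append(',');
            }
            seen.append("timeout");
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            seen.append("interrupted");
        }
        return seen.toString();
    }

    public static void main(String[] args) {
        // Prints a,b,timeout
        System.out.println(demo());
    }
}
```

A caller that treats null as "nothing yet" rather than "end of stream" can use the timeout to check a shutdown flag between attempts.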

peek

public ReplEvent peek()
               throws java.lang.InterruptedException
Returns, but does not remove, the next event in the queue, or returns null if the queue is empty.

Throws:
java.lang.InterruptedException - Thrown if thread is interrupted.