Tuesday, November 2, 2010

Enlist In A Send Pipeline Transaction From A Custom Pipeline Component

A recent messaging project I worked on required executing a database update when a send port succeeded but not when it failed. It has been shown that one can leverage a receive pipeline's underlying transaction but not a send pipeline's. Enter IPersistQueryable.

By implementing IPersistQueryable your unit of work, for example a transaction status update, can be added as a transactional custom BAM event. The EventStream property of the pipeline context exposes the StoreCustomEvent method that takes your IPersistQueryable implementation.

Essentially, this method of transaction enlistment assures your application's 'intention' to update an integration status, for example, when the send port succeeds. If the send port fails your IPersistQueryable instance will be rolled back out of the TDDS queue.

This solution may not suffice depending on your requirements but it means you can execute a unit of work (out-of-process too) only when the send port succeeds. Plus, in the case of the work unit failing you have a record of the exception in the BAMPrimaryImport.dbo.TDDS_FailedTrackingData table.

Should your IPersistQueryable implementation fault when executed by the TDDS a retry will be attempted. I think there are 3 retries. Either way, the piece of work being performed should be fault tolerant because you cannot enlist in a further transaction within your IPersistQueryable implementation. That is, the unit of work must be able to be executed multiple times resulting in the same final state.

Here's an example of a pipeline component loading the TDDS event and the event itself.

[ComponentCategory(CategoryTypes.CATID_PipelineComponent)]
[
ComponentCategory(CategoryTypes.CATID_Any)]
[
Guid("D371F2ED-BFA7-47BF-9905-492AE20DAB7F")]
public class StatusUpdater : IComponent, IComponentUI, IBaseComponent
{
    #region IComponent Members

    
IBaseMessage IComponent.Execute(IPipelineContext context, IBaseMessage message)
    {
        
string messageId = (string)message.Context.Read("InterchangeID", BT_SYS_PROPS_NS);
        IPersistQueryable updateIntention = new MarkMessageAsSent(messageId);
        
EventStream tdds = context.GetEventStream();
        tdds.StoreCustomEvent(updateIntention);
        
return message;
    }

    #endregion

    ...

}

[Serializable]
public class MarkMessageAsSent: IPersistQueryable
{
    
private readonly string _messageId;

    
public MarkMessageAsSent(string messageId)
    {
        _messageId = messageId;
    }

    
void IPersistQueryable.AddToBatch(SqlConnection connection, IBatch b)
    {
    }

    
Type IPersistQueryable.BatchType
    {
        
get { return typeof(MarkMessageAsSent); }
    }

    [
NonSerialized]
    
private BAMEventsRecord _parentRecord;
    
BAMEventsRecord IPersistQueryable.ParentRecord
    {
        
get { return _parentRecord; }
        
set { _parentRecord = value; }
    }

    
void IPersistQueryable.PersistQueryable(SqlConnection conn, SqlTransaction xact, int timeout)
    {
        
using (SqlConnection nonDtaConnection = new SqlConnection(NON_DTA_CONNECTION_STRING))
        {
            nonDtaConnection.Open();
            
using (SqlCommand nonDtaCommand = nonDtaConnection.CreateCommand())
            {
                nonDtaCommand.CommandText =
"[dbo].[MarkMessageAsSent]";
                nonDtaCommand.Parameters.AddWithValue(
"@messageId", _messageId);
                nonDtaCommand.CommandType =
CommandType.StoredProcedure;
                nonDtaCommand.ExecuteNonQuery();
            }
        }
    }
}

No comments:

Post a Comment