package org.marketcetera.module;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.marketcetera.module.ModuleTestBase;
import org.marketcetera.util.log.SLF4JLoggerProxy;
import org.marketcetera.util.misc.ClassVersion;

@ClassVersion("$Id: DataFlowTest.java 17411 2017-04-28 14:50:38Z colin $")
/* loaded from: input_file:org/marketcetera/module/DataFlowTest.class */
public class DataFlowTest extends ModuleTestBase {
    private static ModuleManager sManager;
    private static final int NUM_TIMES = 6;
    private static ModuleTestBase.Sink sSink = new ModuleTestBase.Sink();

    @BeforeClass
    public static void setup() throws Exception {
        try {
            sManager = new ModuleManager();
            sManager.init();
            sManager.addSinkListener(sSink);
        } catch (Exception e) {
            SLF4JLoggerProxy.error(e, e);
            throw e;
        }
    }

    @AfterClass
    public static void cleanup() throws Exception {
        sManager.stop();
    }

    @After
    public void clearUp() throws Exception {
        sSink.clear();
        ((EmitterModule) ModuleBase.getInstance(EmitterModuleFactory.INSTANCE_URN)).clear();
        sManager.setMaxFlowHistory(0);
        sManager.setMaxFlowHistory(10);
        Assert.assertEquals(10L, sManager.getMaxFlowHistory());
    }

    @Test
    public void getDataFlowInvalidID() throws Exception {
        final DataFlowID dataFlowID = new DataFlowID("blah");
        new ExpectedFailure<DataFlowNotFoundException>(Messages.DATA_FLOW_NOT_FOUND, new Object[]{dataFlowID.getValue()}) { // from class: org.marketcetera.module.DataFlowTest.1
            @Override // org.marketcetera.module.ExpectedFailure
            protected void run() throws Exception {
                DataFlowTest.sManager.getDataFlowInfo(dataFlowID);
            }
        };
    }

    @Test
    public void cancelDataFlowInvalidID() throws Exception {
        final DataFlowID dataFlowID = new DataFlowID("blah");
        new ExpectedFailure<DataFlowNotFoundException>(Messages.DATA_FLOW_NOT_FOUND, new Object[]{dataFlowID.getValue()}) { // from class: org.marketcetera.module.DataFlowTest.2
            @Override // org.marketcetera.module.ExpectedFailure
            protected void run() throws Exception {
                DataFlowTest.sManager.cancel(dataFlowID);
            }
        };
    }

    @Test
    public void createFlowManager() throws Exception {
        Assert.assertTrue(sManager.getDataFlows(true).isEmpty());
        startEmitter();
        ModuleURN moduleURN = new ModuleURN(ProcessorModuleFactory.PROVIDER_URN, "proc");
        List moduleInstances = sManager.getModuleInstances(ProcessorModuleFactory.PROVIDER_URN);
        Assert.assertFalse(moduleInstances.toString(), moduleInstances.contains(moduleURN));
        checkDataFlowManager(true, false, true, new DataRequest(moduleURN, String.class.getName()));
        checkDataFlowManager(false, true, true, new DataRequest(moduleURN, String.class.getName()));
        checkDataFlowManager(false, true, true, new DataRequest(moduleURN, String.class.getName()), new DataRequest(SinkModuleFactory.INSTANCE_URN));
        sManager.createModule(moduleURN.parent(), new Object[]{moduleURN});
        checkDataFlowManager(false, true, false, new DataRequest(new ModuleURN("metc:::proc"), String.class.getName()), new DataRequest(SinkModuleFactory.INSTANCE_URN.parent()));
        checkDataFlowManager(false, true, false, new DataRequest(new ModuleURN("metc:::proc"), String.class.getName()), new DataRequest(SinkModuleFactory.INSTANCE_URN.parent().parent()));
    }

    @Test(timeout = 10000)
    public void emitFailStoppedFlow() throws Exception {
        Assert.assertTrue(sManager.getDataFlowHistory().isEmpty());
        startEmitter();
        DataFlowID createDataFlow = sManager.createDataFlow(new DataRequest[]{new DataRequest(EmitterModuleFactory.INSTANCE_URN, "send this data")});
        while (sSink.getData().length < 1) {
            Thread.sleep(500L);
        }
        sManager.cancel(createDataFlow);
        EmitterModule emitterModule = (EmitterModule) ModuleBase.getInstance(EmitterModuleFactory.INSTANCE_URN);
        DataEmitterSupport support = emitterModule.getLastTask().getSupport();
        Assert.assertNotNull(support);
        int length = sSink.getData().length;
        support.send(new Object());
        Assert.assertEquals(length, sSink.getData().length);
        support.dataEmitError(TestMessages.BAD_DATA, true);
        emitterModule.clear();
    }

    @Test
    public void createFlowModule() throws Exception {
        Assert.assertTrue(sManager.getDataFlows(true).isEmpty());
        startEmitter();
        ModuleURN moduleURN = new ModuleURN(FlowRequesterModuleFactory.PROVIDER_URN, EmitterModuleFactory.INSTANCE_URN.instanceName());
        sManager.createModule(FlowRequesterModuleFactory.PROVIDER_URN, new Object[]{moduleURN});
        ModuleTestBase.assertModuleInfo(sManager.getModuleInfo(moduleURN), moduleURN, ModuleState.CREATED, (DataFlowID[]) null, (DataFlowID[]) null, false, false, true, true, true);
        final FlowRequesterModule flowRequesterModule = (FlowRequesterModule) ModuleBase.getInstance(moduleURN);
        Assert.assertNotNull(flowRequesterModule);
        checkDataFlowModule(flowRequesterModule, null, true, false, new DataRequest(moduleURN, String.class.getName()));
        checkDataFlowModule(flowRequesterModule, null, true, false, DataCoupling.ASYNC, new DataRequest(moduleURN, DataCoupling.ASYNC, String.class.getName()));
        checkDataFlowModule(flowRequesterModule, null, false, true, new DataRequest(moduleURN, String.class.getName()));
        checkDataFlowModule(flowRequesterModule, null, false, true, new DataRequest(moduleURN, String.class.getName()), new DataRequest(SinkModuleFactory.INSTANCE_URN));
        checkDataFlowModule(flowRequesterModule, null, false, true, new DataRequest(new ModuleURN("metc:this:this:this"), String.class.getName()), new DataRequest(SinkModuleFactory.INSTANCE_URN.parent()));
        checkDataFlowModule(flowRequesterModule, null, false, true, new DataRequest(new ModuleURN("metc:flow::default"), String.class.getName()), new DataRequest(SinkModuleFactory.INSTANCE_URN.parent().parent()));
        checkDataFlowModule(flowRequesterModule, new ModuleURN("metc:emit:this:this"), false, true, new DataRequest(new ModuleURN("metc:flow::default"), String.class.getName()), new DataRequest(SinkModuleFactory.INSTANCE_URN.parent().parent()));
        flowRequesterModule.setSkipCancel(true);
        checkDataFlowModule(flowRequesterModule, new ModuleURN("metc:emit:this:this"), false, true, new DataRequest(new ModuleURN("metc:flow::default"), String.class.getName()), new DataRequest(SinkModuleFactory.INSTANCE_URN.parent().parent()));
        flowRequesterModule.setInvokeDefault(true);
        flowRequesterModule.setRequests(new DataRequest[]{new DataRequest(EmitterModuleFactory.INSTANCE_URN), new DataRequest(flowRequesterModule.getURN(), String.class.getName())});
        new ExpectedFailure<ModuleStateException>(Messages.DATAFLOW_FAILED_REQ_MODULE_STATE_INCORRECT, new Object[]{moduleURN.toString(), ModuleState.STOPPED, ModuleState.REQUEST_FLOW_STATES.toString()}) { // from class: org.marketcetera.module.DataFlowTest.3
            @Override // org.marketcetera.module.ExpectedFailure
            protected void run() throws Exception {
                flowRequesterModule.createFlow();
            }
        };
        DataFlowID dataFlowID = new DataFlowID("doesntmatter");
        flowRequesterModule.setFlowID(dataFlowID);
        new ExpectedFailure<ModuleStateException>(Messages.CANCEL_FAILED_MODULE_STATE_INCORRECT, new Object[]{dataFlowID.getValue(), flowRequesterModule.getURN().toString(), ModuleState.STOPPED, ModuleState.CANCEL_FLOW_STATES.toString()}) { // from class: org.marketcetera.module.DataFlowTest.4
            @Override // org.marketcetera.module.ExpectedFailure
            protected void run() throws Exception {
                flowRequesterModule.cancelFlow();
            }
        };
    }

    @Test
    public void checkNestedFlowRequestFailures() throws Exception {
        ModuleURN moduleURN = new ModuleURN(FlowRequesterModuleFactory.PROVIDER_URN, "flow");
        sManager.createModule(FlowRequesterModuleFactory.PROVIDER_URN, new Object[]{moduleURN});
        ModuleTestBase.assertModuleInfo(sManager.getModuleInfo(moduleURN), moduleURN, ModuleState.CREATED, (DataFlowID[]) null, (DataFlowID[]) null, false, false, true, true, true);
        FlowRequesterModule flowRequesterModule = (FlowRequesterModule) ModuleBase.getInstance(moduleURN);
        Assert.assertNotNull(flowRequesterModule);
        sManager.start(moduleURN);
        flowRequesterModule.setNestDataFlowInRequest(true);
        flowRequesterModule.setNestedCreateDataFlow(true);
        flowRequesterModule.setInvokeDefault(true);
        runNestedFlowRequestFailureInRequestData(moduleURN);
        flowRequesterModule.setInvokeDefault(false);
        runNestedFlowRequestFailureInRequestData(moduleURN);
        flowRequesterModule.setNestedCreateDataFlow(false);
        flowRequesterModule.setNestedCancelDataFlow(true);
        runNestedFlowRequestFailureInRequestData(moduleURN);
        DataFlowID createFlowForNestedFlowTesting = createFlowForNestedFlowTesting(flowRequesterModule);
        flowRequesterModule.setNestDataFlowInCancel(true);
        flowRequesterModule.setNestedCancelDataFlow(false);
        flowRequesterModule.setNestedCreateDataFlow(true);
        flowRequesterModule.setInvokeDefault(true);
        runNestedFlowRequestFailureInCancel(flowRequesterModule, createFlowForNestedFlowTesting);
        DataFlowID createFlowForNestedFlowTesting2 = createFlowForNestedFlowTesting(flowRequesterModule);
        flowRequesterModule.setNestDataFlowInCancel(true);
        flowRequesterModule.setInvokeDefault(false);
        runNestedFlowRequestFailureInCancel(flowRequesterModule, createFlowForNestedFlowTesting2);
        DataFlowID createFlowForNestedFlowTesting3 = createFlowForNestedFlowTesting(flowRequesterModule);
        flowRequesterModule.setNestDataFlowInCancel(true);
        flowRequesterModule.setNestedCreateDataFlow(false);
        flowRequesterModule.setNestedCancelDataFlow(true);
        runNestedFlowRequestFailureInCancel(flowRequesterModule, createFlowForNestedFlowTesting3);
    }

    private void runNestedFlowRequestFailureInCancel(FlowRequesterModule flowRequesterModule, DataFlowID dataFlowID) throws ModuleException {
        flowRequesterModule.resetNestedCancelFailure();
        Assert.assertNull(flowRequesterModule.getNestedCancelFailure());
        sManager.cancel(dataFlowID);
        Assert.assertNotNull(flowRequesterModule.getNestedCancelFailure());
        Assert.assertEquals(ModuleException.class, ExpectedFailure.assertI18NException(flowRequesterModule.getNestedCancelFailure(), Messages.INCORRECT_NESTED_FLOW_REQUEST, new Object[0]).getClass());
    }

    private DataFlowID createFlowForNestedFlowTesting(FlowRequesterModule flowRequesterModule) throws ModuleException {
        flowRequesterModule.setNestDataFlowInRequest(false);
        flowRequesterModule.setNestDataFlowInCancel(false);
        DataFlowID createDataFlow = sManager.createDataFlow(new DataRequest[]{new DataRequest(flowRequesterModule.getURN(), String.class.getName())});
        sManager.getDataFlowInfo(createDataFlow);
        return createDataFlow;
    }

    private void runNestedFlowRequestFailureInRequestData(final ModuleURN moduleURN) throws Exception {
        Assert.assertEquals(ModuleException.class, ExpectedFailure.assertI18NException(new ExpectedFailure<RequestDataException>() { // from class: org.marketcetera.module.DataFlowTest.5
            @Override // org.marketcetera.module.ExpectedFailure
            protected void run() throws Exception {
                DataFlowTest.sManager.createDataFlow(new DataRequest[]{new DataRequest(moduleURN)});
            }
        }.getException().getCause(), Messages.INCORRECT_NESTED_FLOW_REQUEST, new Object[0]).getClass());
    }

    @Test
    @Ignore
    public void createFlowSystemFailures() throws Exception {
        Assert.assertTrue(sManager.getDataFlows(true).isEmpty());
        new ExpectedFailure<DataFlowException>(Messages.DATA_REQUEST_TOO_SHORT, 0) { // from class: org.marketcetera.module.DataFlowTest.6
            @Override // org.marketcetera.module.ExpectedFailure
            protected void run() throws Exception {
                DataFlowTest.sManager.createDataFlow((DataRequest[]) null);
            }
        };
        new ExpectedFailure<DataFlowException>(Messages.DATA_REQUEST_TOO_SHORT, 0) { // from class: org.marketcetera.module.DataFlowTest.7
            @Override // org.marketcetera.module.ExpectedFailure
            protected void run() throws Exception {
                DataFlowTest.sManager.createDataFlow(new DataRequest[0]);
            }
        };
        new ExpectedFailure<DataFlowException>(Messages.DATA_REQUEST_TOO_SHORT, 1) { // from class: org.marketcetera.module.DataFlowTest.8
            @Override // org.marketcetera.module.ExpectedFailure
            protected void run() throws Exception {
                DataFlowTest.sManager.createDataFlow(new DataRequest[]{new DataRequest(new ModuleURN("metc:dontmatter"))}, false);
            }
        };
        final ModuleURN moduleURN = new ModuleURN("invalidURN");
        new ExpectedFailure<InvalidURNException>(Messages.INVALID_URN_SCHEME, new Object[]{moduleURN.scheme(), moduleURN.toString(), "metc"}) { // from class: org.marketcetera.module.DataFlowTest.9
            @Override // org.marketcetera.module.ExpectedFailure
            protected void run() throws Exception {
                DataFlowTest.sManager.createDataFlow(new DataRequest[]{new DataRequest(moduleURN), new DataRequest(SingleModuleFactory.INSTANCE_URN)});
            }
        };
        new ExpectedFailure<InvalidURNException>(Messages.INVALID_URN_SCHEME, new Object[]{moduleURN.scheme(), moduleURN.toString(), "metc"}) { // from class: org.marketcetera.module.DataFlowTest.10
            @Override // org.marketcetera.module.ExpectedFailure
            protected void run() throws Exception {
                DataFlowTest.sManager.createDataFlow(new DataRequest[]{new DataRequest(SingleModuleFactory.INSTANCE_URN), new DataRequest(moduleURN)});
            }
        };
        final ModuleURN moduleURN2 = new ModuleURN("metc:not:exist");
        new ExpectedFailure<ModuleNotFoundException>(Messages.MODULE_NOT_FOUND, new Object[]{moduleURN2.toString()}) { // from class: org.marketcetera.module.DataFlowTest.11
            @Override // org.marketcetera.module.ExpectedFailure
            protected void run() throws Exception {
                DataFlowTest.sManager.createDataFlow(new DataRequest[]{new DataRequest(moduleURN2), new DataRequest(SingleModuleFactory.INSTANCE_URN)});
            }
        };
        new ExpectedFailure<ModuleNotFoundException>(Messages.MODULE_NOT_FOUND, new Object[]{moduleURN2.toString()}) { // from class: org.marketcetera.module.DataFlowTest.12
            @Override // org.marketcetera.module.ExpectedFailure
            protected void run() throws Exception {
                DataFlowTest.sManager.createDataFlow(new DataRequest[]{new DataRequest(SingleModuleFactory.INSTANCE_URN), new DataRequest(moduleURN2)});
            }
        };
        final ModuleURN moduleURN3 = new ModuleURN(ComplexModuleFactory.PROVIDER_URN, "notexist");
        new ExpectedFailure<ModuleNotFoundException>(Messages.MODULE_NOT_FOUND, new Object[]{moduleURN3.toString()}) { // from class: org.marketcetera.module.DataFlowTest.13
            @Override // org.marketcetera.module.ExpectedFailure
            protected void run() throws Exception {
                DataFlowTest.sManager.createDataFlow(new DataRequest[]{new DataRequest(moduleURN3), new DataRequest(SingleModuleFactory.INSTANCE_URN)});
            }
        };
        new ExpectedFailure<ModuleNotFoundException>(Messages.MODULE_NOT_FOUND, new Object[]{moduleURN3.toString()}) { // from class: org.marketcetera.module.DataFlowTest.14
            @Override // org.marketcetera.module.ExpectedFailure
            protected void run() throws Exception {
                DataFlowTest.sManager.createDataFlow(new DataRequest[]{new DataRequest(SingleModuleFactory.INSTANCE_URN), new DataRequest(moduleURN3)});
            }
        };
        final ModuleURN moduleURN4 = new ModuleURN("metc:not:exist:no");
        new ExpectedFailure<ModuleNotFoundException>(Messages.MODULE_NOT_FOUND, new Object[]{moduleURN4.toString()}) { // from class: org.marketcetera.module.DataFlowTest.15
            @Override // org.marketcetera.module.ExpectedFailure
            protected void run() throws Exception {
                DataFlowTest.sManager.createDataFlow(new DataRequest[]{new DataRequest(moduleURN4), new DataRequest(SingleModuleFactory.INSTANCE_URN)});
            }
        }.getException().printStackTrace();
        new ExpectedFailure<ModuleNotFoundException>(Messages.MODULE_NOT_FOUND, new Object[]{moduleURN4.toString()}) { // from class: org.marketcetera.module.DataFlowTest.16
            @Override // org.marketcetera.module.ExpectedFailure
            protected void run() throws Exception {
                DataFlowTest.sManager.createDataFlow(new DataRequest[]{new DataRequest(SingleModuleFactory.INSTANCE_URN), new DataRequest(moduleURN4)});
            }
        };
        new ExpectedFailure<DataFlowException>(Messages.MODULE_NOT_EMITTER, SingleModuleFactory.INSTANCE_URN.toString()) { // from class: org.marketcetera.module.DataFlowTest.17
            @Override // org.marketcetera.module.ExpectedFailure
            protected void run() throws Exception {
                DataFlowTest.sManager.createDataFlow(new DataRequest[]{new DataRequest(SingleModuleFactory.INSTANCE_URN), new DataRequest(EmitterModuleFactory.INSTANCE_URN)});
            }
        };
        startEmitter();
        new ExpectedFailure<DataFlowException>(Messages.MODULE_NOT_EMITTER, SingleModuleFactory.INSTANCE_URN.toString()) { // from class: org.marketcetera.module.DataFlowTest.18
            @Override // org.marketcetera.module.ExpectedFailure
            protected void run() throws Exception {
                DataFlowTest.sManager.createDataFlow(new DataRequest[]{new DataRequest(EmitterModuleFactory.INSTANCE_URN), new DataRequest(SingleModuleFactory.INSTANCE_URN), new DataRequest(SinkModuleFactory.INSTANCE_URN)}, false);
            }
        };
        new ExpectedFailure<DataFlowException>(Messages.MODULE_NOT_RECEIVER, SingleModuleFactory.INSTANCE_URN.toString()) { // from class: org.marketcetera.module.DataFlowTest.19
            @Override // org.marketcetera.module.ExpectedFailure
            protected void run() throws Exception {
                DataFlowTest.sManager.createDataFlow(new DataRequest[]{new DataRequest(EmitterModuleFactory.INSTANCE_URN), new DataRequest(SingleModuleFactory.INSTANCE_URN)});
            }
        };
        final ModuleURN moduleURN5 = new ModuleURN(ProcessorModuleFactory.PROVIDER_URN, "myreceiver");
        sManager.createModule(ProcessorModuleFactory.PROVIDER_URN, new Object[]{moduleURN5});
        new ExpectedFailure<DataFlowException>(Messages.MODULE_NOT_RECEIVER, new Object[]{SingleModuleFactory.INSTANCE_URN.toString()}) { // from class: org.marketcetera.module.DataFlowTest.20
            @Override // org.marketcetera.module.ExpectedFailure
            protected void run() throws Exception {
                DataFlowTest.sManager.createDataFlow(new DataRequest[]{new DataRequest(EmitterModuleFactory.INSTANCE_URN), new DataRequest(moduleURN5), new DataRequest(SingleModuleFactory.INSTANCE_URN)});
            }
        };
        ModuleURN moduleURN6 = new ModuleURN(ProcessorModuleFactory.PROVIDER_URN, "mysecondreceiver");
        sManager.createModule(ProcessorModuleFactory.PROVIDER_URN, new Object[]{moduleURN6});
        final ModuleURN parent = moduleURN6.parent();
        Assert.assertEquals(new HashSet(Arrays.asList(parent.getValue(), moduleURN5.getValue(), moduleURN6.getValue())), new HashSet(Arrays.asList(new ExpectedFailure<ModuleNotFoundException>(Messages.MULTIPLE_MODULES_MATCH_URN, new Object[0]) { // from class: org.marketcetera.module.DataFlowTest.21
            @Override // org.marketcetera.module.ExpectedFailure
            protected void run() throws Exception {
                DataFlowTest.sManager.createDataFlow(new DataRequest[]{new DataRequest(EmitterModuleFactory.INSTANCE_URN), new DataRequest(parent)});
            }
        }.getException().getI18NBoundMessage().getParams())));
        final ModuleURN moduleURN7 = new ModuleURN(ProcessorModuleFactory.PROVIDER_URN, "this");
        new ExpectedFailure<InvalidURNException>(Messages.INVALID_INSTANCE_URN, new Object[]{moduleURN7.toString(), moduleURN7.instanceName()}) { // from class: org.marketcetera.module.DataFlowTest.22
            @Override // org.marketcetera.module.ExpectedFailure
            protected void run() throws Exception {
                DataFlowTest.sManager.createDataFlow(new DataRequest[]{new DataRequest(EmitterModuleFactory.INSTANCE_URN), new DataRequest(moduleURN7)});
            }
        };
        sManager.stop(moduleURN5);
        new ExpectedFailure<ModuleStateException>(Messages.DATAFLOW_FAILED_PCPT_MODULE_STATE_INCORRECT, new Object[]{moduleURN5.toString(), ModuleState.STOPPED, ModuleState.PARTICIPATE_FLOW_STATES.toString()}) { // from class: org.marketcetera.module.DataFlowTest.23
            @Override // org.marketcetera.module.ExpectedFailure
            protected void run() throws Exception {
                DataFlowTest.sManager.createDataFlow(new DataRequest[]{new DataRequest(EmitterModuleFactory.INSTANCE_URN), new DataRequest(moduleURN5)}, false);
            }
        };
        new ExpectedFailure<ModuleStateException>(Messages.DATAFLOW_FAILED_PCPT_MODULE_STATE_INCORRECT, new Object[]{moduleURN5.toString(), ModuleState.STOPPED, ModuleState.PARTICIPATE_FLOW_STATES.toString()}) { // from class: org.marketcetera.module.DataFlowTest.24
            @Override // org.marketcetera.module.ExpectedFailure
            protected void run() throws Exception {
                DataFlowTest.sManager.createDataFlow(new DataRequest[]{new DataRequest(EmitterModuleFactory.INSTANCE_URN), new DataRequest(moduleURN5)});
            }
        };
        sManager.stop(EmitterModuleFactory.INSTANCE_URN);
        new ExpectedFailure<ModuleStateException>(Messages.DATAFLOW_FAILED_PCPT_MODULE_STATE_INCORRECT, new Object[]{EmitterModuleFactory.INSTANCE_URN.toString(), ModuleState.STOPPED, ModuleState.PARTICIPATE_FLOW_STATES.toString()}) { // from class: org.marketcetera.module.DataFlowTest.25
            @Override // org.marketcetera.module.ExpectedFailure
            protected void run() throws Exception {
                DataFlowTest.sManager.createDataFlow(new DataRequest[]{new DataRequest(EmitterModuleFactory.INSTANCE_URN), new DataRequest(moduleURN5)});
            }
        };
        Assert.assertTrue(sManager.getDataFlows(true).isEmpty());
    }

    @Test
    public void moduleAutoDelete() throws Exception {
        startEmitter();
        final ModuleURN moduleURN = new ModuleURN(ProcessorModuleFactory.PROVIDER_URN, "autod");
        List moduleInstances = sManager.getModuleInstances(ProcessorModuleFactory.PROVIDER_URN);
        Assert.assertFalse(moduleInstances.toString(), moduleInstances.contains(moduleURN));
        DataFlowID createDataFlow = sManager.createDataFlow(new DataRequest[]{new DataRequest(EmitterModuleFactory.INSTANCE_URN, "no"), new DataRequest(moduleURN, String.class.getName())});
        List moduleInstances2 = sManager.getModuleInstances(ProcessorModuleFactory.PROVIDER_URN);
        Assert.assertTrue(moduleInstances2.toString(), moduleInstances2.contains(moduleURN));
        assertFlowInfo(sManager.getDataFlowInfo(createDataFlow), createDataFlow, 3, true, false, null, null);
        assertModuleInfo(sManager, moduleURN, ModuleState.STARTED, (DataFlowID[]) null, new DataFlowID[]{createDataFlow}, true, true, true, true, false);
        DataFlowID createDataFlow2 = sManager.createDataFlow(new DataRequest[]{new DataRequest(EmitterModuleFactory.INSTANCE_URN, "so"), new DataRequest(moduleURN, String.class.getName())});
        assertFlowInfo(sManager.getDataFlowInfo(createDataFlow2), createDataFlow2, 3, true, false, null, null);
        assertModuleInfo(sManager, moduleURN, ModuleState.STARTED, (DataFlowID[]) null, new DataFlowID[]{createDataFlow, createDataFlow2}, true, true, true, true, false);
        sManager.cancel(createDataFlow);
        assertModuleInfo(sManager, moduleURN, ModuleState.STARTED, (DataFlowID[]) null, new DataFlowID[]{createDataFlow2}, true, true, true, true, false);
        sManager.cancel(createDataFlow2);
        List moduleInstances3 = sManager.getModuleInstances(ProcessorModuleFactory.PROVIDER_URN);
        Assert.assertFalse(moduleInstances3.toString(), moduleInstances3.contains(moduleURN));
        new ExpectedFailure<IllegalRequestParameterValue>() { // from class: org.marketcetera.module.DataFlowTest.26
            @Override // org.marketcetera.module.ExpectedFailure
            protected void run() throws Exception {
                DataFlowTest.sManager.createDataFlow(new DataRequest[]{new DataRequest(EmitterModuleFactory.INSTANCE_URN, (Object) null), new DataRequest(moduleURN, String.class.getName())});
            }
        };
        List moduleInstances4 = sManager.getModuleInstances(ProcessorModuleFactory.PROVIDER_URN);
        Assert.assertFalse(moduleInstances4.toString(), moduleInstances4.contains(moduleURN));
    }

    @Test
    public void createFlowModuleFailures() throws Exception {
        Assert.assertTrue(sManager.getDataFlows(true).isEmpty());
        startEmitter();
        final ModuleURN moduleURN = new ModuleURN(ProcessorModuleFactory.PROVIDER_URN, "failures");
        List moduleInstances = sManager.getModuleInstances(ProcessorModuleFactory.PROVIDER_URN);
        Assert.assertFalse(moduleInstances.toString(), moduleInstances.contains(moduleURN));
        new ExpectedFailure<IllegalRequestParameterValue>(Messages.ILLEGAL_REQ_PARM_VALUE, new Object[]{EmitterModuleFactory.INSTANCE_URN.getValue(), null}) { // from class: org.marketcetera.module.DataFlowTest.27
            @Override // org.marketcetera.module.ExpectedFailure
            protected void run() throws Exception {
                DataFlowTest.sManager.createDataFlow(new DataRequest[]{new DataRequest(EmitterModuleFactory.INSTANCE_URN), new DataRequest(moduleURN, String.class.getName())});
            }
        };
        Assert.assertEquals(0L, ((ProcessorModule) ModuleBase.getInstance(moduleURN)).getNumRequests());
        new ExpectedFailure<UnsupportedRequestParameterType>(Messages.UNSUPPORTED_REQ_PARM_TYPE, new Object[]{EmitterModuleFactory.INSTANCE_URN.getValue(), Boolean.class.getName()}) { // from class: org.marketcetera.module.DataFlowTest.28
            @Override // org.marketcetera.module.ExpectedFailure
            protected void run() throws Exception {
                DataFlowTest.sManager.createDataFlow(new DataRequest[]{new DataRequest(EmitterModuleFactory.INSTANCE_URN, true), new DataRequest(moduleURN, String.class.getName())});
            }
        };
        new ExpectedFailure<IllegalRequestParameterValue>(Messages.ILLEGAL_REQ_PARM_VALUE, new Object[]{moduleURN.getValue(), null}) { // from class: org.marketcetera.module.DataFlowTest.29
            @Override // org.marketcetera.module.ExpectedFailure
            protected void run() throws Exception {
                DataFlowTest.sManager.createDataFlow(new DataRequest[]{new DataRequest(EmitterModuleFactory.INSTANCE_URN, "send data"), new DataRequest(moduleURN)});
            }
        };
        new ExpectedFailure<UnsupportedRequestParameterType>(Messages.UNSUPPORTED_REQ_PARM_TYPE, new Object[]{moduleURN.getValue(), Boolean.class.getName()}) { // from class: org.marketcetera.module.DataFlowTest.30
            @Override // org.marketcetera.module.ExpectedFailure
            protected void run() throws Exception {
                DataFlowTest.sManager.createDataFlow(new DataRequest[]{new DataRequest(EmitterModuleFactory.INSTANCE_URN, "send data"), new DataRequest(moduleURN, true)});
            }
        };
        new ExpectedFailure<IllegalRequestParameterValue>(Messages.ILLEGAL_REQ_PARM_VALUE, new Object[]{moduleURN.getValue(), "blah"}) { // from class: org.marketcetera.module.DataFlowTest.31
            @Override // org.marketcetera.module.ExpectedFailure
            protected void run() throws Exception {
                DataFlowTest.sManager.createDataFlow(new DataRequest[]{new DataRequest(EmitterModuleFactory.INSTANCE_URN, "send data"), new DataRequest(moduleURN, "blah")});
            }
        };
    }

    @Test
    public void preStartFlowsCleanup() throws Exception {
        startEmitter();
        final ModuleURN moduleURN = new ModuleURN(FlowRequesterModuleFactory.PROVIDER_URN, "prestart");
        sManager.createModule(FlowRequesterModuleFactory.PROVIDER_URN, new Object[]{moduleURN});
        DataRequest[] dataRequestArr = {new DataRequest(EmitterModuleFactory.INSTANCE_URN, "somestring"), new DataRequest(moduleURN, String.class.getName())};
        final FlowRequesterModule flowRequesterModule = (FlowRequesterModule) ModuleBase.getInstance(moduleURN);
        Assert.assertNotNull(flowRequesterModule);
        flowRequesterModule.setRequests(dataRequestArr);
        flowRequesterModule.setInvokeDefault(true);
        flowRequesterModule.setFailPreStart(true);
        new ExpectedFailure<ModuleException>(TestMessages.TEST_START_STOP_FAILURE) { // from class: org.marketcetera.module.DataFlowTest.32
            @Override // org.marketcetera.module.ExpectedFailure
            protected void run() throws Exception {
                DataFlowTest.sManager.start(moduleURN);
            }
        };
        assertModuleInfo(sManager, moduleURN, ModuleState.START_FAILED, (DataFlowID[]) null, (DataFlowID[]) null, false, false, true, true, true);
        Assert.assertNotNull(flowRequesterModule.getFlowID());
        new ExpectedFailure<DataFlowNotFoundException>(Messages.DATA_FLOW_NOT_FOUND, new Object[]{flowRequesterModule.getFlowID().getValue()}) { // from class: org.marketcetera.module.DataFlowTest.33
            @Override // org.marketcetera.module.ExpectedFailure
            protected void run() throws Exception {
                DataFlowTest.sManager.getDataFlowInfo(flowRequesterModule.getFlowID());
            }
        };
        assertFlowInfo((DataFlowInfo) sManager.getDataFlowHistory().get(0), flowRequesterModule.getFlowID(), 3, true, true, moduleURN, null);
    }

    @Test(timeout = 60000)
    public void dataFlowStopEmitter() throws Exception {
        Assert.assertTrue(sManager.getDataFlowHistory().isEmpty());
        HashMap hashMap = new HashMap();
        hashMap.put("value", "my data");
        hashMap.put("error", TestMessages.EMIT_DATA_ERROR);
        hashMap.put("times", Integer.valueOf(NUM_TIMES));
        hashMap.put("requestStop", Boolean.TRUE);
        startEmitter();
        ModuleURN moduleURN = new ModuleURN(ProcessorModuleFactory.PROVIDER_URN, "emitFail");
        DataFlowID createDataFlow = sManager.createDataFlow(new DataRequest[]{new DataRequest(EmitterModuleFactory.INSTANCE_URN, hashMap), new DataRequest(moduleURN, String.class.getName())});
        sSink.waitUntilTerminator();
        List dataFlows = sManager.getDataFlows(true);
        Assert.assertEquals(1L, dataFlows.size());
        Assert.assertEquals(createDataFlow, dataFlows.get(0));
        EmitterModule.readyToProceed();
        while (!sManager.getDataFlows(true).isEmpty()) {
            Thread.sleep(1000L);
        }
        Assert.assertEquals(1L, sManager.getDataFlowHistory().size());
        assertFlowInfo((DataFlowInfo) sManager.getDataFlowHistory().get(0), createDataFlow, 3, true, true, null, EmitterModuleFactory.INSTANCE_URN);
        verifyFlowSteps(hashMap, EmitterModuleFactory.INSTANCE_URN, moduleURN, (DataFlowInfo) sManager.getDataFlowHistory().get(0), true, false);
        EmitterModule emitterModule = (EmitterModule) ModuleBase.getInstance(EmitterModuleFactory.INSTANCE_URN);
        Set<RequestID> requests = emitterModule.getRequests();
        Assert.assertEquals(1L, requests.size());
        Assert.assertTrue(emitterModule.getTask(requests.iterator().next()).isCancelled());
        emitterModule.clear();
    }

    @Test(timeout = 60000)
    public void dataFlowStopReceiver() throws Exception {
        Assert.assertTrue(sManager.getDataFlowHistory().isEmpty());
        HashMap hashMap = new HashMap();
        hashMap.put("value", "my data");
        hashMap.put("error", TestMessages.EMIT_DATA_ERROR);
        hashMap.put("times", Integer.valueOf(NUM_TIMES));
        hashMap.put("emitNull", null);
        startEmitter();
        ModuleURN moduleURN = new ModuleURN(ProcessorModuleFactory.PROVIDER_URN, "receiveFail");
        DataFlowID createDataFlow = sManager.createDataFlow(new DataRequest[]{new DataRequest(EmitterModuleFactory.INSTANCE_URN, hashMap), new DataRequest(moduleURN, String.class.getName())});
        List dataFlows = sManager.getDataFlows(true);
        Assert.assertEquals(1L, dataFlows.size());
        Assert.assertEquals(createDataFlow, dataFlows.get(0));
        sSink.waitUntilTerminator();
        EmitterModule.readyToProceed();
        while (!sManager.getDataFlows(true).isEmpty()) {
            Thread.sleep(1000L);
        }
        Assert.assertEquals(1L, sManager.getDataFlowHistory().size());
        assertFlowInfo((DataFlowInfo) sManager.getDataFlowHistory().get(0), createDataFlow, 3, true, true, null, moduleURN);
        verifyFlowSteps(hashMap, EmitterModuleFactory.INSTANCE_URN, moduleURN, (DataFlowInfo) sManager.getDataFlowHistory().get(0), false, true);
        EmitterModule emitterModule = (EmitterModule) ModuleBase.getInstance(EmitterModuleFactory.INSTANCE_URN);
        Set<RequestID> requests = emitterModule.getRequests();
        Assert.assertEquals(1L, requests.size());
        Assert.assertTrue(emitterModule.getTask(requests.iterator().next()).isCancelled());
        emitterModule.clear();
    }

    @Test
    public void requestCancelExceptionIgnored() throws Exception {
        ModuleURN moduleURN = new ModuleURN(ProcessorModuleFactory.PROVIDER_URN, "cancelFail");
        startEmitter();
        DataFlowID createDataFlow = sManager.createDataFlow(new DataRequest[]{new DataRequest(EmitterModuleFactory.INSTANCE_URN, "something"), new DataRequest(moduleURN, String.class.getName())});
        EmitterModule emitterModule = (EmitterModule) ModuleBase.getInstance(EmitterModuleFactory.INSTANCE_URN);
        Assert.assertEquals(1L, emitterModule.getRequests().size());
        ProcessorModule processorModule = (ProcessorModule) ModuleBase.getInstance(moduleURN);
        Assert.assertEquals(1L, processorModule.getNumRequests());
        emitterModule.setThrowExceptionOnCancel(true);
        assertFlowInfo(sManager.getDataFlowInfo(createDataFlow), createDataFlow, 3, true, false, null, null);
        sManager.cancel(createDataFlow);
        Assert.assertTrue(sManager.getDataFlows(true).isEmpty());
        Assert.assertEquals(0L, processorModule.getNumRequests());
        Assert.assertEquals(1L, emitterModule.getRequests().size());
        Assert.assertTrue(emitterModule.getTask(emitterModule.getRequests().iterator().next()).isCancelled());
    }

    private static void startEmitter() throws ModuleException {
        if (sManager.getModuleInfo(EmitterModuleFactory.INSTANCE_URN).getState().isStarted()) {
            return;
        }
        sManager.start(EmitterModuleFactory.INSTANCE_URN);
    }

    private void checkDataFlowManager(boolean z, boolean z2, boolean z3, DataRequest... dataRequestArr) throws Exception {
        int size = sManager.getDataFlowHistory().size();
        HashMap hashMap = new HashMap();
        hashMap.put("value", "my data");
        hashMap.put("error", TestMessages.EMIT_DATA_ERROR);
        hashMap.put("times", Integer.valueOf(NUM_TIMES));
        DataRequest[] dataRequestArr2 = new DataRequest[dataRequestArr.length + 1];
        dataRequestArr2[0] = new DataRequest(EmitterModuleFactory.INSTANCE_URN, hashMap);
        System.arraycopy(dataRequestArr, 0, dataRequestArr2, 1, dataRequestArr.length);
        ModuleURN requestURN = dataRequestArr2[1].getRequestURN();
        DataFlowID createDataFlow = z ? sManager.createDataFlow(dataRequestArr2) : sManager.createDataFlow(dataRequestArr2, z2);
        Assert.assertNotNull(createDataFlow);
        List dataFlows = sManager.getDataFlows(true);
        Assert.assertEquals(1L, dataFlows.size());
        Assert.assertEquals(createDataFlow, dataFlows.get(0));
        Assert.assertEquals(dataFlows, sManager.getDataFlows(false));
        HashSet hashSet = new HashSet();
        hashSet.add(createDataFlow);
        new ExpectedFailure<DataFlowException>(Messages.CANNOT_STOP_MODULE_DATAFLOWS, EmitterModuleFactory.INSTANCE_URN.toString(), hashSet.toString()) { // from class: org.marketcetera.module.DataFlowTest.34
            @Override // org.marketcetera.module.ExpectedFailure
            protected void run() throws Exception {
                DataFlowTest.sManager.stop(EmitterModuleFactory.INSTANCE_URN);
            }
        };
        sSink.waitUntilTerminator();
        DataFlowInfo dataFlowInfo = sManager.getDataFlowInfo(createDataFlow);
        assertFlowInfo(dataFlowInfo, createDataFlow, 3, true, false, null, null);
        final ModuleURN verifyFlowSteps = verifyFlowSteps(hashMap, EmitterModuleFactory.INSTANCE_URN, requestURN, dataFlowInfo, false, false);
        assertModuleInfo(sManager.getModuleInfo(EmitterModuleFactory.INSTANCE_URN), EmitterModuleFactory.INSTANCE_URN, ModuleState.STARTED, (DataFlowID[]) null, new DataFlowID[]{createDataFlow}, false, false, false, true, false);
        assertModuleInfo(sManager.getModuleInfo(verifyFlowSteps), verifyFlowSteps, ModuleState.STARTED, (DataFlowID[]) null, new DataFlowID[]{createDataFlow}, z3, true, true, true, false);
        assertModuleInfo(sManager.getModuleInfo(SinkModuleFactory.INSTANCE_URN), SinkModuleFactory.INSTANCE_URN, ModuleState.STARTED, (DataFlowID[]) null, new DataFlowID[]{createDataFlow}, false, true, true, false, false);
        EmitterModule emitterModule = (EmitterModule) ModuleBase.getInstance(EmitterModuleFactory.INSTANCE_URN);
        Assert.assertTrue(emitterModule.getFlows().contains(createDataFlow));
        Assert.assertEquals(1L, emitterModule.getFlows().size());
        ProcessorModule processorModule = (ProcessorModule) ModuleBase.getInstance(verifyFlowSteps);
        Assert.assertEquals(1L, processorModule.getNumRequests());
        Assert.assertTrue(processorModule.isStartInvoked());
        Assert.assertEquals(1L, processorModule.getFlows().length);
        Assert.assertEquals(createDataFlow, processorModule.getFlows()[0]);
        sManager.cancel(createDataFlow);
        ModuleTestBase.FlowData[] data = sSink.getData();
        Assert.assertEquals(3L, data.length);
        for (ModuleTestBase.FlowData flowData : data) {
            Assert.assertEquals(createDataFlow, flowData.getFirstMember());
            Assert.assertEquals("my data", flowData.getSecondMember());
        }
        sSink.clear();
        Set<RequestID> requests = emitterModule.getRequests();
        Assert.assertEquals(1L, requests.size());
        Assert.assertTrue(emitterModule.getTask(requests.iterator().next()).isCancelled());
        Assert.assertTrue(emitterModule.getFlows().isEmpty());
        emitterModule.clear();
        Assert.assertTrue(sManager.getDataFlows(true).isEmpty());
        ProcessorModule processorModule2 = (ProcessorModule) ModuleBase.getInstance(verifyFlowSteps);
        Assert.assertEquals(0L, processorModule2.getNumRequests());
        Assert.assertEquals(0L, processorModule2.getFlows().length);
        List dataFlowHistory = sManager.getDataFlowHistory();
        Assert.assertEquals(size + 1, dataFlowHistory.size());
        assertFlowInfo((DataFlowInfo) dataFlowHistory.get(0), createDataFlow, 3, true, true, null, null);
        verifyFlowSteps(hashMap, EmitterModuleFactory.INSTANCE_URN, requestURN, (DataFlowInfo) dataFlowHistory.get(0), false, false);
        if (z3) {
            new ExpectedFailure<ModuleNotFoundException>(Messages.MODULE_NOT_FOUND, new Object[]{verifyFlowSteps.toString()}) { // from class: org.marketcetera.module.DataFlowTest.35
                @Override // org.marketcetera.module.ExpectedFailure
                protected void run() throws Exception {
                    DataFlowTest.sManager.getModuleInfo(verifyFlowSteps);
                }
            };
        }
    }

    private void checkDataFlowModule(FlowRequesterModule flowRequesterModule, ModuleURN moduleURN, boolean z, boolean z2, DataCoupling dataCoupling, DataRequest... dataRequestArr) throws Exception {
        int size = sManager.getDataFlowHistory().size();
        HashMap hashMap = new HashMap();
        hashMap.put("value", "my data");
        hashMap.put("error", TestMessages.EMIT_DATA_ERROR);
        hashMap.put("times", Integer.valueOf(NUM_TIMES));
        DataRequest[] dataRequestArr2 = new DataRequest[dataRequestArr.length + 1];
        if (moduleURN == null) {
            moduleURN = EmitterModuleFactory.INSTANCE_URN;
        }
        dataRequestArr2[0] = new DataRequest(moduleURN, dataCoupling, hashMap);
        System.arraycopy(dataRequestArr, 0, dataRequestArr2, 1, dataRequestArr.length);
        ModuleURN requestURN = dataRequestArr2[1].getRequestURN();
        flowRequesterModule.setRequests(dataRequestArr2);
        flowRequesterModule.setInvokeDefault(z);
        flowRequesterModule.setAppendSink(z2);
        sManager.start(flowRequesterModule.getURN());
        DataFlowID flowID = flowRequesterModule.getFlowID();
        Assert.assertNotNull(flowID);
        List dataFlows = sManager.getDataFlows(true);
        Assert.assertEquals(1L, dataFlows.size());
        Assert.assertEquals(flowID, dataFlows.get(0));
        Assert.assertTrue(sManager.getDataFlows(false).isEmpty());
        sSink.waitUntilTerminator();
        DataFlowInfo dataFlowInfo = sManager.getDataFlowInfo(flowID);
        assertFlowInfo(dataFlowInfo, flowID, 3, true, false, flowRequesterModule.getURN(), null);
        ModuleURN verifyFlowSteps = verifyFlowSteps(hashMap, moduleURN, requestURN, dataFlowInfo, false, false, dataCoupling);
        ModuleTestBase.assertModuleInfo(sManager.getModuleInfo(EmitterModuleFactory.INSTANCE_URN), EmitterModuleFactory.INSTANCE_URN, ModuleState.STARTED, (DataFlowID[]) null, new DataFlowID[]{flowID}, false, false, false, true, false);
        ModuleTestBase.assertModuleInfo(sManager.getModuleInfo(verifyFlowSteps), verifyFlowSteps, ModuleState.STARTED, new DataFlowID[]{flowID}, new DataFlowID[]{flowID}, false, false, true, true, true);
        ModuleTestBase.assertModuleInfo(sManager.getModuleInfo(SinkModuleFactory.INSTANCE_URN), SinkModuleFactory.INSTANCE_URN, ModuleState.STARTED, (DataFlowID[]) null, new DataFlowID[]{flowID}, false, true, true, false, false);
        EmitterModule emitterModule = (EmitterModule) ModuleBase.getInstance(EmitterModuleFactory.INSTANCE_URN);
        Assert.assertTrue(emitterModule.getFlows().contains(flowID));
        Assert.assertEquals(1L, emitterModule.getFlows().size());
        ProcessorModule processorModule = (ProcessorModule) ModuleBase.getInstance(verifyFlowSteps);
        Assert.assertEquals(1L, processorModule.getNumRequests());
        Assert.assertTrue(processorModule.isStartInvoked());
        Assert.assertEquals(1L, processorModule.getFlows().length);
        Assert.assertEquals(flowID, processorModule.getFlows()[0]);
        sManager.stop(flowRequesterModule.getURN());
        ModuleTestBase.FlowData[] data = sSink.getData();
        Assert.assertEquals(3L, data.length);
        for (ModuleTestBase.FlowData flowData : data) {
            Assert.assertEquals(flowID, flowData.getFirstMember());
            Assert.assertEquals("my data", flowData.getSecondMember());
        }
        sSink.clear();
        Set<RequestID> requests = emitterModule.getRequests();
        Assert.assertEquals(1L, requests.size());
        Assert.assertTrue(emitterModule.getTask(requests.iterator().next()).isCancelled());
        Assert.assertTrue(emitterModule.getFlows().isEmpty());
        emitterModule.clear();
        ProcessorModule processorModule2 = (ProcessorModule) ModuleBase.getInstance(verifyFlowSteps);
        Assert.assertEquals(0L, processorModule2.getNumRequests());
        Assert.assertTrue(processorModule2.isStartInvoked());
        Assert.assertTrue(processorModule2.isStopInvoked());
        Assert.assertTrue(sManager.getDataFlows(true).isEmpty());
        List dataFlowHistory = sManager.getDataFlowHistory();
        Assert.assertEquals(size + 1, dataFlowHistory.size());
        assertFlowInfo((DataFlowInfo) dataFlowHistory.get(0), flowID, 3, true, true, flowRequesterModule.getURN(), flowRequesterModule.isSkipCancel() ? null : flowRequesterModule.getURN());
        verifyFlowSteps(hashMap, moduleURN, requestURN, (DataFlowInfo) dataFlowHistory.get(0), false, false, dataCoupling);
    }

    private void checkDataFlowModule(FlowRequesterModule flowRequesterModule, ModuleURN moduleURN, boolean z, boolean z2, DataRequest... dataRequestArr) throws Exception {
        checkDataFlowModule(flowRequesterModule, moduleURN, z, z2, DataCoupling.SYNC, dataRequestArr);
    }

    private ModuleURN verifyFlowSteps(Map<String, Object> map, ModuleURN moduleURN, ModuleURN moduleURN2, DataFlowInfo dataFlowInfo, boolean z, boolean z2, DataCoupling dataCoupling) {
        assertFlowStep(dataFlowInfo.getFlowSteps()[0], EmitterModuleFactory.INSTANCE_URN, true, 7 + (z2 ? 1 : 0), NUM_TIMES + (z ? 1 : 0), z ? TestMessages.STOP_DATA_FLOW.getText() : TestMessages.EMIT_DATA_ERROR.getText(), false, 0, 0, null, moduleURN, dataCoupling, map.toString());
        assertFlowStep(dataFlowInfo.getFlowSteps()[1], null, true, 4, 0, null, true, 7 + (z2 ? 1 : 0), 3 + (z2 ? 1 : 0), z2 ? TestMessages.STOP_DATA_FLOW.getText() : TestMessages.BAD_DATA.getText(), moduleURN2, dataCoupling, String.class.getName());
        ModuleURN moduleURN3 = dataFlowInfo.getFlowSteps()[1].getModuleURN();
        assertFlowStep(dataFlowInfo.getFlowSteps()[2], SinkModuleFactory.INSTANCE_URN, false, 0, 0, null, true, 4, 0, null, null, DataCoupling.SYNC, null);
        return moduleURN3;
    }

    private ModuleURN verifyFlowSteps(Map<String, Object> map, ModuleURN moduleURN, ModuleURN moduleURN2, DataFlowInfo dataFlowInfo, boolean z, boolean z2) {
        return verifyFlowSteps(map, moduleURN, moduleURN2, dataFlowInfo, z, z2, DataCoupling.SYNC);
    }
}
