This commit is contained in:
2024-11-30 19:03:49 +08:00
commit 1e6763c160
3806 changed files with 737676 additions and 0 deletions

View File

@@ -0,0 +1,194 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.catalina.tribes.group;
import java.util.ArrayList;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ManagedChannel;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.MembershipListener;
import org.apache.catalina.tribes.TesterUtil;
public class TestGroupChannelMemberArrival {
private static int count = 10;
private ManagedChannel[] channels = new ManagedChannel[count];
private TestMbrListener[] listeners = new TestMbrListener[count];
@Before
public void setUp() throws Exception {
for (int i = 0; i < channels.length; i++) {
channels[i] = new GroupChannel();
channels[i].getMembershipService().setPayload( ("Channel-" + (i + 1)).getBytes("ASCII"));
listeners[i] = new TestMbrListener( ("Listener-" + (i + 1)));
channels[i].addMembershipListener(listeners[i]);
}
TesterUtil.addRandomDomain(channels);
}
@Test
public void testMemberArrival() throws Exception {
//purpose of this test is to make sure that we have received all the members
//that we can expect before the start method returns
Thread[] threads = new Thread[channels.length];
for (int i=0; i<channels.length; i++ ) {
final Channel channel = channels[i];
Thread t = new Thread() {
@Override
public void run() {
try {
channel.start(Channel.DEFAULT);
}catch ( Exception x ) {
throw new RuntimeException(x);
}
}
};
threads[i] = t;
}
for (int i = 0; i < threads.length; i++) {
threads[i].start();
}
for (int i = 0; i < threads.length; i++) {
threads[i].join();
}
Thread.sleep(5000);
System.out.println(System.currentTimeMillis()
+ " All channels started.");
StringBuilder arrivalLengthErrors = new StringBuilder();
for (int i = listeners.length - 1; i >= 0; i--) {
TestMbrListener listener = listeners[i];
synchronized (listener.members) {
if (channels.length - 1 != listener.members.size()) {
arrivalLengthErrors.append("Checking member arrival length for [");
arrivalLengthErrors.append(listener.name);
arrivalLengthErrors.append("]. Was [");
arrivalLengthErrors.append(listener.members.size());
arrivalLengthErrors.append("] but should have been [");
arrivalLengthErrors.append(channels.length - 1);
arrivalLengthErrors.append("]");
arrivalLengthErrors.append('\n');
}
}
}
// Note if this fails for all listeners check multicast is working with
// org.apache.catalina.tribes.TesterMulticast
Assert.assertTrue(arrivalLengthErrors.toString(), arrivalLengthErrors.length() == 0);
System.out.println(System.currentTimeMillis()
+ " Members arrival counts checked.");
}
@After
public void tearDown() throws Exception {
for (int i = 0; i < channels.length; i++) {
try {
channels[i].stop(Channel.DEFAULT);
} catch (Exception ignore) {
// Ignore
}
}
}
public static class TestMbrListener
implements MembershipListener {
public String name = null;
public TestMbrListener(String name) {
this.name = name;
}
public ArrayList<Member> members = new ArrayList<>(1);
@Override
public void memberAdded(Member member) {
String msg;
int count;
synchronized (members) {
if (!members.contains(member)) {
members.add(member);
msg = "member added";
} else {
msg = "member added called, but member is already in the list";
}
count = members.size();
}
report(msg, member, count);
}
@Override
public void memberDisappeared(Member member) {
String msg;
int count;
synchronized (members) {
if (members.contains(member)) {
members.remove(member);
msg = "member disappeared";
} else {
msg = "member disappeared called, but there is no such member in the list";
}
count = members.size();
}
report(msg, member, count);
}
private void report(String event, Member member, int count) {
StringBuilder message = new StringBuilder(100);
message.append(System.currentTimeMillis());
message.append(' ');
message.append(name);
message.append(':');
message.append(event);
message.append(", has ");
message.append(count);
message.append(" members now. Member:[");
message.append("host: ");
appendByteArrayToString(message, member.getHost());
message.append(", port: ");
message.append(member.getPort());
message.append(", id: ");
appendByteArrayToString(message, member.getUniqueId());
message.append(", payload: ");
try {
message.append(new String(member.getPayload(), "ASCII"));
} catch (Exception x) {
message.append("unknown");
}
Thread t = Thread.currentThread();
message.append("]; Thread:").append(t.getName()).append(", hash:")
.append(t.hashCode());
System.out.println(message);
}
private void appendByteArrayToString(StringBuilder sb, byte[] input) {
if (input == null) {
sb.append("null");
return;
}
for (int i = 0; i < input.length; i++) {
if (i > 0) {
sb.append('.');
}
sb.append(input[i] & 0xFF);
}
}
}
}

View File

@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.catalina.tribes.group;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ChannelException;
import org.apache.catalina.tribes.ChannelInterceptor;
public class TestGroupChannelOptionFlag {
private GroupChannel channel = null;
@Before
public void setUp() throws Exception {
channel = new GroupChannel();
}
@After
public void tearDown() throws Exception {
if (channel != null) {
try {
channel.stop(Channel.DEFAULT);
} catch (Exception ignore) {
// Ignore
}
}
channel = null;
}
@Test
public void testOptionConflict() throws Exception {
boolean error = false;
channel.setOptionCheck(true);
ChannelInterceptor i = new TestInterceptor();
i.setOptionFlag(128);
channel.addInterceptor(i);
i = new TestInterceptor();
i.setOptionFlag(128);
channel.addInterceptor(i);
try {
channel.start(Channel.DEFAULT);
}catch ( ChannelException x ) {
if ( x.getMessage().indexOf("option flag conflict") >= 0 ) error = true;
}
Assert.assertTrue(error);
}
@Test
public void testOptionNoConflict() throws Exception {
boolean error = false;
channel.setOptionCheck(true);
ChannelInterceptor i = new TestInterceptor();
i.setOptionFlag(128);
channel.addInterceptor(i);
i = new TestInterceptor();
i.setOptionFlag(64);
channel.addInterceptor(i);
i = new TestInterceptor();
i.setOptionFlag(256);
channel.addInterceptor(i);
try {
channel.start(Channel.DEFAULT);
}catch ( ChannelException x ) {
if ( x.getMessage().indexOf("option flag conflict") >= 0 ) error = true;
}
Assert.assertFalse(error);
}
public static class TestInterceptor extends ChannelInterceptorBase {
// Just use base class
}
}

View File

@@ -0,0 +1,184 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.catalina.tribes.group;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.apache.catalina.startup.LoggingBaseTest;
import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ChannelListener;
import org.apache.catalina.tribes.ManagedChannel;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.TesterUtil;
import org.apache.catalina.tribes.transport.ReplicationTransmitter;
public class TestGroupChannelSenderConnections extends LoggingBaseTest {
private static final int count = 2;
private ManagedChannel[] channels = new ManagedChannel[count];
private TestMsgListener[] listeners = new TestMsgListener[count];
@Before
@Override
public void setUp() throws Exception {
super.setUp();
for (int i = 0; i < channels.length; i++) {
channels[i] = new GroupChannel();
channels[i].getMembershipService().setPayload( ("Channel-" + (i + 1)).getBytes("ASCII"));
listeners[i] = new TestMsgListener( ("Listener-" + (i + 1)));
channels[i].addChannelListener(listeners[i]);
}
TesterUtil.addRandomDomain(channels);
for (int i = 0; i < channels.length; i++) {
channels[i].start(Channel.SND_RX_SEQ|Channel.SND_TX_SEQ);
}
}
public void sendMessages(long delay, long sleep) throws Exception {
resetMessageCounters();
Member local = channels[0].getLocalMember(true);
Member dest = channels[1].getLocalMember(true);
int n = 3;
log.info("Sending " + n + " messages from [" + local.getName()
+ "] to [" + dest.getName() + "] with delay of " + delay
+ " ms between them.");
for (int i = 0; i < n; i++) {
channels[0].send(new Member[] { dest }, new TestMsg(), 0);
boolean last = (i == n - 1);
if (!last && delay > 0) {
Thread.sleep(delay);
}
}
log.info("Messages sent. Waiting no more than " + (sleep / 1000)
+ " seconds for them to be received");
long startTime = System.currentTimeMillis();
int countReceived;
while ((countReceived = getReceivedMessageCount()) != n) {
long time = System.currentTimeMillis();
if ((time - startTime) > sleep) {
Assert.fail("Only " + countReceived + " out of " + n
+ " messages have been received in " + (sleep / 1000)
+ " seconds");
break;
}
Thread.sleep(100);
}
}
@Test
public void testConnectionLinger() throws Exception {
sendMessages(0,15000);
}
@Test
public void testKeepAliveCount() throws Exception {
log.info("Setting keep alive count to 0");
for (int i = 0; i < channels.length; i++) {
ReplicationTransmitter t = (ReplicationTransmitter)channels[0].getChannelSender();
t.getTransport().setKeepAliveCount(0);
}
sendMessages(1000,15000);
}
@Test
public void testKeepAliveTime() throws Exception {
log.info("Setting keep alive count to 1 second");
for (int i = 0; i < channels.length; i++) {
ReplicationTransmitter t = (ReplicationTransmitter)channels[0].getChannelSender();
t.getTransport().setKeepAliveTime(1000);
}
sendMessages(2000,15000);
}
@After
@Override
public void tearDown() throws Exception {
try {
for (int i = 0; i < channels.length; i++) {
channels[i].stop(Channel.DEFAULT);
}
} finally {
super.tearDown();
}
}
private void resetMessageCounters() {
for (TestMsgListener listener: listeners) {
listener.reset();
}
}
private int getReceivedMessageCount() {
int count = 0;
for (TestMsgListener listener: listeners) {
count += listener.getReceivedCount();
}
return count;
}
// Test message. The message size is random.
public static class TestMsg implements Serializable {
private static final long serialVersionUID = 1L;
private static Random r = new Random();
private HashMap<Integer, ArrayList<Object>> map = new HashMap<>();
public TestMsg() {
int size = Math.abs(r.nextInt() % 200);
for (int i=0; i<size; i++ ) {
int length = Math.abs(r.nextInt() %65000);
ArrayList<Object> list = new ArrayList<>(length);
map.put(Integer.valueOf(i),list);
}
}
}
public class TestMsgListener implements ChannelListener {
private final String name;
private final AtomicInteger counter = new AtomicInteger();
public TestMsgListener(String name) {
this.name = name;
}
public void reset() {
counter.set(0);
}
public int getReceivedCount() {
return counter.get();
}
@Override
public void messageReceived(Serializable msg, Member sender) {
counter.incrementAndGet();
log.info("["+name+"] Received message:"+msg+" from " + sender.getName());
}
@Override
public boolean accept(Serializable msg, Member sender) {
return true;
}
}
}

View File

@@ -0,0 +1,157 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.catalina.tribes.group;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.transport.ReceiverBase;
/**
* @version 1.0
*/
public class TestGroupChannelStartStop {
private GroupChannel channel = null;
private int udpPort = 45543;
@Before
public void setUp() throws Exception {
channel = new GroupChannel();
}
@After
public void tearDown() throws Exception {
try {
channel.stop(Channel.DEFAULT);
} catch (Exception ignore) {
// Ignore
}
}
@Test
public void testDoubleFullStart() throws Exception {
int count = 0;
try {
channel.start(Channel.DEFAULT);
count++;
} catch ( Exception x){x.printStackTrace();}
try {
channel.start(Channel.DEFAULT);
count++;
} catch ( Exception x){x.printStackTrace();}
Assert.assertEquals(count,2);
channel.stop(Channel.DEFAULT);
}
@Test
public void testScrap() throws Exception {
System.out.println(channel.getChannelReceiver().getClass());
((ReceiverBase)channel.getChannelReceiver()).setMaxThreads(1);
}
@Test
public void testDoublePartialStart() throws Exception {
//try to double start the RX
int count = 0;
try {
channel.start(Channel.SND_RX_SEQ);
channel.start(Channel.MBR_RX_SEQ);
count++;
} catch ( Exception x){x.printStackTrace();}
try {
channel.start(Channel.MBR_RX_SEQ);
count++;
} catch ( Exception x){
// expected
}
Assert.assertEquals(count,1);
channel.stop(Channel.DEFAULT);
//double the membership sender
count = 0;
try {
channel.start(Channel.SND_RX_SEQ);
channel.start(Channel.MBR_TX_SEQ);
count++;
} catch ( Exception x){x.printStackTrace();}
try {
channel.start(Channel.MBR_TX_SEQ);
count++;
} catch ( Exception x){
// expected
}
Assert.assertEquals(count,1);
channel.stop(Channel.DEFAULT);
count = 0;
try {
channel.start(Channel.SND_RX_SEQ);
count++;
} catch ( Exception x){x.printStackTrace();}
try {
channel.start(Channel.SND_RX_SEQ);
count++;
} catch ( Exception x){
// expected
}
Assert.assertEquals(count,1);
channel.stop(Channel.DEFAULT);
count = 0;
try {
channel.start(Channel.SND_TX_SEQ);
count++;
} catch ( Exception x){x.printStackTrace();}
try {
channel.start(Channel.SND_TX_SEQ);
count++;
} catch ( Exception x){
// expected
}
Assert.assertEquals(count,1);
channel.stop(Channel.DEFAULT);
}
@Test
public void testFalseOption() throws Exception {
int flag = 0xFFF0;//should get ignored by the underlying components
int count = 0;
try {
channel.start(flag);
count++;
} catch ( Exception x){x.printStackTrace();}
try {
channel.start(flag);
count++;
} catch ( Exception x){
// expected
}
Assert.assertEquals(count,2);
channel.stop(Channel.DEFAULT);
}
@Test
public void testUdpReceiverStart() throws Exception {
ReceiverBase rb = (ReceiverBase)channel.getChannelReceiver();
rb.setUdpPort(udpPort);
channel.start(Channel.DEFAULT);
Thread.sleep(1000);
channel.stop(Channel.DEFAULT);
}
}

View File

@@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.catalina.tribes.group.interceptors;
import java.util.ArrayList;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ManagedChannel;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.MembershipListener;
import org.apache.catalina.tribes.group.GroupChannel;
import org.apache.catalina.tribes.util.UUIDGenerator;
public class TestDomainFilterInterceptor {
private static int count = 10;
private ManagedChannel[] channels = new ManagedChannel[count];
private TestMbrListener[] listeners = new TestMbrListener[count];
@Before
public void setUp() throws Exception {
for (int i = 0; i < channels.length; i++) {
channels[i] = new GroupChannel();
channels[i].getMembershipService().setPayload( ("Channel-" + (i + 1)).getBytes("ASCII"));
listeners[i] = new TestMbrListener( ("Listener-" + (i + 1)));
channels[i].addMembershipListener(listeners[i]);
DomainFilterInterceptor filter = new DomainFilterInterceptor();
filter.setDomain(UUIDGenerator.randomUUID(false));
channels[i].addInterceptor(filter);
}
}
public void clear() {
for (int i = 0; i < channels.length; i++) {
listeners[i].members.clear();
}
}
@Test
public void testMemberArrival() throws Exception {
//purpose of this test is to make sure that we have received all the members
//that we can expect before the start method returns
Thread[] threads = new Thread[channels.length];
for (int i=0; i<channels.length; i++ ) {
final Channel channel = channels[i];
Thread t = new Thread() {
@Override
public void run() {
try {
channel.start(Channel.DEFAULT);
}catch ( Exception x ) {
throw new RuntimeException(x);
}
}
};
threads[i] = t;
}
for (int i=0; i<threads.length; i++ ) threads[i].start();
for (int i=0; i<threads.length; i++ ) threads[i].join();
System.out.println("All channels started.");
for (int i=listeners.length-1; i>=0; i-- ) {
Assert.assertEquals("Checking member arrival length",0,listeners[i].members.size());
}
}
@After
public void tearDown() throws Exception {
for (int i = 0; i < channels.length; i++) {
try {
channels[i].stop(Channel.DEFAULT);
} catch (Exception ignore) {
// Ignore
}
}
}
public static class TestMbrListener
implements MembershipListener {
public String name = null;
public TestMbrListener(String name) {
this.name = name;
}
public ArrayList<Member> members = new ArrayList<>();
@Override
public void memberAdded(Member member) {
if (!members.contains(member)) {
members.add(member);
try {
System.out.println(name + ":member added[" + new String(member.getPayload(), "ASCII") + "; Thread:"+Thread.currentThread().getName()+"]");
} catch (Exception x) {
System.out.println(name + ":member added[unknown]");
}
}
}
@Override
public void memberDisappeared(Member member) {
if (members.contains(member)) {
members.remove(member);
try {
System.out.println(name + ":member disappeared[" + new String(member.getPayload(), "ASCII") + "; Thread:"+Thread.currentThread().getName()+"]");
} catch (Exception x) {
System.out.println(name + ":member disappeared[unknown]");
}
}
}
}
}

View File

@@ -0,0 +1,549 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.catalina.tribes.group.interceptors;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.security.NoSuchAlgorithmException;
import java.security.Security;
import java.util.ArrayList;
import java.util.Collection;
import javax.crypto.Cipher;
import org.hamcrest.core.IsEqual;
import org.hamcrest.core.IsNot;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.FixMethodOrder;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runners.MethodSorters;
import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ChannelException;
import org.apache.catalina.tribes.ChannelInterceptor;
import org.apache.catalina.tribes.ChannelMessage;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.group.ChannelInterceptorBase;
import org.apache.catalina.tribes.group.InterceptorPayload;
import org.apache.catalina.tribes.io.ChannelData;
import org.apache.catalina.tribes.io.XByteBuffer;
/**
* Tests the EncryptInterceptor.
*
* Many of the tests in this class use strings as input and output, even
* though the interceptor actually operates on byte arrays. This is done
* for readability for the tests and their outputs.
*/
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class TestEncryptInterceptor {
private static final String MESSAGE_FILE = "message.bin";
private static final String encryptionKey128 = "cafebabedeadbeefbeefcafecafebabe";
private static final String encryptionKey192 = "cafebabedeadbeefbeefcafecafebabedeadbeefbeefcafe";
private static final String encryptionKey256 = "cafebabedeadbeefcafebabedeadbeefcafebabedeadbeefcafebabedeadbeef";
EncryptInterceptor src;
EncryptInterceptor dest;
@BeforeClass
public static void setupClass() {
Security.setProperty("jdk.tls.disabledAlgorithms", "");
Security.setProperty("crypto.policy", "unlimited"); // For Java 9+
}
@AfterClass
public static void cleanup() {
File f = new File(MESSAGE_FILE);
if (f.isFile()) {
Assert.assertTrue(f.delete());
}
}
@Before
public void setup() {
src = new EncryptInterceptor();
src.setEncryptionKey(encryptionKey128);
dest = new EncryptInterceptor();
dest.setEncryptionKey(encryptionKey128);
src.setNext(new PipedInterceptor(dest));
dest.setPrevious(new ValueCaptureInterceptor());
}
@Test
public void testBasic() throws Exception {
src.start(Channel.SND_TX_SEQ);
dest.start(Channel.SND_TX_SEQ);
String testInput = "The quick brown fox jumps over the lazy dog.";
Assert.assertEquals("Basic roundtrip failed",
testInput,
roundTrip(testInput, src, dest));
}
@Test
public void testMultipleMessages() throws Exception {
src.start(Channel.SND_TX_SEQ);
dest.start(Channel.SND_TX_SEQ);
String testInput = "The quick brown fox jumps over the lazy dog.";
Assert.assertEquals("Basic roundtrip failed",
testInput,
roundTrip(testInput, src, dest));
Assert.assertEquals("Second roundtrip failed",
testInput,
roundTrip(testInput, src, dest));
Assert.assertEquals("Third roundtrip failed",
testInput,
roundTrip(testInput, src, dest));
Assert.assertEquals("Fourth roundtrip failed",
testInput,
roundTrip(testInput, src, dest));
Assert.assertEquals("Fifth roundtrip failed",
testInput,
roundTrip(testInput, src, dest));
}
@Test
public void testTinyPayload() throws Exception {
src.start(Channel.SND_TX_SEQ);
dest.start(Channel.SND_TX_SEQ);
String testInput = "x";
Assert.assertEquals("Tiny payload roundtrip failed",
testInput,
roundTrip(testInput, src, dest));
}
@Test
public void testLargePayload() throws Exception {
src.start(Channel.SND_TX_SEQ);
dest.start(Channel.SND_TX_SEQ);
byte[] bytes = new byte[1024*1024];
Assert.assertArrayEquals("Huge payload roundtrip failed",
bytes,
roundTrip(bytes, src, dest));
}
@Test
@Ignore("Too big for default settings. Breaks Gump, Eclipse, ...")
public void testHugePayload() throws Exception {
src.start(Channel.SND_TX_SEQ);
dest.start(Channel.SND_TX_SEQ);
byte[] bytes = new byte[1024*1024*1024];
Assert.assertArrayEquals("Huge payload roundtrip failed",
bytes,
roundTrip(bytes, src, dest));
}
@Test
public void testCustomProvider() throws Exception {
src.setProviderName("SunJCE"); // Explicitly set the provider name
dest.setProviderName("SunJCE");
src.start(Channel.SND_TX_SEQ);
dest.start(Channel.SND_TX_SEQ);
String testInput = "The quick brown fox jumps over the lazy dog.";
Assert.assertEquals("Failed to set custom provider name",
testInput,
roundTrip(testInput, src, dest));
}
@Test
public void test192BitKey() throws Exception {
Assume.assumeTrue("Skipping test192BitKey because the JVM does not support it",
192 <= Cipher.getMaxAllowedKeyLength("AES"));
src.setEncryptionKey(encryptionKey192);
dest.setEncryptionKey(encryptionKey192);
src.start(Channel.SND_TX_SEQ);
dest.start(Channel.SND_TX_SEQ);
String testInput = "The quick brown fox jumps over the lazy dog.";
Assert.assertEquals("Failed to set custom provider name",
testInput,
roundTrip(testInput, src, dest));
}
@Test
public void test256BitKey() throws Exception {
Assume.assumeTrue("Skipping test256BitKey because the JVM does not support it",
256 <= Cipher.getMaxAllowedKeyLength("AES"));
src.setEncryptionKey(encryptionKey256);
dest.setEncryptionKey(encryptionKey256);
src.start(Channel.SND_TX_SEQ);
dest.start(Channel.SND_TX_SEQ);
String testInput = "The quick brown fox jumps over the lazy dog.";
Assert.assertEquals("Failed to set custom provider name",
testInput,
roundTrip(testInput, src, dest));
}
/**
* Actually go through the interceptor's send/receive message methods.
*/
private static String roundTrip(String input, EncryptInterceptor src, EncryptInterceptor dest) throws Exception {
byte[] bytes = input.getBytes("UTF-8");
bytes = roundTrip(bytes, src, dest);
return new String(bytes, "UTF-8");
}
/**
* Actually go through the interceptor's send/receive message methods.
*/
private static byte[] roundTrip(byte[] input, EncryptInterceptor src, EncryptInterceptor dest) throws Exception {
ChannelData msg = new ChannelData(false);
msg.setMessage(new XByteBuffer(input, false));
src.sendMessage(null, msg, null);
return ((ValueCaptureInterceptor)dest.getPrevious()).getValue();
}
@Test
@Ignore("ECB mode isn't implemented because it's insecure")
public void testECB() throws Exception {
src.setEncryptionAlgorithm("AES/ECB/PKCS5Padding");
src.start(Channel.SND_TX_SEQ);
dest.setEncryptionAlgorithm("AES/ECB/PKCS5Padding");
dest.start(Channel.SND_TX_SEQ);
String testInput = "The quick brown fox jumps over the lazy dog.";
Assert.assertEquals("Failed in ECB mode",
testInput,
roundTrip(testInput, src, dest));
}
@Test
public void testOFB() throws Exception {
src.setEncryptionAlgorithm("AES/OFB/PKCS5Padding");
src.start(Channel.SND_TX_SEQ);
dest.setEncryptionAlgorithm("AES/OFB/PKCS5Padding");
dest.start(Channel.SND_TX_SEQ);
String testInput = "The quick brown fox jumps over the lazy dog.";
Assert.assertEquals("Failed in OFB mode",
testInput,
roundTrip(testInput, src, dest));
}
@Test
public void testCFB() throws Exception {
src.setEncryptionAlgorithm("AES/CFB/PKCS5Padding");
src.start(Channel.SND_TX_SEQ);
dest.setEncryptionAlgorithm("AES/CFB/PKCS5Padding");
dest.start(Channel.SND_TX_SEQ);
String testInput = "The quick brown fox jumps over the lazy dog.";
Assert.assertEquals("Failed in CFB mode",
testInput,
roundTrip(testInput, src, dest));
}
@Test
public void testGCM() throws Exception {
try {
src.setEncryptionAlgorithm("AES/GCM/PKCS5Padding");
src.start(Channel.SND_TX_SEQ);
dest.setEncryptionAlgorithm("AES/GCM/PKCS5Padding");
dest.start(Channel.SND_TX_SEQ);
} catch (ChannelException ce) {
Assume.assumeFalse("Skipping testGCM due to lack of JVM support",
ce.getCause() instanceof NoSuchAlgorithmException
&& ce.getCause().getMessage().contains("GCM"));
throw ce;
}
String testInput = "The quick brown fox jumps over the lazy dog.";
Assert.assertEquals("Failed in GCM mode",
testInput,
roundTrip(testInput, src, dest));
}
@Test
public void testIllegalECB() throws Exception {
try {
src.setEncryptionAlgorithm("AES/ECB/PKCS5Padding");
src.start(Channel.SND_TX_SEQ);
// start() should trigger IllegalArgumentException
Assert.fail("ECB mode is not being refused");
} catch (IllegalArgumentException iae) {
// Expected
}
}
@Test
public void testViaFile() throws Exception {
src.start(Channel.SND_TX_SEQ);
src.setNext(new ValueCaptureInterceptor());
String testInput = "The quick brown fox jumps over the lazy dog.";
ChannelData msg = new ChannelData(false);
msg.setMessage(new XByteBuffer(testInput.getBytes("UTF-8"), false));
src.sendMessage(null, msg, null);
byte[] bytes = ((ValueCaptureInterceptor)src.getNext()).getValue();
try (FileOutputStream out = new FileOutputStream(MESSAGE_FILE)) {
out.write(bytes);
}
dest.start(Channel.SND_TX_SEQ);
bytes = new byte[8192];
int read;
try (FileInputStream in = new FileInputStream(MESSAGE_FILE)) {
read = in.read(bytes);
}
msg = new ChannelData(false);
XByteBuffer xbb = new XByteBuffer(read, false);
xbb.append(bytes, 0, read);
msg.setMessage(xbb);
dest.messageReceived(msg);
}
@Test
public void testMessageUniqueness() throws Exception {
src.start(Channel.SND_TX_SEQ);
src.setNext(new ValueCaptureInterceptor());
String testInput = "The quick brown fox jumps over the lazy dog.";
ChannelData msg = new ChannelData(false);
msg.setMessage(new XByteBuffer(testInput.getBytes("UTF-8"), false));
src.sendMessage(null, msg, null);
byte[] cipherText1 = ((ValueCaptureInterceptor)src.getNext()).getValue();
msg.setMessage(new XByteBuffer(testInput.getBytes("UTF-8"), false));
src.sendMessage(null, msg, null);
byte[] cipherText2 = ((ValueCaptureInterceptor)src.getNext()).getValue();
Assert.assertThat("Two identical cleartexts encrypt to the same ciphertext",
cipherText1, IsNot.not(IsEqual.equalTo(cipherText2)));
}
@Test
public void testPickup() throws Exception {
File file = new File(MESSAGE_FILE);
if(!file.exists()) {
System.err.println("File message.bin does not exist. Skipping test.");
return;
}
dest.start(Channel.SND_TX_SEQ);
byte[] bytes = new byte[8192];
int read;
try (FileInputStream in = new FileInputStream(file)) {
read = in.read(bytes);
}
ChannelData msg = new ChannelData(false);
XByteBuffer xbb = new XByteBuffer(read, false);
xbb.append(bytes, 0, read);
msg.setMessage(xbb);
dest.messageReceived(msg);
}
/*
* This test isn't guaranteed to catch any multithreaded issues, but it
* gives a good exercise.
*/
@Test
public void testMultithreaded() throws Exception {
String inputValue = "A test string to fight over.";
final byte[] bytes = inputValue.getBytes("UTF-8");
int numThreads = 100;
final int messagesPerThread = 10;
dest.setPrevious(new ValuesCaptureInterceptor());
src.start(Channel.SND_TX_SEQ);
dest.start(Channel.SND_TX_SEQ);
Runnable job = new Runnable() {
@Override
public void run() {
try {
ChannelData msg = new ChannelData(false);
XByteBuffer xbb = new XByteBuffer(1024, false);
xbb.append(bytes, 0, bytes.length);
msg.setMessage(xbb);
for(int i=0; i<messagesPerThread; ++i)
src.sendMessage(null, msg, null);
} catch (ChannelException e) {
Assert.fail("Encountered exception sending messages: " + e.getMessage());
}
}
};
Thread[] threads = new Thread[numThreads];
for(int i=0; i<numThreads; ++i) {
threads[i] = new Thread(job);
threads[i].setName("Message-Thread-" + i);
}
for(int i=0; i<numThreads; ++i)
threads[i].start();
for(int i=0; i<numThreads; ++i)
threads[i].join();
// Check all received messages to make sure they are not corrupted
Collection<byte[]> messages = ((ValuesCaptureInterceptor)dest.getPrevious()).getValues();
Assert.assertEquals("Did not receive all expected messages",
numThreads * messagesPerThread, messages.size());
for(byte[] message : messages)
Assert.assertArrayEquals("Message is corrupted", message, bytes);
}
@Test
public void testTcpFailureDetectorDetection() {
src.setPrevious(new TcpFailureDetector());
try {
src.start(Channel.SND_TX_SEQ);
Assert.fail("EncryptInterceptor should detect TcpFailureDetector and throw an error");
} catch (EncryptInterceptor.ChannelConfigException cce) {
// Expected behavior
} catch (AssertionError ae) {
// This is the junit assertion being thrown
throw ae;
} catch (Throwable t) {
Assert.fail("EncryptionInterceptor should throw ChannelConfigException, not " + t.getClass().getName());
}
}
/**
* Interceptor that delivers directly to a destination.
*/
private static class PipedInterceptor
extends ChannelInterceptorBase
{
private ChannelInterceptor dest;
public PipedInterceptor(ChannelInterceptor dest) {
if(null == dest)
throw new IllegalArgumentException("Destination must not be null");
this.dest = dest;
}
@Override
public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload)
throws ChannelException {
dest.messageReceived(msg);
}
}
/**
* Interceptor that simply captures the latest message sent to or received by it.
*/
private static class ValueCaptureInterceptor
extends ChannelInterceptorBase
{
private byte[] value;
@Override
public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload)
throws ChannelException {
value = msg.getMessage().getBytes();
}
@Override
public void messageReceived(ChannelMessage msg) {
value = msg.getMessage().getBytes();
}
public byte[] getValue() {
return value;
}
}
/**
* Interceptor that simply captures all messages sent to or received by it.
*/
private static class ValuesCaptureInterceptor
extends ChannelInterceptorBase
{
private ArrayList<byte[]> messages = new ArrayList<>();
@Override
public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload)
throws ChannelException {
synchronized(messages) {
messages.add(msg.getMessage().getBytes());
}
}
@Override
public void messageReceived(ChannelMessage msg) {
synchronized(messages) {
messages.add(msg.getMessage().getBytes());
}
}
@SuppressWarnings("unchecked")
public Collection<byte[]> getValues() {
return (Collection<byte[]>)messages.clone();
}
}
}

View File

@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.catalina.tribes.group.interceptors;
import java.util.Arrays;
import org.junit.Assert;
import org.junit.Test;
public class TestGzipInterceptor {
@Test
public void testSmallerThanBufferSize() throws Exception {
doCompressDecompress(GzipInterceptor.DEFAULT_BUFFER_SIZE / 2);
}
@Test
public void testJustSmallerThanBufferSize() throws Exception {
doCompressDecompress(GzipInterceptor.DEFAULT_BUFFER_SIZE -1);
}
@Test
public void testExactBufferSize() throws Exception {
doCompressDecompress(GzipInterceptor.DEFAULT_BUFFER_SIZE);
}
@Test
public void testJustLargerThanBufferSize() throws Exception {
doCompressDecompress(GzipInterceptor.DEFAULT_BUFFER_SIZE + 1);
}
@Test
public void testFactor2BufferSize() throws Exception {
doCompressDecompress(GzipInterceptor.DEFAULT_BUFFER_SIZE * 2);
}
@Test
public void testFactor4BufferSize() throws Exception {
doCompressDecompress(GzipInterceptor.DEFAULT_BUFFER_SIZE * 4);
}
@Test
public void testMuchLargerThanBufferSize() throws Exception {
doCompressDecompress(GzipInterceptor.DEFAULT_BUFFER_SIZE * 10 + 1000);
}
private void doCompressDecompress(int size) throws Exception {
byte[] data = new byte[size];
Arrays.fill(data, (byte)1);
byte[] compress = GzipInterceptor.compress(data);
byte[] result = GzipInterceptor.decompress(compress);
Assert.assertTrue(Arrays.equals(data, result));
}
}

View File

@@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.catalina.tribes.group.interceptors;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.TesterUtil;
import org.apache.catalina.tribes.group.GroupChannel;
public class TestNonBlockingCoordinator {
private static final int CHANNEL_COUNT = 10;
private GroupChannel[] channels = null;
private NonBlockingCoordinator[] coordinators = null;
@Before
public void setUp() throws Exception {
System.out.println("Setup");
channels = new GroupChannel[CHANNEL_COUNT];
coordinators = new NonBlockingCoordinator[CHANNEL_COUNT];
Thread[] threads = new Thread[CHANNEL_COUNT];
for ( int i=0; i<CHANNEL_COUNT; i++ ) {
channels[i] = new GroupChannel();
coordinators[i] = new NonBlockingCoordinator();
channels[i].addInterceptor(coordinators[i]);
channels[i].addInterceptor(new TcpFailureDetector());
final int j = i;
threads[i] = new Thread() {
@Override
public void run() {
try {
channels[j].start(Channel.DEFAULT);
Thread.sleep(50);
} catch (Exception x) {
x.printStackTrace();
}
}
};
}
TesterUtil.addRandomDomain(channels);
for (int i = 0; i < CHANNEL_COUNT; i++) {
threads[i].start();
}
for (int i = 0; i < CHANNEL_COUNT; i++) {
threads[i].join();
}
Thread.sleep(1000);
}
@Test
public void testCoord1() throws Exception {
int expectedCount = channels[0].getMembers().length;
for (int i = 1; i < CHANNEL_COUNT; i++) {
Assert.assertEquals("Message count expected to be equal.", expectedCount,
channels[i].getMembers().length);
}
Member member = coordinators[0].getCoordinator();
int cnt = 0;
while (member == null && (cnt++ < 100)) {
try {
Thread.sleep(100);
member = coordinators[0].getCoordinator();
} catch (Exception x) {
/* Ignore */
}
}
for (int i = 0; i < CHANNEL_COUNT; i++) {
Assert.assertEquals(member, coordinators[i].getCoordinator());
}
System.out.println("Coordinator[1] is:" + member);
}
@Test
public void testCoord2() throws Exception {
Member member = coordinators[1].getCoordinator();
System.out.println("Coordinator[2a] is:" + member);
int index = -1;
for ( int i=0; i<CHANNEL_COUNT; i++ ) {
if ( channels[i].getLocalMember(false).equals(member) ) {
System.out.println("Shutting down:" + channels[i].getLocalMember(true).toString());
channels[i].stop(Channel.DEFAULT);
index = i;
}
}
int dead = index;
Thread.sleep(1000);
if (index == 0) {
index = 1;
} else {
index = 0;
}
System.out.println("Member count:"+channels[index].getMembers().length);
member = coordinators[index].getCoordinator();
for (int i = 1; i < CHANNEL_COUNT; i++) {
if (i != dead) {
Assert.assertEquals(member, coordinators[i].getCoordinator());
}
}
System.out.println("Coordinator[2b] is:" + member);
}
@After
public void tearDown() throws Exception {
System.out.println("tearDown");
for ( int i=0; i<CHANNEL_COUNT; i++ ) {
channels[i].stop(Channel.DEFAULT);
}
}
}

View File

@@ -0,0 +1,195 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.catalina.tribes.group.interceptors;
import java.io.Serializable;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ChannelException;
import org.apache.catalina.tribes.ChannelListener;
import org.apache.catalina.tribes.ChannelMessage;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.TesterUtil;
import org.apache.catalina.tribes.group.ChannelInterceptorBase;
import org.apache.catalina.tribes.group.GroupChannel;
import org.apache.catalina.tribes.group.InterceptorPayload;
public class TestOrderInterceptor {
GroupChannel[] channels = null;
OrderInterceptor[] orderitcs = null;
MangleOrderInterceptor[] mangleitcs = null;
TestListener[] test = null;
int channelCount = 2;
Thread[] threads = null;
@Before
public void setUp() throws Exception {
System.out.println("Setup");
channels = new GroupChannel[channelCount];
orderitcs = new OrderInterceptor[channelCount];
mangleitcs = new MangleOrderInterceptor[channelCount];
test = new TestListener[channelCount];
threads = new Thread[channelCount];
for ( int i=0; i<channelCount; i++ ) {
channels[i] = new GroupChannel();
orderitcs[i] = new OrderInterceptor();
mangleitcs[i] = new MangleOrderInterceptor();
orderitcs[i].setExpire(Long.MAX_VALUE);
channels[i].addInterceptor(orderitcs[i]);
channels[i].addInterceptor(mangleitcs[i]);
test[i] = new TestListener(i);
channels[i].addChannelListener(test[i]);
final int j = i;
threads[i] = new Thread() {
@Override
public void run() {
try {
channels[j].start(Channel.DEFAULT);
Thread.sleep(50);
} catch (Exception x) {
x.printStackTrace();
}
}
};
}
TesterUtil.addRandomDomain(channels);
for ( int i=0; i<channelCount; i++ ) threads[i].start();
for ( int i=0; i<channelCount; i++ ) threads[i].join();
Thread.sleep(1500);
}
@Test
public void testOrder1() throws Exception {
Member[] dest = channels[0].getMembers();
final AtomicInteger value = new AtomicInteger(0);
for ( int i=0; i<100; i++ ) {
channels[0].send(dest,Integer.valueOf(value.getAndAdd(1)),0);
}
Thread.sleep(5000);
for ( int i=0; i<test.length; i++ ) {
Assert.assertFalse(test[i].fail);
}
}
@Test
public void testOrder2() throws Exception {
final Member[] dest = channels[0].getMembers();
final AtomicInteger value = new AtomicInteger(0);
final Queue<Exception> exceptionQueue = new ConcurrentLinkedQueue<>();
Runnable run = new Runnable() {
@Override
public void run() {
for (int i = 0; i < 100; i++) {
try {
synchronized (channels[0]) {
channels[0].send(dest, Integer.valueOf(value.getAndAdd(1)), 0);
}
}catch ( Exception x ) {
exceptionQueue.add(x);
}
}
}
};
Thread[] threads = new Thread[5];
for (int i=0;i<threads.length;i++) {
threads[i] = new Thread(run);
}
for (int i=0;i<threads.length;i++) {
threads[i].start();
}
for (int i=0;i<threads.length;i++) {
threads[i].join();
}
if (!exceptionQueue.isEmpty()) {
Assert.fail("Exception while sending in threads: "
+ exceptionQueue.remove().toString());
}
Thread.sleep(5000);
for ( int i=0; i<test.length; i++ ) {
Assert.assertFalse(test[i].fail);
}
}
@After
public void tearDown() throws Exception {
System.out.println("tearDown");
for ( int i=0; i<channelCount; i++ ) {
channels[i].stop(Channel.DEFAULT);
}
}
public static void main(String[] args) {
org.junit.runner.JUnitCore.main(TestOrderInterceptor.class.getName());
}
public static class TestListener implements ChannelListener {
int id = -1;
public TestListener(int id) {
this.id = id;
}
int cnt = 0;
int total = 0;
volatile boolean fail = false;
@Override
public synchronized void messageReceived(Serializable msg, Member sender) {
total++;
Integer i = (Integer)msg;
if ( i.intValue() != cnt ) fail = true;
else cnt++;
System.out.println("Listener["+id+"] Message received:"+i+" Count:"+total+" Fail:"+fail);
}
@Override
public boolean accept(Serializable msg, Member sender) {
return (msg instanceof Integer);
}
}
public static class MangleOrderInterceptor extends ChannelInterceptorBase {
ChannelMessage hold = null;
Member[] dest = null;
@Override
public synchronized void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
if ( hold == null ) {
//System.out.println("Skipping message:"+msg);
hold = (ChannelMessage)msg.deepclone();
dest = new Member[destination.length];
System.arraycopy(destination,0,dest,0,dest.length);
} else {
//System.out.println("Sending message:"+msg);
super.sendMessage(destination,msg,payload);
//System.out.println("Sending message:"+hold);
super.sendMessage(dest,hold,null);
hold = null;
dest = null;
}
}
}
}

View File

@@ -0,0 +1,173 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.catalina.tribes.group.interceptors;
import java.util.ArrayList;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.apache.catalina.tribes.ByteMessage;
import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ChannelException;
import org.apache.catalina.tribes.ManagedChannel;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.MembershipListener;
import org.apache.catalina.tribes.TesterUtil;
import org.apache.catalina.tribes.group.GroupChannel;
public class TestTcpFailureDetector {
private TcpFailureDetector tcpFailureDetector1 = null;
private TcpFailureDetector tcpFailureDetector2 = null;
private ManagedChannel channel1 = null;
private ManagedChannel channel2 = null;
private TestMbrListener mbrlist1 = null;
private TestMbrListener mbrlist2 = null;
@Before
public void setUp() throws Exception {
channel1 = new GroupChannel();
channel2 = new GroupChannel();
channel1.getMembershipService().setPayload("Channel-1".getBytes("ASCII"));
channel2.getMembershipService().setPayload("Channel-2".getBytes("ASCII"));
mbrlist1 = new TestMbrListener("Channel-1");
mbrlist2 = new TestMbrListener("Channel-2");
tcpFailureDetector1 = new TcpFailureDetector();
tcpFailureDetector2 = new TcpFailureDetector();
channel1.addInterceptor(tcpFailureDetector1);
channel2.addInterceptor(tcpFailureDetector2);
channel1.addMembershipListener(mbrlist1);
channel2.addMembershipListener(mbrlist2);
TesterUtil.addRandomDomain(new ManagedChannel[] {channel1, channel2});
}
public void clear() {
mbrlist1.members.clear();
mbrlist2.members.clear();
}
@Test
public void testTcpSendFailureMemberDrop() throws Exception {
System.out.println("testTcpSendFailureMemberDrop()");
clear();
channel1.start(Channel.DEFAULT);
channel2.start(Channel.DEFAULT);
//Thread.sleep(1000);
Assert.assertEquals("Expecting member count to be equal",mbrlist1.members.size(),mbrlist2.members.size());
channel2.stop(Channel.SND_RX_SEQ);
ByteMessage msg = new ByteMessage(new byte[1024]);
try {
channel1.send(channel1.getMembers(), msg, 0);
Assert.fail("Message send should have failed.");
} catch ( ChannelException x ) {
// Ignore
}
Assert.assertEquals("Expecting member count to not be equal",mbrlist1.members.size()+1,mbrlist2.members.size());
channel1.stop(Channel.DEFAULT);
channel2.stop(Channel.DEFAULT);
}
@Test
public void testTcpFailureMemberAdd() throws Exception {
System.out.println("testTcpFailureMemberAdd()");
clear();
channel1.start(Channel.DEFAULT);
channel2.start(Channel.SND_RX_SEQ);
channel2.start(Channel.SND_TX_SEQ);
channel2.start(Channel.MBR_RX_SEQ);
channel2.stop(Channel.SND_RX_SEQ);
channel2.start(Channel.MBR_TX_SEQ);
//Thread.sleep(1000);
Assert.assertEquals("Expecting member count to not be equal",mbrlist1.members.size()+1,mbrlist2.members.size());
channel1.stop(Channel.DEFAULT);
channel2.stop(Channel.DEFAULT);
}
@Test
public void testTcpMcastFail() throws Exception {
System.out.println("testTcpMcastFail()");
clear();
channel1.start(Channel.DEFAULT);
channel2.start(Channel.DEFAULT);
//Thread.sleep(1000);
Assert.assertEquals("Expecting member count to be equal",mbrlist1.members.size(),mbrlist2.members.size());
channel2.stop(Channel.MBR_TX_SEQ);
ByteMessage msg = new ByteMessage(new byte[1024]);
try {
Thread.sleep(5000);
Assert.assertEquals("Expecting member count to be equal",mbrlist1.members.size(),mbrlist2.members.size());
channel1.send(channel1.getMembers(), msg, 0);
} catch ( ChannelException x ) {
Assert.fail("Message send should have succeeded.");
}
channel1.stop(Channel.DEFAULT);
channel2.stop(Channel.DEFAULT);
}
@After
public void tearDown() throws Exception {
tcpFailureDetector1 = null;
tcpFailureDetector2 = null;
try {
channel1.stop(Channel.DEFAULT);
} catch (Exception ignore) {
// Ignore
}
channel1 = null;
try {
channel2.stop(Channel.DEFAULT);
} catch (Exception ignore) {
// Ignore
}
channel2 = null;
}
public static class TestMbrListener implements MembershipListener {
public String name = null;
public TestMbrListener(String name) {
this.name = name;
}
public ArrayList<Member> members = new ArrayList<>();
@Override
public void memberAdded(Member member) {
if ( !members.contains(member) ) {
members.add(member);
try{
System.out.println(name + ":member added[" + new String(member.getPayload(), "ASCII") + "]");
}catch ( Exception x ) {
System.out.println(name + ":member added[unknown]");
}
}
}
@Override
public void memberDisappeared(Member member) {
if ( members.contains(member) ) {
members.remove(member);
try{
System.out.println(name + ":member disappeared[" + new String(member.getPayload(), "ASCII") + "]");
}catch ( Exception x ) {
System.out.println(name + ":member disappeared[unknown]");
}
}
}
}
}