Skip to content

TP-568: Rebind service beans when PU has restarted #121

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ public void renewLease() {
return;
}
if (serviceHasChanged(serviceDiscoveryResult.getResult())) {
if (isBound() && currentProperties != null) {
log.info("Service properties for bean={} astrixBeanId={} have changed, will unbind current service bean and rebind new bean.", getBeanKey(), id);
unbind();
}
bind(serviceDiscoveryResult.getResult());
} else {
log.debug("Service properties have not changed. No need to bind bean=" + getBeanKey());
Expand Down Expand Up @@ -221,6 +225,11 @@ void destroy() {
}
}

private void unbind() {
this.currentState.releaseInstance();
this.currentState = new Unbound(ServiceUnavailableException.class, "Service is unavailable");
}

private void notifyBound() {
boundStateLock.lock();
try {
Expand Down
13 changes: 13 additions & 0 deletions astrix-gs/src/main/java/com/avanza/astrix/gs/GsBinder.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.avanza.astrix.gs;

import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand All @@ -26,6 +27,9 @@
import com.avanza.astrix.beans.core.AstrixSettings;
import com.avanza.astrix.beans.service.ServiceProperties;
import com.avanza.astrix.config.DynamicConfig;
import com.gigaspaces.internal.client.spaceproxy.IDirectSpaceProxy;
import com.gigaspaces.internal.server.space.SpaceImpl;
import com.j_spaces.core.JSpaceContainerImpl;

/**
*
Expand All @@ -37,6 +41,7 @@ public class GsBinder implements AstrixConfigAware {
public static final String SPACE_NAME_PROPERTY = "spaceName";
public static final String SPACE_URL_PROPERTY = "spaceUrl";
private static final String SPACE_REQUIRES_AUTHENTICATION = "isSecured";
public static final String START_TIME = "startTime";

private static final Pattern SPACE_URL_PATTERN = Pattern.compile("jini://.*?/.*?/(.*)?[?](.*)");
private DynamicConfig config;
Expand Down Expand Up @@ -93,9 +98,17 @@ public ServiceProperties createProperties(GigaSpace space) {
result.setProperty(SPACE_NAME_PROPERTY, space.getSpace().getName());
result.setProperty(SPACE_URL_PROPERTY, new SpaceUrlBuilder(space).buildSpaceUrl());
result.setProperty(SPACE_REQUIRES_AUTHENTICATION, Boolean.toString(space.getSpace().isSecured()));
getInstanceStartTime(space).ifPresent(t -> result.setProperty(START_TIME, t.toString()));
return result;
}

private Optional<Long> getInstanceStartTime(GigaSpace space) {
return Optional.ofNullable(space.getSpace().getDirectProxy())
.map(IDirectSpaceProxy::getSpaceImplIfEmbedded)
.map(SpaceImpl::getContainer)
.map(JSpaceContainerImpl::getStartTime);
}

public ServiceProperties createServiceProperties(String spaceUrl) {
Matcher spaceUrlMatcher = SPACE_URL_PATTERN.matcher(spaceUrl);
if (!spaceUrlMatcher.find()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright 2014 Avanza Bank AB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.avanza.astrix.integration.tests;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.List;
import java.util.Properties;

import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
import org.openspaces.core.cluster.ClusterInfo;
import org.openspaces.core.properties.BeanLevelProperties;
import org.openspaces.pu.container.ProcessingUnitContainer;
import org.openspaces.pu.container.integrated.IntegratedProcessingUnitContainerProvider;

import com.avanza.astrix.beans.core.AstrixSettings;
import com.avanza.astrix.config.DynamicConfig;
import com.avanza.astrix.config.GlobalConfigSourceRegistry;
import com.avanza.astrix.config.MapConfigSource;
import com.avanza.astrix.context.AstrixConfigurer;
import com.avanza.astrix.context.AstrixContext;
import com.avanza.astrix.integration.tests.domain.api.LunchRestaurant;
import com.avanza.astrix.integration.tests.domain.api.LunchService;
import com.avanza.gs.test.JVMGlobalLus;
import com.avanza.gs.test.PuConfigurers;
import com.avanza.gs.test.RunningPu;

public class RestartSpaceTest {
private final String lookupGroupName = JVMGlobalLus.getLookupGroupName();
private final MapConfigSource configSource = new MapConfigSource() {{
set(AstrixSettings.SERVICE_REGISTRY_URI, "gs-remoting:jini://*/*/service-registry-space?groups=" + lookupGroupName);
set(AstrixSettings.BEAN_BIND_ATTEMPT_INTERVAL, 250);
set(AstrixSettings.SERVICE_LEASE_RENEW_INTERVAL, 100);
set(AstrixSettings.SERVICE_REGISTRY_EXPORT_INTERVAL, 150);
}};
private final String configSourceId = GlobalConfigSourceRegistry.register(configSource);
private final DynamicConfig dynamicConfig = new DynamicConfig(configSource);
@Rule
public final RunningPu serviceRegistryPu = PuConfigurers.partitionedPu("classpath:/META-INF/spring/service-registry-pu.xml")
.lookupGroup(lookupGroupName)
.startAsync(false)
.configure();
private AstrixContext astrix;
private ProcessingUnitContainer pu;

@After
public void afterEachTest() throws Exception {
if (astrix != null) {
astrix.close();
}
if (pu != null) {
pu.close();
}
}

private ProcessingUnitContainer startPuContainer() throws IOException {
Properties contextProperties = new Properties();
contextProperties.put("spaceName", this.getClass().getName());
contextProperties.put("configSourceId", configSourceId);
contextProperties.put("gs.space.url.arg.groups", this.lookupGroupName);
BeanLevelProperties beanLevelProperties = new BeanLevelProperties();
beanLevelProperties.setContextProperties(contextProperties);

// Cannot use "PuConfigurers.partitionedPu" for this space since it
// gives a unique spacename for each start, and we want to restart with
// the same spacename in these tests.
IntegratedProcessingUnitContainerProvider provider = new IntegratedProcessingUnitContainerProvider();
provider.setClusterInfo(new ClusterInfo("partitioned", 1, 0, 1, 0));
provider.addConfigLocation("classpath:/META-INF/spring/lunch-pu.xml");
provider.setBeanLevelProperties(beanLevelProperties);
return provider.createContainer();
}

@Test
public void shouldCallAstrixServiceWithoutExceptionsWhenPuHasRestarted() throws Exception {
// Arrange
pu = startPuContainer();
astrix = new AstrixConfigurer().setConfig(dynamicConfig).configure();
LunchService lunchService = astrix.waitForBean(LunchService.class, 5000);
List<LunchRestaurant> response1 = lunchService.getLunchRestaurants("request");

// Act
pu.close();
pu = startPuContainer();
// Wait for service renewal to execute
Thread.sleep(1_000);

// Assert
List<LunchRestaurant> response2 = lunchService.getLunchRestaurants("request");
assertEquals(response1, response2);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@

import static org.junit.Assert.assertEquals;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;

import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

import com.avanza.astrix.beans.core.AstrixBeanKey;
import com.avanza.astrix.beans.core.AstrixSettings;
import com.avanza.astrix.beans.registry.AstrixServiceRegistry;
Expand All @@ -32,6 +36,7 @@
import com.avanza.astrix.config.MapConfigSource;
import com.avanza.astrix.context.AstrixConfigurer;
import com.avanza.astrix.context.AstrixContext;
import com.avanza.astrix.gs.GsBinder;
import com.avanza.astrix.provider.component.AstrixServiceComponentNames;
import com.avanza.astrix.test.util.AutoCloseableRule;
import com.avanza.gs.test.PuConfigurers;
Expand Down Expand Up @@ -108,5 +113,34 @@ interface SomeService {

interface AnotherService {
}


@Test
public void serviceRegistrationRetainsHighestValueOfPuStartTime() {
// Arrange
AstrixServiceRegistry serviceRegistry = clientContext.getBean(AstrixServiceRegistry.class);
ServiceRegistryClient serviceRegistryClient = clientContext.getBean(ServiceRegistryClient.class);
ServiceRegistryExporterClient exporterClient = new ServiceRegistryExporterClient(serviceRegistry, "default", "app-instance-1");
Instant t0 = Instant.parse("2021-01-01T12:00:00.000Z");
Instant t1 = t0.plus(5, ChronoUnit.MINUTES);
String t0AsString = String.valueOf(t0.toEpochMilli());
String t1AsString = String.valueOf(t1.toEpochMilli());
ServiceProperties props0 = new ServiceProperties(Map.of(GsBinder.START_TIME, t0AsString));
ServiceProperties props1 = new ServiceProperties(Map.of(GsBinder.START_TIME, t1AsString));
long lease = 5000;

// Act
exporterClient.register(SomeService.class, props0, lease);
exporterClient.register(SomeService.class, props1, lease);
exporterClient.register(AnotherService.class, props1, lease);
exporterClient.register(AnotherService.class, props0, lease);

// Assert
ServiceProperties foundProps1 = serviceRegistryClient.lookup(AstrixBeanKey.create(SomeService.class));
ServiceProperties foundProps2 = serviceRegistryClient.lookup(AstrixBeanKey.create(AnotherService.class));
// We want to make sure that both registrations have stored the time of
// "t1" in its registration (the max value of all "startTime").
// Specifically, they should *not* have "t0".
assertEquals(t1AsString, foundProps1.getProperty(GsBinder.START_TIME));
assertEquals(t1AsString, foundProps2.getProperty(GsBinder.START_TIME));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,18 @@
*/
package com.avanza.astrix.service.registry.pu;

import static com.avanza.astrix.gs.GsBinder.START_TIME;
import static java.lang.Math.max;

import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import org.openspaces.core.GigaSpace;
import org.springframework.beans.factory.annotation.Autowired;

import com.avanza.astrix.beans.registry.AstrixServiceRegistryEntry;
import com.avanza.astrix.beans.registry.ServiceKey;
import com.avanza.astrix.beans.registry.ServiceProviderKey;
Expand All @@ -49,6 +52,7 @@ public void insertOrUpdate(AstrixServiceRegistryEntry entry, long lease) {
ServiceProviderKey serviceProviderKey = ServiceProviderKey.create(serviceKey, applicationInstanceId);
spaceEntry.setServiceProviderKey(serviceProviderKey);
spaceEntry.setProperties(entry.getServiceProperties());
setLatestStartTime(spaceEntry);
Map<String, String> metadata = new HashMap<>();
Date now = new Date();
metadata.put("lastLeaseRenewalTime", now.toString());
Expand All @@ -57,6 +61,30 @@ public void insertOrUpdate(AstrixServiceRegistryEntry entry, long lease) {
gigaSpace.write(spaceEntry, lease);
}

private void setLatestStartTime(SpaceServiceRegistryEntry spaceEntry) {
final SpaceServiceRegistryEntry existingEntry = gigaSpace.readById(
SpaceServiceRegistryEntry.class,
spaceEntry.getServiceProviderKey(),
spaceEntry.getApiType()
);
if (existingEntry == null) {
return;
}
final long startTimeExistingEntry = getStartTime(existingEntry);
final long startTimeNewEntry = getStartTime(spaceEntry);
spaceEntry.getProperties().put(
START_TIME,
Long.toString(max(startTimeExistingEntry, startTimeNewEntry))
);
}

private long getStartTime(SpaceServiceRegistryEntry entry) {
return Optional.ofNullable(entry.getProperties())
.map(p -> p.get(START_TIME))
.map(Long::parseLong)
.orElse(0L);
}

@Override
public List<AstrixServiceRegistryEntry> findAll() {
SpaceServiceRegistryEntry[] entries = gigaSpace.readMultiple(SpaceServiceRegistryEntry.template());
Expand Down