Running Spring Scheduler In Cluster Mode

I was facing an issue where the email scheduler job kept sending duplicate activation emails to users after they registered with the system.
After some troubleshooting, I found the culprit: the Spring scheduled job was running independently on each node in the cluster environment.

I then decided to “borrow” an idea from Liquibase: how it ensures only one instance runs the changesets at a time. I first created a table called scheduler_lock with the columns below:

CREATE TABLE scheduler_lock
(
  id integer NOT NULL,
  job_type character varying(50),
  is_lock boolean,
  lock_time timestamp without time zone,
  lock_by character varying(255),
  CONSTRAINT pk_scheduler_lock PRIMARY KEY (id)
)

The job_type column lets us store the status of different scheduled jobs in a single table. lock_time and lock_by simply record the lock time and host name for easier debugging later. The idea behind it is pretty simple: before running, every job must check the lock status for its job type.

For example: Node A runs the job and takes ownership of the lock by updating is_lock to TRUE, and lock_time and lock_by to the current time and host name. When Node B kicks in with the same job, it checks the same row, finds is_lock = TRUE, and knows another node is already performing the job, so it can take a rest and discard its own run.
Once Node A completes its job, it releases the lock by updating is_lock back to FALSE, and lock_time and lock_by to null.
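As a toy illustration of this protocol, here is an in-memory analogy (a sketch only, not the database-backed implementation — the map below stands in for the scheduler_lock table, and the class and method names are made up for this example):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Toy in-memory stand-in for the scheduler_lock table:
// key = job_type, value = lock_by (no entry = lock is free).
public class LockTableSketch {

    private final Map<String, String> lockTable = new ConcurrentHashMap<>();

    // A node owns the lock only if no entry existed yet for this job type.
    public boolean getLock(String jobType, String host) {
        return lockTable.putIfAbsent(jobType, host) == null;
    }

    // Only the node currently holding the lock may release it.
    public boolean releaseLock(String jobType, String host) {
        return lockTable.remove(jobType, host);
    }
}
```

With this sketch, Node A's `getLock("EMAIL", "Node-A")` returns true, and Node B's call returns false until Node A releases. Note that putIfAbsent performs the check and the insert as a single atomic step, which is exactly the property a database-backed lock needs as well.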

And here is the code snippet for getting and releasing the lock.

public boolean getLock(SchedulerJobType jobType, String author) {
    log.debug("[{}] getting the lock", author);
    Session session = sessionFactory.getCurrentSession();
    Criteria criteria = session.createCriteria(SchedulerLock.class);
    criteria.add(eq("jobType", jobType.getCode())); // static import of Restrictions.eq
    SchedulerLock scheduleLock = (SchedulerLock) criteria.uniqueResult();

    if (scheduleLock.getIsLock().equals(Boolean.FALSE)) {
        scheduleLock.setLock(Boolean.TRUE);
        scheduleLock.setLockBy(author);
        scheduleLock.setLockTime(new Date());
        session.update(scheduleLock);
        log.debug("[{}] got the lock", author);
        return true;
    }
    return false;
}

public void releaseLock(SchedulerJobType jobType, String author) {
    log.debug("[{}] releasing the lock", author);
    Session session = sessionFactory.getCurrentSession();
    Criteria criteria = session.createCriteria(SchedulerLock.class);
    criteria.add(eq("jobType", jobType.getCode()));
    criteria.add(eq("lockBy", author));

    try {
        SchedulerLock scheduleLock = (SchedulerLock) criteria.uniqueResult();
        if (scheduleLock.getIsLock().equals(Boolean.TRUE)) {
            scheduleLock.setLock(Boolean.FALSE);
            scheduleLock.setLockBy(null);
            scheduleLock.setLockTime(null);
            session.update(scheduleLock);
            log.debug("[{}] released the lock", author);
        }
    } catch (HibernateException e) {
        log.debug("Not able to release lock by current thread {} due to {}", author, e.getCause());
    }
}

OK, it seems everything is now in place and my Spring scheduler is “cluster safe”, yay!

BUT… when the job ran again with this new implementation, some weird behavior happened again.
Node A and Node B both got the lock one after another; Node A released the lock successfully, but Node B threw an exception when releasing it. The logs below show the situation:

#Node A
2016-02-02 03:12:00,000 DEBUG EmailServiceImpl: Email scheduler job started...
2016-02-02 03:12:00,013 DEBUG SchedulerLockDAOImpl: [Node-A] getting the lock
2016-02-02 03:12:00,022 DEBUG SchedulerLockDAOImpl: [Node-A] got the lock
.....
2016-02-02 03:12:06,199 DEBUG SchedulerLockDAOImpl: [Node-A] releasing the lock
2016-02-02 03:12:06,204 DEBUG SchedulerLockDAOImpl: [Node-A] released the lock
2016-02-02 03:12:06,210 DEBUG EmailServiceImpl: Email scheduler job end successfully...
#Node B
2016-02-02 03:12:00,000 DEBUG EmailServiceImpl: Email scheduler job started...
2016-02-02 03:12:00,013 DEBUG SchedulerLockDAOImpl: [Node-B] getting the lock
2016-02-02 03:12:00,019 DEBUG SchedulerLockDAOImpl: [Node-B] got the lock
.....
2016-02-02 03:12:05,082 DEBUG SchedulerLockDAOImpl: [Node-B] releasing the lock
2016-02-02 03:12:05,102 ERROR TaskUtils$LoggingErrorHandler: Unexpected error occurred in scheduled task.

Let’s zoom into the code and see what is happening.

if (scheduleLock.getIsLock().equals(Boolean.FALSE)) {
    scheduleLock.setLock(Boolean.TRUE);  //<-- Node A
    scheduleLock.setLockBy(author);
    scheduleLock.setLockTime(new Date());
    session.update(scheduleLock);        //<-- Node B
    log.debug("[{}] got the lock", author);
    return true;
}

In this situation, the only possibility I can think of is that Node B ran the job first and got the lock by updating the record, but before Node B committed the change to the database, Node A kicked in and read the row with the lock status still FALSE, so it overwrote Node B’s lock almost immediately.
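This is the classic check-then-act race: reading is_lock and later writing it are two separate steps, so both nodes can pass the check before either write commits. A minimal plain-Java illustration (no database involved; class and variable names are invented for this sketch) of why an atomic compare-and-set closes the gap:

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class CheckThenActDemo {
    public static void main(String[] args) {
        // Check-then-act: both nodes read the flag before either one writes it.
        boolean isLock = false;
        boolean nodeASeesFree = !isLock; // Node A checks: lock looks free
        boolean nodeBSeesFree = !isLock; // Node B checks before A's update commits
        // Both checks pass, so both nodes update the row -> duplicate job run.
        System.out.println("naive: A=" + nodeASeesFree + ", B=" + nodeBSeesFree);

        // Compare-and-set: the check and the update are ONE atomic operation,
        // so only the first caller can flip false -> true.
        AtomicBoolean lock = new AtomicBoolean(false);
        boolean nodeAWins = lock.compareAndSet(false, true); // true
        boolean nodeBWins = lock.compareAndSet(false, true); // false: already true
        System.out.println("atomic: A=" + nodeAWins + ", B=" + nodeBWins);
    }
}
```

The in-memory AtomicBoolean only protects a single JVM, of course; across cluster nodes the equivalent atomicity has to come from the shared database.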

After some thought, I came up with an idea: make the nodes run at different times by adding a random thread sleep at the start of each job, and guess what, problem solved 🙂

@Scheduled(cron = "0 0 */2 * * ?")
public void sendActivationEmailTask() {

    try {
        int sleepPeriod = ThreadLocalRandom.current().nextInt(3, 60);
        Thread.sleep(sleepPeriod * 1000);
        String author = InetAddress.getLocalHost().getHostName();
        if (schedulerLockDAO.getLock(SchedulerJobType.EMAIL, author)) {
            // GET THE RECORD AND SEND EMAIL
            schedulerLockDAO.releaseLock(SchedulerJobType.EMAIL, author);
        }

    } catch (Exception e) {
        // LOG EXCEPTION
    }
}

p/s: This might not be the best way, as the scheduler job will no longer run at the exact time we set; in this case it runs on the cron schedule above plus a random thread sleep of 3–59 seconds. It also only makes the collision unlikely rather than impossible. Leave me a comment if you have a better idea.
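For what it’s worth, one alternative is to let the database perform the compare-and-set, so the check and the update become a single atomic statement and no sleep is needed. This is only a sketch, not the implementation above: it uses plain JDBC instead of Hibernate for brevity, and assumes the PostgreSQL-style schema from the CREATE TABLE earlier (including its now() function).

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class AtomicSchedulerLock {

    // Check and update in ONE statement: the row changes only if is_lock is
    // still FALSE, so the database serializes competing nodes for us.
    static final String ACQUIRE_SQL =
            "UPDATE scheduler_lock SET is_lock = TRUE, lock_by = ?, lock_time = now() "
          + "WHERE job_type = ? AND is_lock = FALSE";

    static final String RELEASE_SQL =
            "UPDATE scheduler_lock SET is_lock = FALSE, lock_by = NULL, lock_time = NULL "
          + "WHERE job_type = ? AND lock_by = ?";

    // Exactly one node sees executeUpdate() == 1 and wins the lock;
    // every other node sees 0 rows affected and skips the job.
    public boolean getLock(Connection conn, String jobType, String host) throws SQLException {
        try (PreparedStatement ps = conn.prepareStatement(ACQUIRE_SQL)) {
            ps.setString(1, host);
            ps.setString(2, jobType);
            return ps.executeUpdate() == 1;
        }
    }

    public void releaseLock(Connection conn, String jobType, String host) throws SQLException {
        try (PreparedStatement ps = conn.prepareStatement(RELEASE_SQL)) {
            ps.setString(1, jobType);
            ps.setString(2, host);
            ps.executeUpdate();
        }
    }
}
```

One caveat with any approach like this: a node that crashes mid-job leaves the lock held, so a real implementation would also want a stale-lock timeout based on lock_time.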

 

Filter Exceptions Not Translated By ExceptionTranslationFilter

If you have a custom servlet filter added to the Spring Security filter chain and you expect exceptions thrown by it to be translated by ExceptionTranslationFilter, but they aren’t, then you probably need to register your custom filter after ExceptionTranslationFilter, as below.

@Override
protected void configure(HttpSecurity http) throws Exception {
    http.addFilterAfter(new YOUR_CUSTOM_SERVLET_FILTER(), ExceptionTranslationFilter.class);
}

 

 

SpringData for MongoDB

Since there are plenty of tutorials out there teaching you how to get a simple MongoDB application up and running with Spring Data, I’m not going to cover that here; instead I’ll try to summarize Spring Data for MongoDB (v1.7) after reading through it 🙂

Generally there are 2 ways to perform tasks against MongoDB using SDMB (Spring Data MongoDB), described below:

1. MongoTemplate

The code snippets below show how easily we can insert into, query, and drop a collection in MongoDB using MongoTemplate.

@Document
public class Shop {

    @Id
    private BigInteger id;
    private String shopName;
    private String address;

    public Shop() {}

    public Shop(String shopName, String address) {
        this.shopName = shopName;
        this.address = address;
    }

    public BigInteger getId() {
        return id;
    }

    public void setShopName(String shopName) {
        this.shopName = shopName;
    }

    public String getShopName() {
        return shopName;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    public String getAddress() {
        return address;
    }
}
@Configuration
@EnableMongoRepositories({"com.dicksonkho.repositories"})
public class MongoConfig extends AbstractMongoConfiguration {

    @Override
    protected String getDatabaseName() {
        return "shopdb";
    }

    @Bean
    @Override
    public Mongo mongo() throws Exception {
        return new MongoClient("localhost");
    }

    @Override
    protected String getMappingBasePackage() {
        return "com.dicksonkho.domain";
    }
}
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = {MongoConfig.class})
public class TemplateTest {

    @Autowired
    MongoOperations mongoOperation;

    @Test
    public void testCRUDRecordWithTemplate() {
        Shop shop = new Shop("muimui", "Taming Jaya");
        mongoOperation.save(shop);
        Assert.assertNotNull(shop.getId());

        Shop insertedShop = mongoOperation.findOne(new Query(where("shopName").is("muimui")), Shop.class);
        Assert.assertEquals(shop.getId(), insertedShop.getId());

        insertedShop.setShopName("newmui");
        mongoOperation.save(insertedShop);
        Shop updatedShop = mongoOperation.findOne(new Query(where("shopName").is("newmui")), Shop.class);
        Assert.assertEquals(shop.getId(), updatedShop.getId());

        mongoOperation.dropCollection("shop");
    }
}

NOTE: MongoTemplate implements MongoOperations

  • When we insert a new Shop, MappingMongoConverter (the default for MongoTemplate) handles the object-to-document mapping and saves it into MongoDB.
  • MongoTemplate provides a number of methods for working with MongoDB, such as upsert, count, findAndModify, etc.
  • It supports geospatial, GeoJSON, and full-text search queries.
  • It supports map-reduce operations for batch processing and data aggregation.
  • It provides methods for managing indexes and collections.
  • We can execute the MongoDB driver’s DB.command via executeCommand, and any exception is translated into Spring’s DataAccessException hierarchy.

2. Repository

The core marker interface of the Spring Data repository abstraction is Repository, which is extended by a few sub-interfaces, in hierarchical order as below:

Repository < CrudRepository < PagingAndSortingRepository < MongoRepository

Defining a query to manipulate data is just a matter of declaring a query method on the repository interface.
In the sample interface below, we have methods for querying a shop by shop name, querying shops whose name starts with a given prefix, etc.

public interface ShopRepository extends Repository<Shop, BigInteger> {

    Shop save(Shop shop);

    Shop findByShopName(String shopName);

    Shop findOne(BigInteger id);

    Shop findByShopNameStartingWith(String shopName);

    List<Shop> deleteByShopName(String shopName);
}
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = {MongoConfig.class})
public class RepositoryTest {

    @Autowired
    private ShopRepository repository;

    @Test
    public void testCRUDRecordWithRepository() {
        Shop shop = new Shop("muimui", "Taming Jaya");
        repository.save(shop);
        Assert.assertNotNull(shop.getId());

        Shop shopByShopName = repository.findByShopName("muimui");
        Assert.assertEquals(shop.getId(), shopByShopName.getId());

        Shop shopByNameStartWith = repository.findByShopNameStartingWith("mui");
        Assert.assertEquals(shop.getId(), shopByNameStartWith.getId());
        List<Shop> deletedShopList = repository.deleteByShopName("muimui");
        Assert.assertEquals(1, deletedShopList.size()); // 1 record deleted

        Shop recheckTheShop = repository.findByShopName("muimui");
        Assert.assertNull(recheckTheShop);
    }
}

NOTE: The sample above extends the most basic Repository interface, but we can extend other interfaces like CrudRepository or PagingAndSortingRepository based on our needs.

  • There are 3 query lookup strategies used by the repository infrastructure:
    CREATE, USE_DECLARED_QUERY and CREATE_IF_NOT_FOUND (default)
  • A number of method name conventions are available, such as findByXXXLessThan, findByYYYBefore, findByLocationNear, etc.
  • We can also supply a MongoDB JSON-based query on a query method, which overrides the derived query, as in the sample below:
    @Query("{ 'shopName' : ?0 }")
    List<Shop> findByShopName(String shopName);
    
  • Query results can be processed incrementally by using a Java 8 Stream as the return type.
  • It supports integration with Querydsl to provide type-safe queries.
  • It supports full-text search.

Spring Data extensions (Web Support)

@Configuration
@EnableWebMvc
@EnableSpringDataWebSupport
@EnableMongoRepositories({"com.dicksonkho.repositories"})
@ComponentScan(basePackages = {"com.dicksonkho.controller"})
public class MongoWebConfig extends AbstractMongoConfiguration {

    @Override
    protected String getDatabaseName() {
        return "shopdb";
    }

    @Bean
    @Override
    public Mongo mongo() throws Exception {
        return new MongoClient("localhost");
    }
}

As you may notice, this configuration is almost the same as MongoConfig, with 2 additional annotations (@EnableWebMvc & @EnableSpringDataWebSupport) to enable Spring Data web support.

@Controller
@RequestMapping("/shop")
public class ShopController {

    @ResponseBody
    @RequestMapping(value = "/{id}", method = RequestMethod.GET)
    public Shop showUserForm(@PathVariable("id") Shop shop) {
        return shop;
    }
}
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = {MongoWebConfig.class})
@WebAppConfiguration
public class SpringDataWebSupportTests {

    private MockMvc mockMvc;
    ObjectMapper mapper = new ObjectMapper();

    @Autowired
    protected WebApplicationContext webApplicationContext;

    @Autowired
    private ShopRepository repository;

    @Test
    public void getShopWithWebSupport() throws Exception {
        mockMvc = MockMvcBuilders.webAppContextSetup(webApplicationContext).build();

        Shop shop = new Shop("muimui", "Taming Jaya");
        repository.save(shop);

        MvcResult result = mockMvc.perform(get("/shop/" + shop.getId())).andReturn();
        Shop shopResponse = mapper.readValue(result.getResponse().getContentAsString(), Shop.class);
        Assert.assertEquals("Taming Jaya", shopResponse.getAddress());

        repository.deleteByShopName("muimui");
    }
}

See, we just need to pass the id as a path variable, and we get back the object without any additional coding. Internally, Spring converts the path variable and looks up the instance through findOne() on the repository.

Besides that, Spring Data web support can resolve the request parameters (page, size and sort) into a Pageable instance, which we can then use to query a list of objects with pagination and sorting (refer to the snippet below).

@RequestMapping
public String showUsers(Pageable pageable) {
    ...
}

Lifecycle Events

There are a few lifecycle events that allow us to intercept an object before/after certain actions take place.

public class BeforeConvertListener extends AbstractMongoEventListener<Shop> {

    @Override
    public void onBeforeConvert(Shop s) {
        //change the state of object, or whatever
    }
}
public class BeforeSaveListener extends AbstractMongoEventListener<Shop> {

    @Override
    public void onBeforeSave(Shop s, DBObject dbo) {
        // change the state of object, or whatever
    }
}

Reference: Spring Data MongoDB – Reference Documentation

Source code available @ github