Using ZooKeeper to Choose a Task Leader


The Scenario
We deploy our application on multiple servers, and servers running the old and new versions sometimes coexist for several days or weeks.

We run different kinds of cron jobs in our application. For each cron task, we want to make sure it runs on only one node, and always on a server running the new version.

There are different approaches to this problem, but since we already use ZooKeeper, it is natural to use it to select the task leader.

The only difference from a plain leader election is that when a new-version server is added, the old server that currently leads the task releases its ownership so that the new-version server can take over. We also want to choose the leader for each cron task randomly, so that the load is evenly distributed across all new-version nodes.

The Solution
When the application starts, it registers itself with ZooKeeper by creating an ephemeral node under the path /myapps/workers.

WorkerNode
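The WorkerNode contains information about this node: its version, IP address, id (a UUID), and a list of random ints used to order the nodes when choosing a task leader.
import java.io.Serializable;
import java.util.Comparator;
import java.util.List;
import java.util.Random;

import javax.validation.constraints.NotNull; // assumption: Bean Validation's @NotNull
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Lists;
import org.apache.hadoop.util.ComparableVersion;
import lombok.Data;
import lombok.experimental.Accessors;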
@Accessors(chain = true)
@Data
public class WorkerNode implements Serializable {
    private static final Random random = new Random();
    private static final long serialVersionUID = 1L;
    @NotNull
    private String id;
    @NotNull
    private String version;
    // so a leader can be randomly chosen
    @NotNull
    private List<Integer> randomInts = Lists.newArrayList(random.nextInt(), random.nextInt(), random.nextInt(),
            random.nextInt(), random.nextInt());
    @NotNull
    private String ipaddress;

    public static class WorkerNodeComparator implements Comparator<WorkerNode> {
        private final int randomPos;

        public WorkerNodeComparator(final int randomPos) {
            this.randomPos = randomPos;
        }

        @Override
        public int compare(final WorkerNode o1, final WorkerNode o2) {
            final int pos = randomPos % Math.min(o1.randomInts.size(), o2.randomInts.size());
            return ComparisonChain.start().compare(o1.version, o2.version, new VersionComparator())
                    .compare(o1.randomInts.get(pos), o2.randomInts.get(pos)).compare(o1.ipaddress, o2.ipaddress)
                    .compare(o1.id, o2.id).result();
        }
    }
    public static class VersionComparator implements Comparator<String> {
        @Override
        public int compare(final String v1, final String v2) {
            return new ComparableVersion(v1).compareTo(new ComparableVersion(v2));
        }
    }
}
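To make the ordering easier to follow, here is a minimal, dependency-free sketch of the same idea. SimpleWorker and SimpleWorkerOrdering are hypothetical stand-ins for WorkerNode and its comparator, and compareVersions assumes purely numeric, dot-separated versions instead of using Hadoop's ComparableVersion. It is an illustration under those assumptions, not the code we run.

```java
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// Hypothetical, simplified stand-in for WorkerNode (no Lombok, Guava or Hadoop).
final class SimpleWorker {
    final String id;
    final String version;
    final List<Integer> randomInts;

    SimpleWorker(final String id, final String version, final List<Integer> randomInts) {
        this.id = id;
        this.version = version;
        this.randomInts = randomInts;
    }
}

final class SimpleWorkerOrdering {
    // Assumption: versions are purely numeric and dot-separated, e.g. "1.10.0".
    static int compareVersions(final String v1, final String v2) {
        final String[] a = v1.split("\\.");
        final String[] b = v2.split("\\.");
        for (int i = 0; i < Math.max(a.length, b.length); i++) {
            final int x = i < a.length ? Integer.parseInt(a[i]) : 0;
            final int y = i < b.length ? Integer.parseInt(b[i]) : 0;
            if (x != y) {
                return Integer.compare(x, y);
            }
        }
        return 0;
    }

    // Orders by version first, then by the random int at the task's position, then by id.
    static Comparator<SimpleWorker> forTask(final int randomPos) {
        return (o1, o2) -> {
            final int byVersion = compareVersions(o1.version, o2.version);
            if (byVersion != 0) {
                return byVersion;
            }
            // floorMod keeps the index non-negative even for a negative randomPos.
            final int pos = Math.floorMod(randomPos, Math.min(o1.randomInts.size(), o2.randomInts.size()));
            final int byRandom = Integer.compare(o1.randomInts.get(pos), o2.randomInts.get(pos));
            if (byRandom != 0) {
                return byRandom;
            }
            return o1.id.compareTo(o2.id);
        };
    }

    // The preferred leader for a task is the greatest worker under that task's ordering.
    static SimpleWorker preferredLeader(final List<SimpleWorker> workers, final int randomPos) {
        return Collections.max(workers, forTask(randomPos));
    }
}
```

Because each task uses a different randomPos, two tasks can prefer different nodes among those running the newest version, which is what spreads the load.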
TaskLeader
For each task, every node competes for leadership under the path /myapps/leaders/taskname (ZK_TASK_LEADER + "/" + taskName in the code below); whoever wins becomes the task leader.

When the leader detects that a new node has been added, it calls comparator.compare(newNode, myNode). If the new node has a higher version, or the same version but a larger random int, the leader releases its ownership.

We use the Apache Curator LeaderSelector recipe.

public class TaskLeader implements Closeable, LeaderSelectorListener {
    private static final Logger logger = LoggerFactory.getLogger(TaskLeader.class);
    private final CuratorFramework client;
    private final LeaderSelector leaderSelector;
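    // Both fields are written in takeLeadership() and read in close(), possibly on different threads.
    private volatile CountDownLatch closeLatch;
    private volatile PathChildrenCache workersCache;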

    private final String taskName;
    private final String workersPath;
    private final WorkerNode myWorkerNode;

    public TaskLeader(final CuratorFramework client, final String workersPath, final String taskName, final WorkerNode myWorkerNode, final WorkerNodeComparator comparator) {
        this.client = client;
        this.taskName = taskName;
        this.myWorkerNode = myWorkerNode;
        this.workersPath = workersPath;
        this.comparator = comparator;
        this.leaderSelector =
                new LeaderSelector(this.client, ZookeeperService.ZK_TASK_LEADER + "/" + taskName, this);
    }

    public void runForMaster() {
        logger.info("Starting master selection: " + myWorkerNode.getId());
        client.getUnhandledErrorListenable().addListener(errorsListener);
        leaderSelector.setId(myWorkerNode.getId());
        leaderSelector.autoRequeue();
        leaderSelector.start();
    }

    PathChildrenCacheListener workersCacheListener = new PathChildrenCacheListener() {
        @Override
        public void childEvent(final CuratorFramework client, final PathChildrenCacheEvent event) {
            logger.info("workers event: " + event);
            if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
                final WorkerNode newWorkerNode = (WorkerNode) SerializationUtils.deserialize(event.getData().getData());
                logger.info("added new WorkerNode: " + newWorkerNode);
                if (comparator.compare(newWorkerNode, myWorkerNode) > 0) {
                    logger.info("Release my leadership, as new WorkerNode with newer version comes up");
                    close();
                }
            }
        }
    };

    UnhandledErrorListener errorsListener = new UnhandledErrorListener() {
        @Override
        public void unhandledError(final String message, final Throwable e) {
            logger.error("Unrecoverable error: " + message, e);
            close();
        }
    };

    private WorkerNodeComparator comparator;

    public boolean isLeader() {
        return leaderSelector.hasLeadership();
    }

    public Participant getLeader() {
        try {
            return leaderSelector.getLeader();
        } catch (final Exception e) {
            throw new BusinessException(ErrorCode.INTERNAL_ERROR, e, "Can't get leader");
        }
    }


    @Override
    public void stateChanged(final CuratorFramework client, final ConnectionState newState) {
        logger.info("ConnectionState: " + newState);
        switch (newState) {
            case CONNECTED:
                break;
            case RECONNECTED:
                break;
            case SUSPENDED:
                logger.warn("Session suspended");
                break;
            case LOST:
                close();
                break;
            case READ_ONLY:
                break;
        }
    }

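    @Override
    public void takeLeadership(final CuratorFramework client) throws Exception {
        logger.info("I took the leadership for task " + taskName);
        // Create the latch before starting the cache, so that a close() triggered by an
        // early cache event always finds a latch to count down.
        closeLatch = new CountDownLatch(1);
        this.workersCache = new PathChildrenCache(this.client, workersPath, true);
        workersCache.getListenable().addListener(workersCacheListener);
        workersCache.start();
        // Leadership is held until this method returns, so block until close() is called.
        closeLatch.await();
    }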

    public void close() {
        try {
            logger.info("I gave up my leadership for task " + taskName);
            if (workersCache != null) {
                workersCache.getListenable().removeListener(workersCacheListener);
                workersCache.close();
            }
            if (closeLatch != null) {
                closeLatch.countDown();
            }
        } catch (IOException e) {
            throw new RuntimeException("Failed to close", e);
        }
    }
}
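The hold-and-release pattern used by takeLeadership() and close() can be shown in isolation. The sketch below is a hypothetical helper (LeadershipHold is not a Curator class) that only assumes leadership lasts for as long as the holding method has not returned. Creating the latch up front means a release that arrives early is not lost.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper, not part of Curator: models "hold leadership until released".
final class LeadershipHold {
    // Created eagerly, so release() works even if it is called before hold().
    private final CountDownLatch released = new CountDownLatch(1);
    private final AtomicBoolean leading = new AtomicBoolean(false);

    // Plays the role of takeLeadership(): leadership lasts until this method returns.
    void hold() {
        leading.set(true);
        try {
            released.await();
        } catch (final InterruptedException e) {
            // Treat interruption as a request to give up; restore the interrupt flag.
            Thread.currentThread().interrupt();
        } finally {
            leading.set(false);
        }
    }

    // Plays the role of close(): safe to call from any thread, any number of times.
    void release() {
        released.countDown();
    }

    boolean isLeading() {
        return leading.get();
    }
}
```

A LeadershipHold is single-use: once released it never blocks again, so a node that regains leadership would need a fresh instance, just as TaskLeader creates a new latch on every takeLeadership() call.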
Configuration
To add a new task, we just need to create a new TaskLeader as below. Then, when the cron job fires, we check whether this node is the leader with taskLeader.isLeader(): if true, run the job; otherwise do nothing.
@Configuration
public class AppConfig {
    @Bean
    public CuratorFramework curatorFramework() {
        final CuratorFramework framework = CuratorFrameworkFactory.builder().connectString(zkHost)
                .connectionTimeoutMs(5000).sessionTimeoutMs(5000).retryPolicy(new RetryNTimes(3, 500)).build();
        framework.start();
        return framework;
    }
    @Bean(name = "xTaskLeader")
    public TaskLeader XTaskLeader() {
        final TaskLeader leader =
                new TaskLeader(curatorFramework(), getWorkerNodeWorkerPath(), "xTaskLeader", myNode(),
                        new WorkerNodeComparator(1));
        leader.runForMaster();
        return leader;
    }

    @Bean
    public WorkerNode myNode() {
        String ipAddress;
        try {
            ipAddress = InetAddress.getLocalHost().getHostAddress();
        } catch (final UnknownHostException e1) {
            LOGGER.error("failed to get ipaddress", e1);
            ipAddress = "Unknown IP";
        }
        final WorkerNode workerNode =
                new WorkerNode().setVersion(applicationProfile.getVersion()).setIpaddress(ipAddress).setId(UUID.randomUUID().toString());

        String myPath;
        try {
            myPath = curatorFramework().create().creatingParentContainersIfNeeded().withMode(CreateMode.EPHEMERAL)
                    .forPath(getWorkerNodeWorkerPath() + "/" + workerNode.getId(), SerializationUtils.serialize(workerNode));
            LOGGER.info("Create zk path: " + myPath);
        } catch (final Exception e) {
            // TODO change this
            LOGGER.error("Failed to create zk path", e);
        }
        return workerNode;
    }

    private String getWorkerNodeWorkerPath() {
        return ZookeeperService.getZkWorkersPath();
    }
}
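The leadership check in the cron job can be factored into a small wrapper. This is a minimal sketch, assuming the scheduler just invokes a Runnable; LeaderOnlyJob is a hypothetical name, and the leadership check is passed in as a BooleanSupplier so the sketch does not depend on Curator.

```java
import java.util.function.BooleanSupplier;

// Hypothetical wrapper: runs the job only while this node holds the task's leadership.
final class LeaderOnlyJob implements Runnable {
    private final BooleanSupplier isLeader;
    private final Runnable job;

    LeaderOnlyJob(final BooleanSupplier isLeader, final Runnable job) {
        this.isLeader = isLeader;
        this.job = job;
    }

    @Override
    public void run() {
        if (isLeader.getAsBoolean()) {
            job.run();
        }
        // Non-leaders do nothing; the scheduler simply calls again on the next tick.
    }
}
```

With the TaskLeader bean above, the supplier would be taskLeader::isLeader, for example new LeaderOnlyJob(taskLeader::isLeader, job).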
ZookeeperService - get all active nodes
@Service
public class ZookeeperService {
    @Autowired
    private CuratorFramework client;

    private static final String ZK_ROOT_FOLDER = "/myapps";
    private static final String ZK_WORKERS = ZK_ROOT_FOLDER + "/workers";

    public static final String ZK_TASK_LEADER = ZK_ROOT_FOLDER + "/leaders";

    @Autowired
    @Qualifier("xTaskLeader")
    private TaskLeader xTaskLeader;

    public static String getZkWorkersPath() {
        return ZK_WORKERS;
    }

    public Map<String, WorkerNode> getAllWorkerNodes() {
        return getAllWorkerNodes(getZkWorkersPath());
    }

    private Map<String, WorkerNode> getAllWorkerNodes(final String zkWorkersPath) {
        try {
            final Map<String, WorkerNode> map = new HashMap<>();
            final List<String> children = client.getChildren().forPath(zkWorkersPath);

            if (children != null) {
                for (final String child : children) {
                    final String childPath = zkWorkersPath + "/" + child;
                    final WorkerNode workerNode =
                            (WorkerNode) SerializationUtils.deserialize(client.getData().forPath(childPath));
                    map.put(workerNode.getId(), workerNode);
                }
            }

            return map;
        } catch (final Exception e) {
            throw new BusinessException(ErrorCode.INTERNAL_ERROR, e, "failed to get workers from zk");
        }
    }
}

