Joining channels using a map as key

Problem

You want to join two channels using identical maps as the key to join by.

examples/join-on-map-fails-resume/problem.nf
left = LEFT(ch_param)  // (1)
right = RIGHT(ch_param)  // (2)

ch_joined = left.join(right)  // (3)
  1. A channel that contains a pair of a map with sample meta information and a number.
    tuple val(meta), val(count)
    
  2. A channel that contains a pair of a map with sample meta information and a list of file paths.
    tuple val(meta), path(reads)
    
  3. The desired joined channel should contain the map, the number, and the file paths.
    tuple val(meta), val(count), path(read_pairs)
    

This works perfectly fine when you execute your workflow from the beginning. However, when you resume your workflow, you will likely see many samples dropped from further processing after such a join: the maps no longer evaluate as equal, so the tuples are treated as incomplete and discarded. You can at least prevent elements from being discarded silently by using the failOnMismatch option, which makes the join report an error whenever an element has no matching partner in the other channel.

examples/join-on-map-fails-resume/problem.nf
left = LEFT(ch_param)
right = RIGHT(ch_param)

ch_joined = left.join(right, failOnMismatch: true)
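The behaviour described above can be mimicked with a small in-memory model of an inner join on the first tuple element. The helper innerJoin and the sample data are hypothetical, and this is not how Nextflow implements join; it only reproduces the observable effect: tuples whose key has no partner are dropped, unless failOnMismatch is set, in which case an error is raised. The map of sample B on the right merely stands in for a map that no longer compares equal.

```groovy
// Simplified in-memory model of an inner join on the first tuple element.
// Hypothetical helper for illustration only; NOT Nextflow's implementation.
List<List> innerJoin(List<List> left, List<List> right, boolean failOnMismatch = false) {
    // Index the right-hand tuples by their first element, the join key.
    Map<Object, List> rightByKey = right.collectEntries { [it[0], it] }
    Set matchedKeys = [] as Set
    List<List> joined = []
    left.each { l ->
        if (rightByKey.containsKey(l[0])) {
            matchedKeys << l[0]
            // Emit the key once, then the remaining left and right elements.
            joined << [l[0]] + l.drop(1) + rightByKey[l[0]].drop(1)
        } else if (failOnMismatch) {
            throw new IllegalStateException("Left key has no match: ${l[0]}")
        }
        // Otherwise the incomplete tuple is silently discarded.
    }
    // Right-hand tuples that never matched are discarded as well, or reported.
    def unmatched = right.findAll { !matchedKeys.contains(it[0]) }
    if (failOnMismatch && unmatched) {
        throw new IllegalStateException("Right keys have no match: ${unmatched.collect { it[0] }}")
    }
    return joined
}

// Hypothetical sample data: the right-hand map of sample B does not equal the left one.
def metaLeft = [[[id: 'A'], 1], [[id: 'B'], 2]]
def metaRight = [
    [[id: 'A'], ['A_1.fq.gz', 'A_2.fq.gz']],
    [[id: 'B', extra: true], ['B_1.fq.gz', 'B_2.fq.gz']]
]

println innerJoin(metaLeft, metaRight)  // sample B is silently dropped

try {
    innerJoin(metaLeft, metaRight, true)
} catch (IllegalStateException e) {
    println "Join failed: ${e.message}"  // the mismatch is reported instead
}
```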

Solution

Since maps, as mutable objects, may fail to evaluate as equal after resuming¹, we can instead pull an immutable value out of the maps and join on that. Your map likely contains an id key whose value is a unique string or integer that is equal in both channels to be joined. Joining on it requires a couple of channel transformations so that the resulting channel contains the desired map as its first element, followed by the remaining elements from both joined channels.

examples/join-on-map-fails-resume/solution.nf
def left = LEFT(ch_param).map { [it[0].id, it[0], it[1]] }  // (1)
def right = RIGHT(ch_param).map { [it[0].id, it[0], it[1]] }  // (2)

def ch_joined = left.join(right).map { [it[1], it[2], it[4]] }  // (3)
  1. We prepend the value of the id key, which is immutable.
    tuple val(id), val(meta), val(count)
    
  2. Likewise, we prepend the immutable value of the id key to the right channel.
    tuple val(id), val(meta), path(reads)
    
  3. After joining on the id value, we remove that element and also drop the second, otherwise identical map.
    tuple val(meta), val(count), path(read_pairs)
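As a sanity check on the positions used in the final map call, the transformation can be traced with plain Groovy lists standing in for one matching pair of channel elements. The sample values are hypothetical, and the join is emulated by simple list concatenation, which assumes a single matching id.

```groovy
// Hypothetical channel elements for one sample.
def leftTuple  = [[id: 'S1', condition: 'ctrl'], 42]                      // (meta, count)
def rightTuple = [[id: 'S1', condition: 'ctrl'], ['S1_1.fq', 'S1_2.fq']]  // (meta, reads)

// Annotations 1 and 2: prepend the immutable id value.
def keyedLeft  = [leftTuple[0].id, leftTuple[0], leftTuple[1]]     // [id, meta, count]
def keyedRight = [rightTuple[0].id, rightTuple[0], rightTuple[1]]  // [id, meta, reads]

// Emulate `join` for a matching id: the key once, then the remaining
// left elements, then the remaining right elements.
assert keyedLeft[0] == keyedRight[0]
def joined = [keyedLeft[0]] + keyedLeft.drop(1) + keyedRight.drop(1)
// joined is [id, meta, count, meta, reads], i.e. indices 0 to 4.

// Annotation 3: keep the left map, the count and the reads.
def result = [joined[1], joined[2], joined[4]]
println result
```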
    

To join two channels on a map key safely in general, I therefore propose the following function, which was developed together with 🪄 Mahesh Binzer-Panchal.

lib/CustomChannelOperators.groovy
/**
 * Provide a collection of custom channel operators that go beyond the nextflow default.
 */
class CustomChannelOperators {

    /**
     * Join two channels by one or more keys from a map contained in each channel.
     *
     * The channel elements are assumed to be tuples whose size is at least two.
     * Typically, the maps to join by are in the first position of the tuples.
     * Please read https://www.nextflow.io/docs/latest/operator.html#join carefully.
     *
     * @param joinArgs A map of keyword arguments that is passed on to the nextflow join call.
     * @param left The left-hand side channel in the join.
     * @param right The right-hand side channel in the join.
     * @param key A string or list of strings providing the map keys to compare.
     * @param leftBy The position of the map in the left channel.
     * @param rightBy The position of the map in the right channel.
     * @return The joined channels with the map in the original position of the left channel,
     *      followed by all elements of the right channel except for the map.
     */
    public static Object joinOnKeys(
            Map joinArgs = [:],
            left,
            right,
            key,
            int leftBy = 0,
            int rightBy = 0
    ) {
        List keys = key instanceof List ? key : [ key ]

        // Extract desired keys from the left map, located at `leftBy`, and prepend them.
        def newLeft = left.map { it[leftBy].subMap(keys).values() + it }

        // Extract desired keys from the right map, located at `rightBy`, and prepend them.
        // Also drop the map itself from the right.
        def newRight = right.map {
            // Key values, then the elements before the map, then the elements after it.
            it[rightBy].subMap(keys).values() +
            it[0..<rightBy] +
            it[(rightBy + 1)..<it.size()]
        }

        // Set the positions to join on explicitly, on a copy so the caller's map is left unchanged.
        Map opts = joinArgs + [by: 0..<keys.size()]

        // Apply the join channel operator to the channels and finally drop the keys used for joining tuples.
        return newLeft.join(opts, newRight).map { it[keys.size()..<it.size()] }
    }

}
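Because the closures inside joinOnKeys only manipulate lists, their effect on one matching pair of elements can be traced in a plain Groovy script without Nextflow. The sketch below assumes two join keys, id and a hypothetical lane, a right-hand tuple whose map sits at position 1, and again emulates join by concatenation.

```groovy
// Hypothetical inputs: join on two keys; the right map sits at position 1.
List keys = ['id', 'lane']
int leftBy = 0
int rightBy = 1
def leftTuple  = [[id: 'S1', lane: 1, group: 'a'], 42]
def rightTuple = ['extra', [id: 'S1', lane: 1, group: 'a'], ['S1_1.fq', 'S1_2.fq']]

// As in `newLeft`: prepend the key values and keep the whole left tuple.
def newLeft = leftTuple[leftBy].subMap(keys).values() + leftTuple

// As in `newRight`: prepend the key values and drop the map itself.
// The last slice selects the elements after the map (same as rightBy<..<size in Groovy 4).
def newRight = rightTuple[rightBy].subMap(keys).values() +
        rightTuple[0..<rightBy] +
        rightTuple[(rightBy + 1)..<rightTuple.size()]

// Emulate `join` with `by: 0..<keys.size()` for a single matching pair.
int nKeys = keys.size()
assert newLeft[0..<nKeys] == newRight[0..<nKeys]
def joined = newLeft + newRight[nKeys..<newRight.size()]

// As in the final `map`: drop the join keys again.
def result = joined[nKeys..<joined.size()]
println result
```

Inside a workflow, the function would be called as CustomChannelOperators.joinOnKeys(left, right, 'id'), and extra join options can be passed as named arguments, for example CustomChannelOperators.joinOnKeys(left, right, 'id', failOnMismatch: true); Groovy gathers such named arguments into the leading joinArgs map.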

  1. My current hypothesis is that when you start a new pipeline, the different channels point to the same map object, whereas when you resume, different instances of the map with the same content are created. I then suspect that the comparison Nextflow carries out to join channels is based on object identity rather than on comparing all key-value pairs.