spartan27244
Helper III

Pass File list to notebook from copy data in a pipeline

I have a Copy data activity that copies files arriving via SFTP each day into my OneLake. I need to move those files to another folder once the copy is complete. Since there is no activity for that, I have a PySpark notebook that takes parameters (the list of files moved, and the folder to move them to) and performs the operation. My problem is that I cannot figure out how to get the list of files from the Copy data activity to pass to the notebook, and if that output exists, what it looks like, how I get it, and how I parse it in my notebook. Copilot suggested the dynamic content expression "@activity('Copy data1').output.fileList", but it cannot provide details and I cannot find any official documentation. I have found a post suggesting that it's not possible.
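
For context, the notebook side is roughly the sketch below. The parameter names (files_to_move, dest_folder) are placeholders, and it assumes the file list would arrive as a JSON-encoded string and that notebookutils.fs.mv is available in the runtime; the open question is how to populate files_to_move from the Copy data activity.

import json
import notebookutils  # Fabric notebook utilities (assumed available in the runtime)

# Parameters cell - values would be supplied by the pipeline's notebook activity
files_to_move = '["File1.txt", "File2.txt"]'   # JSON-encoded list of file names (placeholder)
src_folder = "Files/landing"                   # folder the Copy data activity wrote to (placeholder)
dest_folder = "Files/processed"                # folder to move the files into (placeholder)

for file_name in json.loads(files_to_move):
    # relative Files/... paths should resolve against the notebook's default lakehouse
    notebookutils.fs.mv(f"{src_folder}/{file_name}", f"{dest_folder}/{file_name}")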

11 REPLIES
OldDogNewTricks
Advocate II

I understand.  Go ahead and provide Kudos and mark as the answer if you feel that is the right thing to do.

Hi @spartan27244 

Thanks for reaching out to the Microsoft Fabric Community Forum.

 

I would also like to take a moment to thank @OldDogNewTricks for actively participating in the forum and for the solutions you've been sharing. Your contributions make a real difference.
If this post helps, then please mark it as a solution so that other members can find it more quickly.

Best Regards, 
Menaka.
Community Support Team  

 

OldDogNewTricks
Advocate II

I just noticed my replies appear out of order because of the default sort on replies.  You will probably want to sort by "Oldest to Newest" to have the replies in the correct order.  Sorry, there was a character limit on replies, hence the multiple replies.

Feel free to kudos all of them if they are helpful 😉

Thanks for all of the code. I am already using most of it; I just have bosses who don't know Fabric but like to see pipelines with nice blocks of predefined activities. Sometimes they just don't listen and you have to prove to them that "it can't be done that way".

OldDogNewTricks
Advocate II

This function will archive files on the SFTP server itself:
def sftp_archive_files(
        creds: dict
        ,src_path: str
        ,files: list
        ,archive_path: str
        ) -> Tuple[int, int, str]:
    """
    Connect to the SFTP server and move previously processed files to an archive folder, using a list of file names.

    Parameters:
        creds                      = Dictionary object that contains the endpoint, username, and password for this sftp site
        src_path                   = Path on sftp to search
        files                      = List of files (from previous handling function)
        archive_path               = Path to move file to on SFTP server
   
    Return:
        files_touched              = Number of files touched during the process
        files_moved                = Number of files moved on the SFTP server
        message                    = Message summarizing the action(s) taken
   
    Example usage:
        files_touched, files_moved, msg = sftp_archive_files(sftp_creds, '/ven-directdivision', ['File1.txt','File2.txt'], '/ven-directdivision/archive')
    """

    # Connect to the SFTP server using paramiko
    transport = paramiko.Transport((creds['endpoint'],creds['port']))
    transport.connect(username=creds['username'], password=creds['password'])

    files_touched = 0
    files_moved = 0

    with paramiko.SFTPClient.from_transport(transport) as sftp:
        sftp.chdir(src_path)
       
        for file in files:
            files_touched += 1

            file_name = file
            file_path = f"{src_path}/{file_name}"

            archive_full_path = f"{archive_path}/{file_name}"
            print(f'Moving: {file_path} to {archive_full_path}')
           
            sftp.rename(file_path, archive_full_path)

            files_moved += 1

    transport.close()

    msg = f'Files touched: {files_touched}, Files moved: {files_moved}'
   
    return(files_touched, files_moved, msg)

(Continuation of sftp_get_files_all from my other reply; the loop below picks up right after the counters and lists are initialized:)

    with paramiko.SFTPClient.from_transport(transport) as sftp:
        sftp.chdir(src_path)
        files = sftp.listdir_attr()

        for file_attr in files:
            files_touched += 1

            file_name = file_attr.filename
           
            # Check if the file name contains the filename mask
            if filename_mask in file_name:
                file_path = f"{src_path}/{file_name}"
                print(f'Downloading file: {file_path}...')

                # download file into memory, put file into dataframe, append dataframe to list of dataframes
                df = sftp_download_file(sftp, file_path, src_file_delim, src_file_encoding, src_file_head_row, transport)
                df['FileName'] = file_name
                lst_df.append(df)
                lst_files.append(file_name)
               
                files_downloaded += 1
               
       
        # Combine all DataFrames into a single result DataFrame
        if lst_df:
            df_result = pd.concat(lst_df, ignore_index=True)
            print("All files successfully combined into a single DataFrame.")
        else:
            df_result = pd.DataFrame()
            print("No files matched the conditions.")

    transport.close()
    return(files_touched, files_downloaded, df_result, lst_files)
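
Putting the pieces together, a minimal driver cell might look like the sketch below. The Key Vault prefix, SFTP paths, and lakehouse table name are placeholders, and it assumes the notebook has a Spark session (spark) and an attached lakehouse:

# Hypothetical end-to-end flow: credentials -> download -> land in lakehouse -> archive on SFTP
sftp_creds = get_ftp_creds('ftp-com-example')        # placeholder Key Vault prefix

files_touched, files_downloaded, df_result, lst_files = sftp_get_files_all(
    creds = sftp_creds
    ,src_path = '/inbound'                           # placeholder SFTP folder
    ,filename_mask = 'daily_extract'
    ,src_file_delim = '|'
)

if not df_result.empty:
    # write the combined data to the attached lakehouse; the table name is a placeholder
    spark.createDataFrame(df_result).write.mode('append').saveAsTable('staging_daily_extract')

# move the processed files into an archive folder on the SFTP server
touched, moved, msg = sftp_archive_files(sftp_creds, '/inbound', lst_files, '/inbound/archive')
print(msg)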

 

Hopefully this helps!  We are using these functions to ingest hundreds of files daily.

OldDogNewTricks
Advocate II

This function downloads all files in a folder location that meet the specified file mask:
def sftp_get_files_all(
        creds: dict
        ,src_path: str
        ,filename_mask: str
        ,src_file_delim: Optional[str] = None
        ,src_file_encoding: Optional[str] = None
        ,src_file_head_row: Optional[int] = 1
        ) -> Tuple[int, int, pd.DataFrame, list]:
    """
    Connect to SFTP server, download all files in folder that match the file mask, combine into dataframe using paramiko and upload to lakehouse.

    Parameters:
        creds                      = Dictionary object that contains the endpoint, username, and password for this sftp site
        src_path                   = Path on sftp to search
        filename_mask              = Format of filename
        src_file_delim (opt)       = Delimiter to use, default is comma
        src_file_encoding (opt)    = Source file encoding, default is utf-8
        src_file_head_row (opt)    = 1-based header row of the source file; default is 1 (first row contains headers)

    Return:
        files_touched               = number of files touched during the process
        files_downloaded            = number of files downloaded during the process
        df_result                   = Pandas dataframe that combines all files
        lst_files                   = list of files that met the criteria, for downstream processing

    Example usage:
        files_touched, files_downloaded, df_result, lst_files = sftp_get_files_all(
            creds = sftp_creds
            ,src_path = sftp_src_path
            ,filename_mask = 'file_name_sample'
            ,src_file_delim = '|'
            ,src_file_encoding = 'utf-8'
            ,src_file_head_row = 1
        )
    """

    # Check for default values
    if src_file_delim is None:
        src_file_delim = ','                # csv default
   
    if src_file_encoding is None:
        src_file_encoding = 'utf-8'         # utf-8 default

    if src_file_head_row is not None:
        src_file_head_row -= 1              # subtracting 1, argument for underlying function uses 0 to indicate first row, None = no headers

    # Connect to the SFTP server using paramiko
    transport = paramiko.Transport((creds['endpoint'],creds['port']))
    transport.connect(username=creds['username'], password=creds['password'])

    files_touched = 0
    files_downloaded = 0
    lst_df = []
    lst_files = []

(The body of this function continues in my other reply; the reply character limit cut it off here.)

OldDogNewTricks
Advocate II

This function downloads a single file and puts it into a Pandas dataframe:
def sftp_download_file(
    sftp
    ,file_path: str
    ,src_file_delim: str
    ,src_file_encoding: str
    ,src_file_head_row: int
    ,trans_object
    ) -> pd.DataFrame:
    """
    Download a file from the SFTP server into a Pandas dataframe.

    @sftp                   = Paramiko SFTP client object
    @file_path              = Full path of the file on the SFTP server
    @src_file_delim         = Delimiter for source file
    @src_file_encoding      = Encoding for source file
    @src_file_head_row      = Header row for the source file (0-based, None = no headers)
    @trans_object           = Paramiko transport object, for closing connection upon error

    Returns:
        pd.DataFrame with the file contents (empty dataframe on error)
    """

    with sftp.open(file_path, 'rb') as f:
        data = f.read()
        try:
            df_file_data = pd.read_csv(BytesIO(data), encoding = src_file_encoding, sep = src_file_delim, header = src_file_head_row)  # Read into a temporary DataFrame
            return df_file_data
        except Exception as e:
            print(f'\n---Error in file download with file: {file_path}.  Exception: {e}---\n')
            trans_object.close()            # if there is an error, close the connection
            return pd.DataFrame()
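
Standalone usage looks roughly like this (paths and credentials are placeholders; header row 0 means the first row holds the column names):

transport = paramiko.Transport((sftp_creds['endpoint'], sftp_creds['port']))
transport.connect(username=sftp_creds['username'], password=sftp_creds['password'])

with paramiko.SFTPClient.from_transport(transport) as sftp:
    # placeholder path; the transport is passed so the helper can close it on error
    df = sftp_download_file(sftp, '/inbound/File1.txt', ',', 'utf-8', 0, transport)

transport.close()
print(df.head())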

I would like to write the file to my lakehouse exactly as it was read, with no interpretation of delimiters, etc.
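
If you only need a byte-for-byte copy, here is a minimal sketch that skips pandas entirely. It assumes the notebook has a default lakehouse attached so the local mount path /lakehouse/default/Files/... is available; the helper name and folder are hypothetical:

import os
import paramiko

def sftp_copy_file_raw(sftp: paramiko.SFTPClient, file_path: str, lakehouse_dir: str) -> str:
    """Copy a file from the SFTP server into the lakehouse Files area without parsing it."""
    file_name = file_path.rsplit('/', 1)[-1]
    dest_path = os.path.join(lakehouse_dir, file_name)
    os.makedirs(lakehouse_dir, exist_ok=True)

    # sftp.get streams the remote file straight to the local (mounted) path
    sftp.get(file_path, dest_path)
    return dest_path

# Example (placeholder folder under the default lakehouse mount):
# sftp_copy_file_raw(sftp, '/inbound/File1.txt', '/lakehouse/default/Files/landing')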

OldDogNewTricks
Advocate II

I store the endpoint (including port) in Key Vault along with the creds. This function returns the relevant details in a dictionary based on how I store them in Key Vault:


import notebookutils as nbu

from typing import Optional, Tuple, Any, List, Union, Dict
import paramiko
import pandas as pd
from io import BytesIO


def get_ftp_creds (prefix: str)-> dict:
    """
    Function that takes a prefix and returns the connection details stored in Key Vault (the endpoint secret holds host and port, plus username and password secrets).
    Returns a dictionary with the endpoint, username, password, and port values.

    Parameters:
        prefix          String that identifies the resources to get values for

    Returns:
        dict            Dictionary object with relevant sftp connection values
   
    Example usage:
        ftp_data = get_ftp_creds('ftp-com-ech')
       
        endpoint = ftp_data['endpoint']
        username = ftp_data['username']
        password = ftp_data['password']
        port = ftp_data['port']          
    """
   
    host, port = nbu.credentials.getSecret('https://YOUR_KV_ENDPOINT_HERE/', f'{prefix}-endpoint').split(":")

    ftp_data = {}
    ftp_data['endpoint'] = host
    ftp_data['username'] = nbu.credentials.getSecret('https://YOUR_KV_ENDPOINT_HERE/', f'{prefix}-user')
    ftp_data['password'] = nbu.credentials.getSecret('https://YOUR_KV_ENDPOINT_HERE/', f'{prefix}-pass')
    ftp_data['port'] = int(port)
   
    return(ftp_data)

OldDogNewTricks
Advocate II

I will admit that I am not a pipeline expert, but from what I can tell, there is no ability to either 1) archive as an option within the copy activity, or 2) get the list of files out of the copy activity.

Are you using an on-premises data gateway to access said FTP/SFTP?

If you are NOT using an on-premises gateway, I have a solution for you using pure notebooks. If you are using an on-premises gateway, I unfortunately do not have a solution for you, because pipelines are the only thing in MS Fabric that can use those gateway connections.

I am not using a gateway. I can do it completely with notebooks to handle the SFTP connection, etc. This is really the reason I have never used Data Factory and pipelines in the past; just not enough flexibility. Now saddled with the same thing in Fabric. Oh well.
