"""ModelVerse Explorer: a Gradio app that visualizes Hugging Face model statistics
as a treemap, querying the pre-processed parquet files of a Hub dataset via DuckDB."""

import time

import duckdb
import gradio as gr
import pandas as pd
import plotly.express as px
from huggingface_hub import list_repo_files

# Using the stable, community-built RangeSlider component.
from gradio_rangeslider import RangeSlider

# --- Constants ---
PARAM_CHOICES = ['< 1B', '1B', '5B', '12B', '32B', '64B', '128B', '256B', '> 500B']
PARAM_CHOICES_DEFAULT_INDICES = (0, len(PARAM_CHOICES) - 1)
TOP_K_CHOICES = list(range(5, 51, 5))
HF_DATASET_ID = "evijit/modelverse_daily_data"

TAG_FILTER_CHOICES = [
    "Audio & Speech", "Time series", "Robotics", "Music", "Video",
    "Images", "Text", "Biomedical", "Sciences"
]

PIPELINE_TAGS = [
    'text-generation', 'text-to-image', 'text-classification', 'text2text-generation',
    'audio-to-audio', 'feature-extraction', 'image-classification', 'translation',
    'reinforcement-learning', 'fill-mask', 'text-to-speech', 'automatic-speech-recognition',
    'image-text-to-text', 'token-classification', 'sentence-similarity', 'question-answering',
    'image-feature-extraction', 'summarization', 'zero-shot-image-classification',
    'object-detection', 'image-segmentation', 'image-to-image', 'image-to-text',
    'audio-classification', 'visual-question-answering', 'text-to-video',
    'zero-shot-classification', 'depth-estimation', 'text-ranking', 'image-to-video',
    'multiple-choice', 'unconditional-image-generation', 'video-classification',
    'text-to-audio', 'time-series-forecasting', 'any-to-any', 'video-text-to-text',
    'table-question-answering'
]


def load_models_data():
    """Resolve the parquet file URLs of the source dataset on the Hugging Face Hub."""
    overall_start_time = time.time()
    print(f"Attempting to load dataset metadata from Hugging Face Hub: {HF_DATASET_ID}")
    try:
        files = list_repo_files(HF_DATASET_ID, repo_type="dataset")
        parquet_files = [f for f in files if f.endswith('.parquet')]
        if not parquet_files:
            return [], False, "No parquet files found in dataset."
        urls = [f"https://huggingface.co/datasets/{HF_DATASET_ID}/resolve/main/{f}" for f in parquet_files]
        msg = f"Successfully identified {len(urls)} parquet files in {time.time() - overall_start_time:.2f}s."
        print(msg)
        return urls, True, msg
    except Exception as e:
        err_msg = f"Failed to load dataset metadata. Error: {e}"
        print(err_msg)
        return [], False, err_msg


def get_param_range_values(param_range_labels):
    """Convert a pair of labels such as ('1B', '> 500B') into numeric bounds (in billions)."""
    min_label, max_label = param_range_labels
    min_val = 0.0 if '<' in min_label else float(min_label.replace('B', ''))
    max_val = float('inf') if '>' in max_label else float(max_label.replace('B', ''))
    return min_val, max_val


def make_treemap_data(parquet_urls, count_by, top_k=25, tag_filter=None, pipeline_filter=None,
                      param_range=None, skip_orgs=None, include_unknown_param_size=True,
                      created_after_date: float = None):
    """Query the parquet files with DuckDB and return per-model rows for the top_k organizations."""
    if not parquet_urls:
        return pd.DataFrame()

    con = duckdb.connect()
    con.execute("INSTALL httpfs; LOAD httpfs;")
    urls_str = ", ".join([f"'{u}'" for u in parquet_urls])
    con.execute(f"CREATE VIEW models AS SELECT * FROM read_parquet([{urls_str}])")

    # Build the WHERE clauses from the UI filters.
    where_clauses = []
    if not include_unknown_param_size:
        where_clauses.append("params IS NOT NULL AND params != -1")

    col_map = {
        "Audio & Speech": "is_audio_speech", "Music": "has_music", "Robotics": "has_robot",
        "Biomedical": "is_biomed", "Time series": "has_series", "Sciences": "has_science",
        "Video": "has_video", "Images": "has_image", "Text": "has_text"
    }
    if tag_filter and tag_filter in col_map:
        where_clauses.append(f"{col_map[tag_filter]} = true")
    if pipeline_filter:
        where_clauses.append(f"pipeline_tag = '{pipeline_filter}'")

    if param_range:
        min_params, max_params = get_param_range_values(param_range)
        is_default_range = (param_range[0] == PARAM_CHOICES[0] and param_range[1] == PARAM_CHOICES[-1])
        if not is_default_range:
            conditions = []
            if min_params is not None:
                conditions.append(f"params >= {min_params}")
            if max_params is not None and max_params != float('inf'):
                conditions.append(f"params < {max_params}")
            if conditions:
                where_clauses.append("(" + " AND ".join(conditions) + ")")

    if created_after_date is not None:
        where_clauses.append(f"CAST(createdAt AS TIMESTAMPTZ) > to_timestamp({created_after_date})")

    if skip_orgs and len(skip_orgs) > 0:
        orgs_str = ", ".join([f"'{o}'" for o in skip_orgs])
        where_clauses.append(f"organization NOT IN ({orgs_str})")

    where_sql = (" WHERE " + " AND ".join(where_clauses)) if where_clauses else ""
    metric = f"COALESCE({count_by}, 0)"

    # First pass: find the top_k organizations by the chosen metric.
    query = f"""
        SELECT organization, SUM({metric}) AS total_metric
        FROM models
        {where_sql}
        GROUP BY organization
        ORDER BY total_metric DESC
        LIMIT {top_k}
    """
    top_orgs_df = con.execute(query).df()
    if top_orgs_df.empty:
        return pd.DataFrame()

    top_orgs_list = top_orgs_df['organization'].tolist()
    orgs_filter = ", ".join([f"'{o}'" for o in top_orgs_list])

    # Second pass: fetch per-model rows for those organizations. The organization
    # filter must be valid SQL whether or not any other WHERE clause was built above.
    org_clause = f"organization IN ({orgs_filter})"
    detail_where_sql = f"{where_sql} AND {org_clause}" if where_clauses else f" WHERE {org_clause}"
    detail_query = f"""
        SELECT id, organization, {metric} AS {count_by}
        FROM models
        {detail_where_sql}
    """
    treemap_data = con.execute(detail_query).df()
    treemap_data["root"] = "models"
    return treemap_data


def create_treemap(treemap_data, count_by, title=None):
    """Build the Plotly treemap (root -> organization -> model id) for the selected metric."""
    if treemap_data.empty:
        fig = px.treemap(names=["No data matches filters"], parents=[""], values=[1])
        fig.update_layout(title="No data matches the selected filters", margin=dict(t=50, l=25, r=25, b=25))
        return fig

    fig = px.treemap(treemap_data, path=["root", "organization", "id"], values=count_by,
                     title=title, color_discrete_sequence=px.colors.qualitative.Plotly)
    fig.update_layout(margin=dict(t=50, l=25, r=25, b=25))
    fig.update_traces(textinfo="label+value+percent root",
                      hovertemplate="%{label}<br>%{value:,} " + count_by + "<br>%{percentRoot:.2%} of total")
    return fig


custom_css = """
.model-parameters-group > .block {
    background: none !important;
    border: none !important;
    box-shadow: none !important;
}
#param-slider-wrapper .head,
#param-slider-wrapper div[data-testid="range-slider"] > span {
    display: none !important;
}
"""

with gr.Blocks(title="🤗 ModelVerse Explorer", fill_width=True, css=custom_css) as demo:
    models_data_state = gr.State([])
    loading_complete_state = gr.State(False)

    with gr.Row():
        gr.Markdown("# 🤗 ModelVerse Explorer")

    with gr.Row():
        with gr.Column(scale=1):
            count_by_dropdown = gr.Dropdown(
                label="Metric",
                choices=[("Downloads (last 30 days)", "downloads"),
                         ("Downloads (All Time)", "downloadsAllTime"),
                         ("Likes", "likes")],
                value="downloads"
            )
            filter_choice_radio = gr.Radio(label="Filter Type", choices=["None", "Tag Filter", "Pipeline Filter"], value="None")
            tag_filter_dropdown = gr.Dropdown(label="Select Tag", choices=TAG_FILTER_CHOICES, value=None, visible=False)
            pipeline_filter_dropdown = gr.Dropdown(label="Select Pipeline Tag", choices=PIPELINE_TAGS, value=None, visible=False)

            with gr.Group(elem_classes="model-parameters-group"):
                gr.Markdown("Model Parameters")
                param_range_slider = RangeSlider(
                    minimum=0,
                    maximum=len(PARAM_CHOICES) - 1,
                    value=PARAM_CHOICES_DEFAULT_INDICES,
                    step=1,
                    label=None,
                    show_label=False,
                    elem_id="param-slider-wrapper"
                )
                param_range_display = gr.Markdown(f"Range: `{PARAM_CHOICES[0]}` to `{PARAM_CHOICES[-1]}`")

            include_unknown_params_checkbox = gr.Checkbox(label="Include models with unknown parameter size", value=True)
            created_after_datepicker = gr.DateTime(label="Created After")
            top_k_dropdown = gr.Dropdown(label="Number of Top Organizations", choices=TOP_K_CHOICES, value=25)
            skip_orgs_textbox = gr.Textbox(label="Organizations to Skip (comma-separated)",
                                           value="TheBloke,MaziyarPanahi,unsloth,modularai,Gensyn,bartowski")
            generate_plot_button = gr.Button(value="Generate Plot", variant="primary", interactive=False)

        with gr.Column(scale=3):
            plot_output = gr.Plot()
            status_message_md = gr.Markdown("Initializing...")
            data_info_md = gr.Markdown("")

    def update_param_display(value: tuple):
        min_idx, max_idx = int(value[0]), int(value[1])
        return f"Range: `{PARAM_CHOICES[min_idx]}` to `{PARAM_CHOICES[max_idx]}`"

    def _toggle_unknown_params_checkbox(param_range_indices):
        # A narrowed parameter range only applies to models with a known size, so
        # force the checkbox off (and lock it) unless the full range is selected.
        min_idx, max_idx = int(param_range_indices[0]), int(param_range_indices[1])
        is_default_range = (min_idx == PARAM_CHOICES_DEFAULT_INDICES[0] and max_idx == PARAM_CHOICES_DEFAULT_INDICES[1])
        if not is_default_range:
            return gr.update(interactive=False, value=False)
        return gr.update(interactive=True)

    param_range_slider.change(update_param_display, param_range_slider, param_range_display)
    param_range_slider.change(_toggle_unknown_params_checkbox, param_range_slider, include_unknown_params_checkbox)
    loading_complete_state.change(lambda is_loaded: gr.update(interactive=is_loaded), loading_complete_state, generate_plot_button)
    filter_choice_radio.change(
        lambda choice: (gr.update(visible=choice == "Tag Filter"), gr.update(visible=choice == "Pipeline Filter")),
        filter_choice_radio,
        [tag_filter_dropdown, pipeline_filter_dropdown]
    )

    def load_and_generate_initial_plot(progress=gr.Progress()):
        progress(0, desc=f"Loading dataset metadata '{HF_DATASET_ID}'...")
        parquet_urls, load_success_flag, status_msg_from_load = [], False, ""
        try:
            parquet_urls, load_success_flag, status_msg_from_load = load_models_data()
            if load_success_flag:
                progress(0.5, desc="Processing metadata...")
                # Quick query to gather dataset-level stats for the info panel.
                con = duckdb.connect()
                con.execute("INSTALL httpfs; LOAD httpfs;")
                urls_str = ", ".join([f"'{u}'" for u in parquet_urls])
                con.execute(f"CREATE VIEW models AS SELECT * FROM read_parquet([{urls_str}])")
                # Total model count, data snapshot timestamp, and count of models with known params.
                stats = con.execute("SELECT count(*), max(data_download_timestamp), count(params) FROM models").fetchone()
                total_count = stats[0]
                ts = stats[1]  # snapshot timestamp
                param_count = stats[2]
                date_display = ts.strftime('%B %d, %Y, %H:%M:%S %Z') if ts else "Pre-processed (date unavailable)"
                data_info_text = (
                    f"### Data Information\n- Source: `{HF_DATASET_ID}`\n- Status: {status_msg_from_load}\n"
                    f"- Total models loaded: {total_count:,}\n- Models with known parameter counts: {param_count:,}\n"
                    f"- Models with unknown parameter counts: {total_count - param_count:,}\n- Data as of: {date_display}\n"
                )
            else:
                data_info_text = f"### Data Load Failed\n- {status_msg_from_load}"
        except Exception as e:
            status_msg_from_load = f"An unexpected error occurred: {str(e)}"
            data_info_text = f"### Critical Error\n- {status_msg_from_load}"
            print(f"Critical error in load_and_generate_initial_plot: {e}")

        progress(0.6, desc="Generating initial plot...")
        initial_plot, initial_status = ui_generate_plot_controller(
            "downloads", "None", None, None, PARAM_CHOICES_DEFAULT_INDICES, 25,
            "TheBloke,MaziyarPanahi,unsloth,modularai,Gensyn,bartowski", True, None,
            parquet_urls, progress
        )
        return parquet_urls, load_success_flag, data_info_text, initial_status, initial_plot

    def ui_generate_plot_controller(metric_choice, filter_type, tag_choice, pipeline_choice,
                                    param_range_indices, k_orgs, skip_orgs_input,
                                    include_unknown_param_size_flag, created_after_date,
                                    parquet_urls, progress=gr.Progress()):
        if not parquet_urls:
            return create_treemap(pd.DataFrame(), metric_choice, "Error: Model Data Not Loaded"), "Model data is not loaded."
        progress(0.1, desc="Preparing data...")
        param_labels = [PARAM_CHOICES[int(param_range_indices[0])], PARAM_CHOICES[int(param_range_indices[1])]]
        treemap_df = make_treemap_data(
            parquet_urls,
            metric_choice,
            k_orgs,
            tag_choice if filter_type == "Tag Filter" else None,
            pipeline_choice if filter_type == "Pipeline Filter" else None,
            param_labels,
            [org.strip() for org in skip_orgs_input.split(',') if org.strip()],
            include_unknown_param_size_flag,
            created_after_date
        )
        progress(0.7, desc="Generating plot...")
        title_labels = {"downloads": "Downloads (last 30 days)", "downloadsAllTime": "Downloads (All Time)", "likes": "Likes"}
        plotly_fig = create_treemap(treemap_df, metric_choice,
                                    f"HuggingFace Models - {title_labels.get(metric_choice, metric_choice)} by Organization")
        plot_stats_md = (
            f"## Plot Statistics\n- **Models shown**: {len(treemap_df['id'].unique()):,}\n"
            f"- **Total {metric_choice}**: {int(treemap_df[metric_choice].sum()):,}"
        ) if not treemap_df.empty else "No data matches the selected filters."
        return plotly_fig, plot_stats_md

    demo.load(load_and_generate_initial_plot, None,
              [models_data_state, loading_complete_state, data_info_md, status_message_md, plot_output])

    generate_plot_button.click(
        ui_generate_plot_controller,
        [count_by_dropdown, filter_choice_radio, tag_filter_dropdown, pipeline_filter_dropdown,
         param_range_slider, top_k_dropdown, skip_orgs_textbox, include_unknown_params_checkbox,
         created_after_datepicker, models_data_state],
        [plot_output, status_message_md]
    )

if __name__ == "__main__":
    print("Application starting...")
    demo.queue().launch()
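

# --- Illustrative sketch (not used by the app above) ---
# A minimal helper for eyeballing the source data from a Python REPL, assuming the
# same dataset layout the app queries. The function name `preview_models_sample` is
# hypothetical; it simply reuses the DuckDB httpfs + read_parquet pattern shown above.
def preview_models_sample(limit: int = 5) -> pd.DataFrame:
    """Return the first `limit` rows of the dataset's parquet files (illustration only)."""
    urls, ok, _ = load_models_data()
    if not ok:
        return pd.DataFrame()
    con = duckdb.connect()
    con.execute("INSTALL httpfs; LOAD httpfs;")
    urls_sql = ", ".join(f"'{u}'" for u in urls)
    return con.execute(f"SELECT * FROM read_parquet([{urls_sql}]) LIMIT {limit}").df()
# Example (from a REPL):
#   >>> preview_models_sample(3)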